Added support for the @Transactional annotation within the @Poller annotation (INT-399). Simplified AbstractMessageBarrierConsumer somewhat (to reuse base class methods for sending replies and discarding incomplete messages). Added TestUtils with convenient property-path accessor methods. Refactored annotation post-processors such that the BeanFactory is passed to the individual method post-processors rather than passing the MessageBus. The base class still detects the MessageBus bean and provides a protected 'channelRegistry' reference.

This commit is contained in:
Mark Fisher
2008-10-08 00:19:20 +00:00
parent ea220cac6c
commit 6c4245f3ca
21 changed files with 331 additions and 177 deletions

View File

@@ -28,6 +28,7 @@ import org.springframework.integration.channel.ChannelRegistry;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.integration.channel.QueueChannel;
import org.springframework.integration.channel.ThreadLocalChannel;
import org.springframework.integration.config.MessageBusParser;
import org.springframework.integration.config.annotation.MessagingAnnotationPostProcessor;
import org.springframework.integration.endpoint.AbstractReplyProducingMessageConsumer;
import org.springframework.integration.endpoint.ServiceActivatorEndpoint;
@@ -76,7 +77,8 @@ public class DirectChannelSubscriptionTests {
public void testSendAndReceiveForAnnotatedEndpoint() {
GenericApplicationContext context = new GenericApplicationContext();
bus.setApplicationContext(context);
MessagingAnnotationPostProcessor postProcessor = new MessagingAnnotationPostProcessor(bus);
context.getBeanFactory().registerSingleton(MessageBusParser.MESSAGE_BUS_BEAN_NAME, bus);
MessagingAnnotationPostProcessor postProcessor = new MessagingAnnotationPostProcessor();
postProcessor.setBeanFactory(context.getBeanFactory());
postProcessor.afterPropertiesSet();
TestEndpoint endpoint = new TestEndpoint();
@@ -109,10 +111,11 @@ public class DirectChannelSubscriptionTests {
public void testExceptionThrownFromAnnotatedEndpoint() {
GenericApplicationContext context = new GenericApplicationContext();
bus.setApplicationContext(context);
context.getBeanFactory().registerSingleton(MessageBusParser.MESSAGE_BUS_BEAN_NAME, bus);
QueueChannel errorChannel = new QueueChannel();
errorChannel.setBeanName(ChannelRegistry.ERROR_CHANNEL_NAME);
context.getBeanFactory().registerSingleton(ChannelRegistry.ERROR_CHANNEL_NAME, errorChannel);
MessagingAnnotationPostProcessor postProcessor = new MessagingAnnotationPostProcessor(bus);
MessagingAnnotationPostProcessor postProcessor = new MessagingAnnotationPostProcessor();
postProcessor.setBeanFactory(context.getBeanFactory());
postProcessor.afterPropertiesSet();
FailingTestEndpoint endpoint = new FailingTestEndpoint();

View File

@@ -37,6 +37,7 @@ import org.springframework.integration.endpoint.SubscribingConsumerEndpoint;
import org.springframework.integration.message.Message;
import org.springframework.integration.message.MessageBuilder;
import org.springframework.integration.util.MethodInvoker;
import org.springframework.integration.util.TestUtils;
/**
* @author Marius Bogoevici
@@ -91,7 +92,7 @@ public class AggregatorParserTests {
Assert.assertEquals("The AggregatorEndpoint is not injected with the appropriate discard channel",
discardChannel, accessor.getPropertyValue("discardChannel"));
Assert.assertEquals("The AggregatorEndpoint is not set with the appropriate timeout value",
86420000l, accessor.getPropertyValue("sendTimeout"));
86420000l, TestUtils.getPropertyValue(consumer, "channelTemplate.sendTimeout"));
Assert.assertEquals(
"The AggregatorEndpoint is not configured with the appropriate 'send partial results on timeout' flag",
true, accessor.getPropertyValue("sendPartialResultOnTimeout"));

View File

@@ -16,16 +16,21 @@
package org.springframework.integration.config;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.springframework.integration.util.TestUtils.getPropertyValue;
import java.util.ArrayList;
import java.util.List;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.springframework.beans.DirectFieldAccessor;
import org.springframework.context.ApplicationContext;
import org.springframework.context.support.ClassPathXmlApplicationContext;
import org.springframework.integration.aggregator.Resequencer;
import org.springframework.integration.channel.MessageChannel;
import org.springframework.integration.channel.PollableChannel;
import org.springframework.integration.endpoint.SubscribingConsumerEndpoint;
@@ -60,34 +65,32 @@ public class ResequencerParserTests {
Message<?> message1 = outputChannel.receive(500);
Message<?> message2 = outputChannel.receive(500);
Message<?> message3 = outputChannel.receive(500);
Assert.assertNotNull(message1);
Assert.assertEquals(new Integer(1), message1.getHeaders().getSequenceNumber());
Assert.assertNotNull(message2);
Assert.assertEquals(new Integer(2), message2.getHeaders().getSequenceNumber());
Assert.assertNotNull(message3);
Assert.assertEquals(new Integer(3), message3.getHeaders().getSequenceNumber());
assertNotNull(message1);
assertEquals(new Integer(1), message1.getHeaders().getSequenceNumber());
assertNotNull(message2);
assertEquals(new Integer(2), message2.getHeaders().getSequenceNumber());
assertNotNull(message3);
assertEquals(new Integer(3), message3.getHeaders().getSequenceNumber());
}
@Test
public void testDefaultResequencerProperties() {
SubscribingConsumerEndpoint endpoint = (SubscribingConsumerEndpoint) context.getBean("defaultResequencer");
DirectFieldAccessor accessor = new DirectFieldAccessor(new DirectFieldAccessor(endpoint).getPropertyValue("consumer"));
Assert.assertNull(accessor.getPropertyValue("outputChannel"));
Assert.assertNull(accessor.getPropertyValue("discardChannel"));
Assert.assertEquals("The ResequencerEndpoint is not set with the appropriate timeout value",
1000l, accessor.getPropertyValue("sendTimeout"));
Assert.assertEquals(
"The ResequencerEndpoint is not configured with the appropriate 'send partial results on timeout' flag",
false, accessor.getPropertyValue("sendPartialResultOnTimeout"));
Assert.assertEquals("The ResequencerEndpoint is not configured with the appropriate reaper interval",
1000l, accessor.getPropertyValue("reaperInterval"));
Assert.assertEquals(
"The ResequencerEndpoint is not configured with the appropriate tracked correlationId capacity",
1000, accessor.getPropertyValue("trackedCorrelationIdCapacity"));
Assert.assertEquals("The ResequencerEndpoint is not configured with the appropriate timeout",
60000l, accessor.getPropertyValue("timeout"));
Assert.assertEquals("The ResequencerEndpoint is not configured with the appropriate 'release partial sequences' flag",
true, accessor.getPropertyValue("releasePartialSequences"));
Resequencer resequencer = (Resequencer) new DirectFieldAccessor(endpoint).getPropertyValue("consumer");
assertNull(getPropertyValue(resequencer, "outputChannel"));
assertNull(getPropertyValue(resequencer, "discardChannel"));
assertEquals("The ResequencerEndpoint is not set with the appropriate timeout value",
1000l, getPropertyValue(resequencer, "channelTemplate.sendTimeout"));
assertEquals("The ResequencerEndpoint is not configured with the appropriate 'send partial results on timeout' flag",
false, getPropertyValue(resequencer, "sendPartialResultOnTimeout"));
assertEquals("The ResequencerEndpoint is not configured with the appropriate reaper interval",
1000l, getPropertyValue(resequencer, "reaperInterval"));
assertEquals("The ResequencerEndpoint is not configured with the appropriate tracked correlationId capacity",
1000, getPropertyValue(resequencer, "trackedCorrelationIdCapacity"));
assertEquals("The ResequencerEndpoint is not configured with the appropriate timeout",
60000l, getPropertyValue(resequencer, "timeout"));
assertEquals("The ResequencerEndpoint is not configured with the appropriate 'release partial sequences' flag",
true, getPropertyValue(resequencer, "releasePartialSequences"));
}
@Test
@@ -95,27 +98,26 @@ public class ResequencerParserTests {
SubscribingConsumerEndpoint endpoint = (SubscribingConsumerEndpoint) context.getBean("completelyDefinedResequencer");
MessageChannel outputChannel = (MessageChannel) context.getBean("outputChannel");
MessageChannel discardChannel = (MessageChannel) context.getBean("discardChannel");
DirectFieldAccessor accessor = new DirectFieldAccessor(new DirectFieldAccessor(endpoint).getPropertyValue("consumer"));
Assert.assertEquals("The ResequencerEndpoint is not injected with the appropriate output channel",
outputChannel, accessor.getPropertyValue("outputChannel"));
Assert.assertEquals("The ResequencerEndpoint is not injected with the appropriate discard channel",
discardChannel, accessor.getPropertyValue("discardChannel"));
Assert.assertEquals("The ResequencerEndpoint is not set with the appropriate timeout value",
86420000l, accessor.getPropertyValue("sendTimeout"));
Assert.assertEquals(
"The ResequencerEndpoint is not configured with the appropriate 'send partial results on timeout' flag",
true, accessor.getPropertyValue("sendPartialResultOnTimeout"));
Assert.assertEquals("The ResequencerEndpoint is not configured with the appropriate reaper interval",
135l, accessor.getPropertyValue("reaperInterval"));
Assert.assertEquals(
"The ResequencerEndpoint is not configured with the appropriate tracked correlationId capacity",
99, accessor.getPropertyValue("trackedCorrelationIdCapacity"));
Assert.assertEquals("The ResequencerEndpoint is not configured with the appropriate timeout",
42l, accessor.getPropertyValue("timeout"));
Assert.assertEquals("The ResequencerEndpoint is not configured with the appropriate 'release partial sequences' flag",
false, accessor.getPropertyValue("releasePartialSequences"));
Resequencer resequencer = (Resequencer) new DirectFieldAccessor(endpoint).getPropertyValue("consumer");
assertEquals("The ResequencerEndpoint is not injected with the appropriate output channel",
outputChannel, getPropertyValue(resequencer, "outputChannel"));
assertEquals("The ResequencerEndpoint is not injected with the appropriate discard channel",
discardChannel, getPropertyValue(resequencer, "discardChannel"));
assertEquals("The ResequencerEndpoint is not set with the appropriate timeout value",
86420000l, getPropertyValue(resequencer, "channelTemplate.sendTimeout"));
assertEquals("The ResequencerEndpoint is not configured with the appropriate 'send partial results on timeout' flag",
true, getPropertyValue(resequencer, "sendPartialResultOnTimeout"));
assertEquals("The ResequencerEndpoint is not configured with the appropriate reaper interval",
135l, getPropertyValue(resequencer, "reaperInterval"));
assertEquals("The ResequencerEndpoint is not configured with the appropriate tracked correlationId capacity",
99, getPropertyValue(resequencer, "trackedCorrelationIdCapacity"));
assertEquals("The ResequencerEndpoint is not configured with the appropriate timeout",
42l, getPropertyValue(resequencer, "timeout"));
assertEquals("The ResequencerEndpoint is not configured with the appropriate 'release partial sequences' flag",
false, getPropertyValue(resequencer, "releasePartialSequences"));
}
private static <T> Message<T> createMessage(T payload, Object correlationId,
int sequenceSize, int sequenceNumber, MessageChannel outputChannel) {
return MessageBuilder.withPayload(payload)

View File

@@ -24,7 +24,6 @@ import java.util.concurrent.TimeUnit;
import org.junit.Test;
import org.springframework.beans.factory.config.RuntimeBeanReference;
import org.springframework.beans.factory.support.RootBeanDefinition;
import org.springframework.context.support.GenericApplicationContext;
import org.springframework.integration.annotation.MessageEndpoint;
@@ -51,7 +50,6 @@ public class ServiceActivatorAnnotationPostProcessorTests {
String busBeanName = MessageBusParser.MESSAGE_BUS_BEAN_NAME;
context.registerBeanDefinition(busBeanName, new RootBeanDefinition(DefaultMessageBus.class));
RootBeanDefinition postProcessorDef = new RootBeanDefinition(MessagingAnnotationPostProcessor.class);
postProcessorDef.getConstructorArgumentValues().addGenericArgumentValue(new RuntimeBeanReference(busBeanName));
context.registerBeanDefinition("postProcessor", postProcessorDef);
context.refresh();
context.start();

View File

@@ -16,6 +16,13 @@
package org.springframework.integration.config.annotation;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertSame;
import static org.junit.Assert.assertTrue;
import static org.springframework.integration.util.TestUtils.getPropertyValue;
import java.lang.reflect.Method;
import org.junit.Assert;
@@ -42,17 +49,18 @@ public class AggregatorAnnotationTests {
ApplicationContext context = new ClassPathXmlApplicationContext(
new String[] { "classpath:/org/springframework/integration/config/annotation/testAnnotatedAggregator.xml" });
final String endpointName = "endpointWithDefaultAnnotation";
DirectFieldAccessor accessor = getDirectFieldAccessorForAggregatingHandler(context,
endpointName);
Assert.assertTrue(accessor.getPropertyValue("completionStrategy") instanceof SequenceSizeCompletionStrategy);
Assert.assertNull(accessor.getPropertyValue("outputChannel"));
Assert.assertNull(accessor.getPropertyValue("discardChannel"));
Assert.assertEquals(AbstractMessageAggregator.DEFAULT_SEND_TIMEOUT, accessor.getPropertyValue("sendTimeout"));
Assert.assertEquals(AbstractMessageAggregator.DEFAULT_TIMEOUT, accessor.getPropertyValue("timeout"));
Assert.assertEquals(false, accessor.getPropertyValue("sendPartialResultOnTimeout"));
Assert.assertEquals(AbstractMessageAggregator.DEFAULT_REAPER_INTERVAL, accessor.getPropertyValue("reaperInterval"));
Assert.assertEquals(AbstractMessageAggregator.DEFAULT_TRACKED_CORRRELATION_ID_CAPACITY,
accessor.getPropertyValue("trackedCorrelationIdCapacity"));
AbstractMessageAggregator aggregator = this.getAggregator(context, endpointName);
assertTrue(getPropertyValue(aggregator, "completionStrategy") instanceof SequenceSizeCompletionStrategy);
assertNull(getPropertyValue(aggregator, "outputChannel"));
assertNull(getPropertyValue(aggregator, "discardChannel"));
assertEquals(AbstractMessageAggregator.DEFAULT_SEND_TIMEOUT,
getPropertyValue(aggregator, "channelTemplate.sendTimeout"));
assertEquals(AbstractMessageAggregator.DEFAULT_TIMEOUT, getPropertyValue(aggregator, "timeout"));
assertEquals(false, getPropertyValue(aggregator, "sendPartialResultOnTimeout"));
assertEquals(AbstractMessageAggregator.DEFAULT_REAPER_INTERVAL,
getPropertyValue(aggregator, "reaperInterval"));
assertEquals(AbstractMessageAggregator.DEFAULT_TRACKED_CORRRELATION_ID_CAPACITY,
getPropertyValue(aggregator, "trackedCorrelationIdCapacity"));
}
@Test
@@ -60,15 +68,18 @@ public class AggregatorAnnotationTests {
ApplicationContext context = new ClassPathXmlApplicationContext(
new String[] { "classpath:/org/springframework/integration/config/annotation/testAnnotatedAggregator.xml" });
final String endpointName = "endpointWithCustomizedAnnotation";
DirectFieldAccessor accessor = getDirectFieldAccessorForAggregatingHandler(context, endpointName);
Assert.assertTrue(accessor.getPropertyValue("completionStrategy") instanceof SequenceSizeCompletionStrategy);
Assert.assertEquals(getMessageBus(context).lookupChannel("outputChannel"), accessor.getPropertyValue("outputChannel"));
Assert.assertEquals(getMessageBus(context).lookupChannel("discardChannel"), accessor.getPropertyValue("discardChannel"));
Assert.assertEquals(98765432l, accessor.getPropertyValue("sendTimeout"));
Assert.assertEquals(4567890l, accessor.getPropertyValue("timeout"));
Assert.assertEquals(true, accessor.getPropertyValue("sendPartialResultOnTimeout"));
Assert.assertEquals(1234l, accessor.getPropertyValue("reaperInterval"));
Assert.assertEquals(42, accessor.getPropertyValue("trackedCorrelationIdCapacity"));
AbstractMessageAggregator aggregator = this.getAggregator(context, endpointName);
assertTrue(getPropertyValue(aggregator, "completionStrategy")
instanceof SequenceSizeCompletionStrategy);
assertEquals(getMessageBus(context).lookupChannel("outputChannel"),
getPropertyValue(aggregator, "outputChannel"));
assertEquals(getMessageBus(context).lookupChannel("discardChannel"),
getPropertyValue(aggregator, "discardChannel"));
assertEquals(98765432l, getPropertyValue(aggregator, "channelTemplate.sendTimeout"));
assertEquals(4567890l, getPropertyValue(aggregator, "timeout"));
assertEquals(true, getPropertyValue(aggregator, "sendPartialResultOnTimeout"));
assertEquals(1234l, getPropertyValue(aggregator, "reaperInterval"));
assertEquals(42, getPropertyValue(aggregator, "trackedCorrelationIdCapacity"));
}
@Test
@@ -76,24 +87,24 @@ public class AggregatorAnnotationTests {
ApplicationContext context = new ClassPathXmlApplicationContext(
new String[] { "classpath:/org/springframework/integration/config/annotation/testAnnotatedAggregator.xml" });
final String endpointName = "endpointWithDefaultAnnotationAndCustomCompletionStrategy";
DirectFieldAccessor aggregatingMessageHandlerAccessor = getDirectFieldAccessorForAggregatingHandler(context, endpointName);
Object completionStrategy = aggregatingMessageHandlerAccessor.getPropertyValue("completionStrategy");
AbstractMessageAggregator aggregator = this.getAggregator(context, endpointName);
Object completionStrategy = getPropertyValue(aggregator, "completionStrategy");
Assert.assertTrue(completionStrategy instanceof CompletionStrategyAdapter);
CompletionStrategyAdapter completionStrategyAdapter = (CompletionStrategyAdapter) completionStrategy;
DirectFieldAccessor invokerAccessor = new DirectFieldAccessor(
new DirectFieldAccessor(completionStrategyAdapter).getPropertyValue("invoker"));
Object targetObject = invokerAccessor.getPropertyValue("object");
Assert.assertSame(context.getBean(endpointName), targetObject);
assertSame(context.getBean(endpointName), targetObject);
Method completionCheckerMethod = (Method) invokerAccessor.getPropertyValue("method");
Assert.assertEquals("completionChecker", completionCheckerMethod.getName());
assertEquals("completionChecker", completionCheckerMethod.getName());
}
@SuppressWarnings("unchecked")
private DirectFieldAccessor getDirectFieldAccessorForAggregatingHandler(ApplicationContext context, final String endpointName) {
private AbstractMessageAggregator getAggregator(ApplicationContext context, final String endpointName) {
SubscribingConsumerEndpoint endpoint = (SubscribingConsumerEndpoint) context.getBean(
endpointName + ".aggregatingMethod.aggregator");
return new DirectFieldAccessor(new DirectFieldAccessor(endpoint).getPropertyValue("consumer"));
return (AbstractMessageAggregator) new DirectFieldAccessor(endpoint).getPropertyValue("consumer");
}
private MessageBus getMessageBus(ApplicationContext context) {

View File

@@ -29,6 +29,7 @@ import org.junit.Test;
import org.springframework.aop.framework.ProxyFactory;
import org.springframework.beans.DirectFieldAccessor;
import org.springframework.beans.factory.NoSuchBeanDefinitionException;
import org.springframework.context.support.AbstractApplicationContext;
import org.springframework.context.support.ClassPathXmlApplicationContext;
import org.springframework.context.support.GenericApplicationContext;
@@ -61,11 +62,13 @@ public class MessagingAnnotationPostProcessorTests {
public void testServiceActivatorAnnotation() {
GenericApplicationContext context = new GenericApplicationContext();
DefaultMessageBus messageBus = new DefaultMessageBus();
context.getBeanFactory().registerSingleton(
MessageBusParser.MESSAGE_BUS_BEAN_NAME, messageBus);
messageBus.setApplicationContext(context);
QueueChannel inputChannel = new QueueChannel();
inputChannel.setBeanName("inputChannel");
messageBus.registerChannel(inputChannel);
MessagingAnnotationPostProcessor postProcessor = new MessagingAnnotationPostProcessor(messageBus);
MessagingAnnotationPostProcessor postProcessor = new MessagingAnnotationPostProcessor();
postProcessor.setBeanFactory(context.getBeanFactory());
postProcessor.afterPropertiesSet();
ServiceActivatorAnnotatedBean bean = new ServiceActivatorAnnotatedBean();
@@ -139,8 +142,10 @@ public class MessagingAnnotationPostProcessorTests {
public void testTargetAnnotation() throws InterruptedException {
GenericApplicationContext context = new GenericApplicationContext();
DefaultMessageBus messageBus = new DefaultMessageBus();
context.getBeanFactory().registerSingleton(
MessageBusParser.MESSAGE_BUS_BEAN_NAME, messageBus);
messageBus.setApplicationContext(context);
MessagingAnnotationPostProcessor postProcessor = new MessagingAnnotationPostProcessor(messageBus);
MessagingAnnotationPostProcessor postProcessor = new MessagingAnnotationPostProcessor();
postProcessor.setBeanFactory(context.getBeanFactory());
postProcessor.afterPropertiesSet();
CountDownLatch latch = new CountDownLatch(1);
@@ -155,20 +160,33 @@ public class MessagingAnnotationPostProcessorTests {
messageBus.stop();
}
@Test(expected=IllegalArgumentException.class)
public void testPostProcessorWithNullMessageBus() {
new MessagingAnnotationPostProcessor(null);
@Test(expected = IllegalArgumentException.class)
public void testPostProcessorWithoutBeanFactory() {
MessagingAnnotationPostProcessor postProcessor =
new MessagingAnnotationPostProcessor();
postProcessor.afterPropertiesSet();
}
@Test(expected = NoSuchBeanDefinitionException.class)
public void testPostProcessorWithoutMessageBus() {
GenericApplicationContext context = new GenericApplicationContext();
MessagingAnnotationPostProcessor postProcessor =
new MessagingAnnotationPostProcessor();
postProcessor.setBeanFactory(context.getBeanFactory());
postProcessor.afterPropertiesSet();
}
@Test
public void testChannelRegistryAwareBean() {
GenericApplicationContext context = new GenericApplicationContext();
DefaultMessageBus messageBus = new DefaultMessageBus();
context.getBeanFactory().registerSingleton(
MessageBusParser.MESSAGE_BUS_BEAN_NAME, messageBus);
messageBus.setApplicationContext(context);
QueueChannel inputChannel = new QueueChannel();
inputChannel.setBeanName("inputChannel");
messageBus.registerChannel(inputChannel);
MessagingAnnotationPostProcessor postProcessor = new MessagingAnnotationPostProcessor(messageBus);
MessagingAnnotationPostProcessor postProcessor = new MessagingAnnotationPostProcessor();
postProcessor.setBeanFactory(context.getBeanFactory());
postProcessor.afterPropertiesSet();
ChannelRegistryAwareTestBean testBean = new ChannelRegistryAwareTestBean();
@@ -182,15 +200,17 @@ public class MessagingAnnotationPostProcessorTests {
@Test
public void testProxiedMessageEndpointAnnotation() {
GenericApplicationContext context = new GenericApplicationContext();
DefaultMessageBus messageBus = new DefaultMessageBus();
context.getBeanFactory().registerSingleton(
MessageBusParser.MESSAGE_BUS_BEAN_NAME, messageBus);
QueueChannel inputChannel = new QueueChannel();
QueueChannel outputChannel = new QueueChannel();
inputChannel.setBeanName("inputChannel");
outputChannel.setBeanName("outputChannel");
context.getBeanFactory().registerSingleton("inputChannel", inputChannel);
context.getBeanFactory().registerSingleton("outputChannel", outputChannel);
DefaultMessageBus messageBus = new DefaultMessageBus();
messageBus.setApplicationContext(context);
MessagingAnnotationPostProcessor postProcessor = new MessagingAnnotationPostProcessor(messageBus);
MessagingAnnotationPostProcessor postProcessor = new MessagingAnnotationPostProcessor();
postProcessor.setBeanFactory(context.getBeanFactory());
postProcessor.afterPropertiesSet();
ProxyFactory proxyFactory = new ProxyFactory(new SimpleAnnotatedEndpoint());
@@ -206,15 +226,17 @@ public class MessagingAnnotationPostProcessorTests {
@Test
public void testMessageEndpointAnnotationInherited() {
GenericApplicationContext context = new GenericApplicationContext();
DefaultMessageBus messageBus = new DefaultMessageBus();
context.getBeanFactory().registerSingleton(
MessageBusParser.MESSAGE_BUS_BEAN_NAME, messageBus);
QueueChannel inputChannel = new QueueChannel();
QueueChannel outputChannel = new QueueChannel();
inputChannel.setBeanName("inputChannel");
outputChannel.setBeanName("outputChannel");
context.getBeanFactory().registerSingleton("inputChannel", inputChannel);
context.getBeanFactory().registerSingleton("outputChannel", outputChannel);
DefaultMessageBus messageBus = new DefaultMessageBus();
messageBus.setApplicationContext(context);
MessagingAnnotationPostProcessor postProcessor = new MessagingAnnotationPostProcessor(messageBus);
MessagingAnnotationPostProcessor postProcessor = new MessagingAnnotationPostProcessor();
postProcessor.setBeanFactory(context.getBeanFactory());
postProcessor.afterPropertiesSet();
postProcessor.postProcessAfterInitialization(new SimpleAnnotatedEndpointSubclass(), "subclass");
@@ -228,15 +250,17 @@ public class MessagingAnnotationPostProcessorTests {
@Test
public void testMessageEndpointAnnotationInheritedWithProxy() {
GenericApplicationContext context = new GenericApplicationContext();
DefaultMessageBus messageBus = new DefaultMessageBus();
context.getBeanFactory().registerSingleton(
MessageBusParser.MESSAGE_BUS_BEAN_NAME, messageBus);
QueueChannel inputChannel = new QueueChannel();
QueueChannel outputChannel = new QueueChannel();
inputChannel.setBeanName("inputChannel");
outputChannel.setBeanName("outputChannel");
context.getBeanFactory().registerSingleton("inputChannel", inputChannel);
context.getBeanFactory().registerSingleton("outputChannel", outputChannel);
DefaultMessageBus messageBus = new DefaultMessageBus();
messageBus.setApplicationContext(context);
MessagingAnnotationPostProcessor postProcessor = new MessagingAnnotationPostProcessor(messageBus);
MessagingAnnotationPostProcessor postProcessor = new MessagingAnnotationPostProcessor();
postProcessor.setBeanFactory(context.getBeanFactory());
postProcessor.afterPropertiesSet();
ProxyFactory proxyFactory = new ProxyFactory(new SimpleAnnotatedEndpointSubclass());
@@ -252,15 +276,17 @@ public class MessagingAnnotationPostProcessorTests {
@Test
public void testMessageEndpointAnnotationInheritedFromInterface() {
GenericApplicationContext context = new GenericApplicationContext();
DefaultMessageBus messageBus = new DefaultMessageBus();
context.getBeanFactory().registerSingleton(
MessageBusParser.MESSAGE_BUS_BEAN_NAME, messageBus);
QueueChannel inputChannel = new QueueChannel();
QueueChannel outputChannel = new QueueChannel();
inputChannel.setBeanName("inputChannel");
outputChannel.setBeanName("outputChannel");
context.getBeanFactory().registerSingleton("inputChannel", inputChannel);
context.getBeanFactory().registerSingleton("outputChannel", outputChannel);
DefaultMessageBus messageBus = new DefaultMessageBus();
messageBus.setApplicationContext(context);
MessagingAnnotationPostProcessor postProcessor = new MessagingAnnotationPostProcessor(messageBus);
MessagingAnnotationPostProcessor postProcessor = new MessagingAnnotationPostProcessor();
postProcessor.setBeanFactory(context.getBeanFactory());
postProcessor.afterPropertiesSet();
postProcessor.postProcessAfterInitialization(new SimpleAnnotatedEndpointImplementation(), "impl");
@@ -274,15 +300,17 @@ public class MessagingAnnotationPostProcessorTests {
@Test
public void testMessageEndpointAnnotationInheritedFromInterfaceWithAutoCreatedChannels() {
GenericApplicationContext context = new GenericApplicationContext();
DefaultMessageBus messageBus = new DefaultMessageBus();
context.getBeanFactory().registerSingleton(
MessageBusParser.MESSAGE_BUS_BEAN_NAME, messageBus);
QueueChannel inputChannel = new QueueChannel();
QueueChannel outputChannel = new QueueChannel();
inputChannel.setBeanName("inputChannel");
outputChannel.setBeanName("outputChannel");
context.getBeanFactory().registerSingleton("inputChannel", inputChannel);
context.getBeanFactory().registerSingleton("outputChannel", outputChannel);
DefaultMessageBus messageBus = new DefaultMessageBus();
messageBus.setApplicationContext(context);
MessagingAnnotationPostProcessor postProcessor = new MessagingAnnotationPostProcessor(messageBus);
MessagingAnnotationPostProcessor postProcessor = new MessagingAnnotationPostProcessor();
postProcessor.setBeanFactory(context.getBeanFactory());
postProcessor.afterPropertiesSet();
postProcessor.postProcessAfterInitialization(new SimpleAnnotatedEndpointImplementation(), "impl");
@@ -296,15 +324,17 @@ public class MessagingAnnotationPostProcessorTests {
@Test
public void testMessageEndpointAnnotationInheritedFromInterfaceWithProxy() {
GenericApplicationContext context = new GenericApplicationContext();
DefaultMessageBus messageBus = new DefaultMessageBus();
context.getBeanFactory().registerSingleton(
MessageBusParser.MESSAGE_BUS_BEAN_NAME, messageBus);
QueueChannel inputChannel = new QueueChannel();
QueueChannel outputChannel = new QueueChannel();
inputChannel.setBeanName("inputChannel");
outputChannel.setBeanName("outputChannel");
context.getBeanFactory().registerSingleton("inputChannel", inputChannel);
context.getBeanFactory().registerSingleton("outputChannel", outputChannel);
DefaultMessageBus messageBus = new DefaultMessageBus();
messageBus.setApplicationContext(context);
MessagingAnnotationPostProcessor postProcessor = new MessagingAnnotationPostProcessor(messageBus);
MessagingAnnotationPostProcessor postProcessor = new MessagingAnnotationPostProcessor();
postProcessor.setBeanFactory(context.getBeanFactory());
postProcessor.afterPropertiesSet();
ProxyFactory proxyFactory = new ProxyFactory(new SimpleAnnotatedEndpointImplementation());
@@ -320,12 +350,14 @@ public class MessagingAnnotationPostProcessorTests {
@Test
public void testEndpointWithPollerAnnotation() {
GenericApplicationContext context = new GenericApplicationContext();
DefaultMessageBus messageBus = new DefaultMessageBus();
context.getBeanFactory().registerSingleton(
MessageBusParser.MESSAGE_BUS_BEAN_NAME, messageBus);
QueueChannel testChannel = new QueueChannel();
testChannel.setBeanName("testChannel");
context.getBeanFactory().registerSingleton("testChannel", testChannel);
DefaultMessageBus messageBus = new DefaultMessageBus();
messageBus.setApplicationContext(context);
MessagingAnnotationPostProcessor postProcessor = new MessagingAnnotationPostProcessor(messageBus);
MessagingAnnotationPostProcessor postProcessor = new MessagingAnnotationPostProcessor();
postProcessor.setBeanFactory(context.getBeanFactory());
postProcessor.afterPropertiesSet();
AnnotatedEndpointWithPolledAnnotation bean = new AnnotatedEndpointWithPolledAnnotation();
@@ -344,8 +376,10 @@ public class MessagingAnnotationPostProcessorTests {
public void testChannelAdapterAnnotation() throws InterruptedException {
GenericApplicationContext context = new GenericApplicationContext();
DefaultMessageBus messageBus = new DefaultMessageBus();
context.getBeanFactory().registerSingleton(
MessageBusParser.MESSAGE_BUS_BEAN_NAME, messageBus);
messageBus.setApplicationContext(context);
MessagingAnnotationPostProcessor postProcessor = new MessagingAnnotationPostProcessor(messageBus);
MessagingAnnotationPostProcessor postProcessor = new MessagingAnnotationPostProcessor();
postProcessor.setBeanFactory(context.getBeanFactory());
postProcessor.afterPropertiesSet();
ChannelAdapterAnnotationTestBean testBean = new ChannelAdapterAnnotationTestBean();
@@ -379,7 +413,7 @@ public class MessagingAnnotationPostProcessorTests {
DefaultMessageBus messageBus = new DefaultMessageBus();
context.getBeanFactory().registerSingleton(MessageBusParser.MESSAGE_BUS_BEAN_NAME, messageBus);
messageBus.setApplicationContext(context);
MessagingAnnotationPostProcessor postProcessor = new MessagingAnnotationPostProcessor(messageBus);
MessagingAnnotationPostProcessor postProcessor = new MessagingAnnotationPostProcessor();
postProcessor.setBeanFactory(context.getBeanFactory());
postProcessor.afterPropertiesSet();
TransformerAnnotationTestBean testBean = new TransformerAnnotationTestBean();

View File

@@ -27,6 +27,7 @@ import org.springframework.integration.annotation.Router;
import org.springframework.integration.bus.DefaultMessageBus;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.integration.channel.QueueChannel;
import org.springframework.integration.config.MessageBusParser;
import org.springframework.integration.message.Message;
import org.springframework.integration.message.StringMessage;
@@ -51,12 +52,14 @@ public class RouterAnnotationPostProcessorTests {
outputChannel.setBeanName("output");
context.getBeanFactory().registerSingleton("input", inputChannel);
context.getBeanFactory().registerSingleton("output", outputChannel);
context.getBeanFactory().registerSingleton(
MessageBusParser.MESSAGE_BUS_BEAN_NAME, messageBus);
}
@Test
public void testRouter() {
MessagingAnnotationPostProcessor postProcessor = new MessagingAnnotationPostProcessor(messageBus);
MessagingAnnotationPostProcessor postProcessor = new MessagingAnnotationPostProcessor();
postProcessor.setBeanFactory(context.getBeanFactory());
postProcessor.afterPropertiesSet();
TestRouter testRouter = new TestRouter();

View File

@@ -29,6 +29,7 @@ import org.springframework.integration.annotation.Splitter;
import org.springframework.integration.bus.DefaultMessageBus;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.integration.channel.QueueChannel;
import org.springframework.integration.config.MessageBusParser;
import org.springframework.integration.message.Message;
import org.springframework.integration.message.StringMessage;
@@ -52,13 +53,15 @@ public class SplitterAnnotationPostProcessorTests {
outputChannel.setBeanName("output");
context.getBeanFactory().registerSingleton("input", inputChannel);
context.getBeanFactory().registerSingleton("output", outputChannel);
context.getBeanFactory().registerSingleton(
MessageBusParser.MESSAGE_BUS_BEAN_NAME, messageBus);
messageBus.setApplicationContext(context);
}
@Test
public void testSplitterAnnotation() throws InterruptedException {
MessagingAnnotationPostProcessor postProcessor = new MessagingAnnotationPostProcessor(messageBus);
MessagingAnnotationPostProcessor postProcessor = new MessagingAnnotationPostProcessor();
postProcessor.setBeanFactory(context.getBeanFactory());
postProcessor.afterPropertiesSet();
TestSplitter splitter = new TestSplitter();

View File

@@ -7,7 +7,9 @@
http://www.springframework.org/schema/integration
http://www.springframework.org/schema/integration/spring-integration-1.0.xsd">
<bean id="bus" class="org.springframework.integration.bus.DefaultMessageBus"/>
<integration:message-bus/>
<integration:annotation-driven/>
<integration:channel id="inputChannel"/>
@@ -17,8 +19,4 @@
<bean id="endpoint" class="org.springframework.integration.config.annotation.TypeConvertingTestEndpoint"/>
<bean class="org.springframework.integration.config.annotation.MessagingAnnotationPostProcessor">
<constructor-arg ref="bus"/>
</bean>
</beans>

View File

@@ -0,0 +1,54 @@
/*
* Copyright 2002-2008 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.integration.util;
import org.springframework.beans.DirectFieldAccessor;
import org.springframework.util.Assert;
/**
* @author Mark Fisher
*/
public abstract class TestUtils {
public static Object getPropertyValue(Object root, String propertyPath) {
Object value = null;
DirectFieldAccessor accessor = new DirectFieldAccessor(root);
String[] tokens = propertyPath.split("\\.");
for (int i = 0; i < tokens.length; i++) {
value = accessor.getPropertyValue(tokens[i]);
if (value != null) {
accessor = new DirectFieldAccessor(value);
}
else if (i == tokens.length - 1) {
return null;
}
else {
throw new IllegalArgumentException(
"intermediate property '" + tokens[i] + "' is null");
}
}
return value;
}
@SuppressWarnings("unchecked")
public static <T> T getPropertyValue(Object root, String propertyPath, Class<T> type) {
Object value = getPropertyValue(root, propertyPath);
Assert.isAssignable(type, value.getClass());
return (T) value;
}
}