From 07b2179baa93417097c8a00ed3cb40e94da67ff0 Mon Sep 17 00:00:00 2001 From: Mark Fisher Date: Mon, 30 Jun 2008 22:26:04 +0000 Subject: [PATCH] Added support for interceptors on MessageEndpoints. --- .../adapter/stream/ByteStreamSourceTests.java | 91 +--- .../adapter/stream/ByteStreamTargetTests.java | 29 +- .../stream/CharacterStreamSourceTests.java | 63 +-- .../stream/CharacterStreamTargetTests.java | 29 +- .../integration/bus/MessageBus.java | 25 +- .../integration/bus/SubscriptionManager.java | 8 +- .../config/AbstractTargetEndpointParser.java | 7 + .../config/IntegrationNamespaceUtils.java | 20 + .../integration/config/MessageBusParser.java | 30 +- .../MessageEndpointBeanPostProcessor.java | 75 ++++ .../config/SourceEndpointParser.java | 8 +- .../SourceAnnotationPostProcessor.java | 4 +- .../config/spring-integration-core-1.0.xsd | 17 + .../dispatcher/DefaultPollingDispatcher.java | 97 ----- .../dispatcher/PollingDispatcherTask.java | 37 +- .../endpoint/AbstractEndpoint.java | 47 ++- .../endpoint/EndpointInterceptor.java | 34 ++ .../endpoint/EndpointInterceptorAdapter.java | 39 ++ .../endpoint/EndpointMethodInterceptor.java | 56 +++ .../integration/endpoint/MessageEndpoint.java | 6 +- .../integration/endpoint/SourceEndpoint.java | 133 ++---- .../integration/endpoint/TargetEndpoint.java | 12 +- .../message/{Poller.java => Command.java} | 8 +- .../CommandMessage.java} | 11 +- .../integration/message/PollCommand.java | 24 ++ .../integration/bus/MessageBusTests.java | 3 +- ...MessageEndpointBeanPostProcessorTests.java | 113 +++++ .../integration/config/TestBeforeAdvice.java | 40 ++ .../config/TestEndpointInterceptor.java | 44 ++ .../integration/config/TestSource.java | 32 ++ .../integration/config/TestTarget.java | 31 ++ .../messageEndpointBeanPostProcessorTests.xml | 76 ++++ .../DefaultPollingDispatcherTests.java | 101 ----- .../endpoint/SourceEndpointTests.java | 390 +----------------- .../integration/handler/adapterTests.xml | 4 +- 35 files changed, 836 insertions(+), 908 deletions(-) create mode 100644 org.springframework.integration/src/main/java/org/springframework/integration/config/MessageEndpointBeanPostProcessor.java delete mode 100644 org.springframework.integration/src/main/java/org/springframework/integration/dispatcher/DefaultPollingDispatcher.java create mode 100644 org.springframework.integration/src/main/java/org/springframework/integration/endpoint/EndpointInterceptor.java create mode 100644 org.springframework.integration/src/main/java/org/springframework/integration/endpoint/EndpointInterceptorAdapter.java create mode 100644 org.springframework.integration/src/main/java/org/springframework/integration/endpoint/EndpointMethodInterceptor.java rename org.springframework.integration/src/main/java/org/springframework/integration/message/{Poller.java => Command.java} (90%) rename org.springframework.integration/src/main/java/org/springframework/integration/{dispatcher/PollingDispatcher.java => message/CommandMessage.java} (72%) create mode 100644 org.springframework.integration/src/main/java/org/springframework/integration/message/PollCommand.java create mode 100644 org.springframework.integration/src/test/java/org/springframework/integration/config/MessageEndpointBeanPostProcessorTests.java create mode 100644 org.springframework.integration/src/test/java/org/springframework/integration/config/TestBeforeAdvice.java create mode 100644 org.springframework.integration/src/test/java/org/springframework/integration/config/TestEndpointInterceptor.java create mode 100644 org.springframework.integration/src/test/java/org/springframework/integration/config/TestSource.java create mode 100644 org.springframework.integration/src/test/java/org/springframework/integration/config/TestTarget.java create mode 100644 org.springframework.integration/src/test/java/org/springframework/integration/config/messageEndpointBeanPostProcessorTests.xml delete mode 100644 org.springframework.integration/src/test/java/org/springframework/integration/dispatcher/DefaultPollingDispatcherTests.java diff --git a/org.springframework.integration.adapter/src/test/java/org/springframework/integration/adapter/stream/ByteStreamSourceTests.java b/org.springframework.integration.adapter/src/test/java/org/springframework/integration/adapter/stream/ByteStreamSourceTests.java index 2c4be97ee2..34f948ddc8 100644 --- a/org.springframework.integration.adapter/src/test/java/org/springframework/integration/adapter/stream/ByteStreamSourceTests.java +++ b/org.springframework.integration.adapter/src/test/java/org/springframework/integration/adapter/stream/ByteStreamSourceTests.java @@ -26,7 +26,9 @@ import org.junit.Test; import org.springframework.integration.channel.MessageChannel; import org.springframework.integration.channel.QueueChannel; import org.springframework.integration.endpoint.SourceEndpoint; +import org.springframework.integration.message.CommandMessage; import org.springframework.integration.message.Message; +import org.springframework.integration.message.PollCommand; import org.springframework.integration.scheduling.PollingSchedule; /** @@ -40,10 +42,8 @@ public class ByteStreamSourceTests { ByteArrayInputStream stream = new ByteArrayInputStream(bytes); MessageChannel channel = new QueueChannel(); ByteStreamSource source = new ByteStreamSource(stream); - PollingSchedule schedule = new PollingSchedule(1000); - schedule.setInitialDelay(10000); - SourceEndpoint endpoint = new SourceEndpoint(source, channel, schedule); - endpoint.run(); + SourceEndpoint endpoint = new SourceEndpoint(source, channel); + endpoint.invoke(new CommandMessage(new PollCommand())); Message message1 = channel.receive(500); byte[] payload = (byte[]) message1.getPayload(); assertEquals(3, payload.length); @@ -52,74 +52,7 @@ public class ByteStreamSourceTests { assertEquals(3, payload[2]); Message message2 = channel.receive(0); assertNull(message2); - endpoint.run(); - Message message3 = channel.receive(0); - assertNull(message3); - } - - @Test - public void testEndOfStreamWithMaxMessagesPerTask() throws Exception { - byte[] bytes = new byte[] {0,1,2,3,4,5,6,7}; - ByteArrayInputStream stream = new ByteArrayInputStream(bytes); - MessageChannel channel = new QueueChannel(); - ByteStreamSource source = new ByteStreamSource(stream); - source.setBytesPerMessage(8); - PollingSchedule schedule = new PollingSchedule(1000); - schedule.setInitialDelay(10000); - SourceEndpoint endpoint = new SourceEndpoint(source, channel, schedule); - endpoint.setMaxMessagesPerTask(5); - endpoint.run(); - Message message1 = channel.receive(500); - assertEquals(8, ((byte[]) message1.getPayload()).length); - Message message2 = channel.receive(0); - assertNull(message2); - } - - @Test - public void testMultipleMessagesWithSingleMessagePerTask() { - byte[] bytes = new byte[] {0,1,2,3,4,5,6,7}; - ByteArrayInputStream stream = new ByteArrayInputStream(bytes); - MessageChannel channel = new QueueChannel(); - ByteStreamSource source = new ByteStreamSource(stream); - source.setBytesPerMessage(4); - PollingSchedule schedule = new PollingSchedule(1000); - schedule.setInitialDelay(10000); - SourceEndpoint endpoint = new SourceEndpoint(source, channel, schedule); - endpoint.setMaxMessagesPerTask(1); - endpoint.run(); - Message message1 = channel.receive(0); - byte[] bytes1 = (byte[]) message1.getPayload(); - assertEquals(4, bytes1.length); - assertEquals(0, bytes1[0]); - Message message2 = channel.receive(0); - assertNull(message2); - endpoint.run(); - Message message3 = channel.receive(0); - byte[] bytes3 = (byte[]) message3.getPayload(); - assertEquals(4, bytes3.length); - assertEquals(4, bytes3[0]); - } - - @Test - public void testLessThanMaxMessagesAvailable() { - byte[] bytes = new byte[] {0,1,2,3,4,5,6,7}; - ByteArrayInputStream stream = new ByteArrayInputStream(bytes); - MessageChannel channel = new QueueChannel(); - ByteStreamSource source = new ByteStreamSource(stream); - source.setBytesPerMessage(4); - PollingSchedule schedule = new PollingSchedule(1000); - schedule.setInitialDelay(10000); - SourceEndpoint endpoint = new SourceEndpoint(source, channel, schedule); - endpoint.setMaxMessagesPerTask(5); - endpoint.run(); - Message message1 = channel.receive(0); - byte[] bytes1 = (byte[]) message1.getPayload(); - assertEquals(4, bytes1.length); - assertEquals(0, bytes1[0]); - Message message2 = channel.receive(0); - byte[] bytes2 = (byte[]) message2.getPayload(); - assertEquals(4, bytes2.length); - assertEquals(4, bytes2[0]); + endpoint.invoke(new CommandMessage(new PollCommand())); Message message3 = channel.receive(0); assertNull(message3); } @@ -133,14 +66,13 @@ public class ByteStreamSourceTests { source.setBytesPerMessage(4); PollingSchedule schedule = new PollingSchedule(1000); schedule.setInitialDelay(10000); - SourceEndpoint endpoint = new SourceEndpoint(source, channel, schedule); - endpoint.setMaxMessagesPerTask(1); - endpoint.run(); + SourceEndpoint endpoint = new SourceEndpoint(source, channel); + endpoint.invoke(new CommandMessage(new PollCommand())); Message message1 = channel.receive(0); assertEquals(4, ((byte[]) message1.getPayload()).length); Message message2 = channel.receive(0); assertNull(message2); - endpoint.run(); + endpoint.invoke(new CommandMessage(new PollCommand())); Message message3 = channel.receive(0); assertEquals(2, ((byte[]) message3.getPayload()).length); } @@ -155,14 +87,13 @@ public class ByteStreamSourceTests { source.setShouldTruncate(false); PollingSchedule schedule = new PollingSchedule(1000); schedule.setInitialDelay(10000); - SourceEndpoint endpoint = new SourceEndpoint(source, channel, schedule); - endpoint.setMaxMessagesPerTask(1); - endpoint.run(); + SourceEndpoint endpoint = new SourceEndpoint(source, channel); + endpoint.invoke(new CommandMessage(new PollCommand())); Message message1 = channel.receive(0); assertEquals(4, ((byte[]) message1.getPayload()).length); Message message2 = channel.receive(0); assertNull(message2); - endpoint.run(); + endpoint.invoke(new CommandMessage(new PollCommand())); Message message3 = channel.receive(0); assertEquals(4, ((byte[]) message3.getPayload()).length); assertEquals(0, ((byte[]) message3.getPayload())[3]); diff --git a/org.springframework.integration.adapter/src/test/java/org/springframework/integration/adapter/stream/ByteStreamTargetTests.java b/org.springframework.integration.adapter/src/test/java/org/springframework/integration/adapter/stream/ByteStreamTargetTests.java index 3a87048f6a..2fc5768574 100644 --- a/org.springframework.integration.adapter/src/test/java/org/springframework/integration/adapter/stream/ByteStreamTargetTests.java +++ b/org.springframework.integration.adapter/src/test/java/org/springframework/integration/adapter/stream/ByteStreamTargetTests.java @@ -25,7 +25,6 @@ import org.junit.Test; import org.springframework.integration.channel.DispatcherPolicy; import org.springframework.integration.channel.QueueChannel; -import org.springframework.integration.dispatcher.DefaultPollingDispatcher; import org.springframework.integration.dispatcher.PollingDispatcherTask; import org.springframework.integration.message.GenericMessage; import org.springframework.integration.message.StringMessage; @@ -64,8 +63,8 @@ public class ByteStreamTargetTests { DispatcherPolicy dispatcherPolicy = new DispatcherPolicy(); dispatcherPolicy.setMaxMessagesPerTask(3); QueueChannel channel = new QueueChannel(5, dispatcherPolicy); - PollingDispatcherTask task = new PollingDispatcherTask(new DefaultPollingDispatcher(channel), null); - task.getDispatcher().subscribe(target); + PollingDispatcherTask task = new PollingDispatcherTask(channel, null); + task.subscribe(target); channel.send(new GenericMessage(new byte[] {1,2,3}), 0); channel.send(new GenericMessage(new byte[] {4,5,6}), 0); channel.send(new GenericMessage(new byte[] {7,8,9}), 0); @@ -83,8 +82,8 @@ public class ByteStreamTargetTests { DispatcherPolicy dispatcherPolicy = new DispatcherPolicy(); dispatcherPolicy.setMaxMessagesPerTask(2); QueueChannel channel = new QueueChannel(5, dispatcherPolicy); - PollingDispatcherTask task = new PollingDispatcherTask(new DefaultPollingDispatcher(channel), null); - task.getDispatcher().subscribe(target); + PollingDispatcherTask task = new PollingDispatcherTask(channel, null); + task.subscribe(target); channel.send(new GenericMessage(new byte[] {1,2,3}), 0); channel.send(new GenericMessage(new byte[] {4,5,6}), 0); channel.send(new GenericMessage(new byte[] {7,8,9}), 0); @@ -102,8 +101,8 @@ public class ByteStreamTargetTests { dispatcherPolicy.setMaxMessagesPerTask(5); dispatcherPolicy.setReceiveTimeout(0); QueueChannel channel = new QueueChannel(5, dispatcherPolicy); - PollingDispatcherTask task = new PollingDispatcherTask(new DefaultPollingDispatcher(channel), null); - task.getDispatcher().subscribe(target); + PollingDispatcherTask task = new PollingDispatcherTask(channel, null); + task.subscribe(target); channel.send(new GenericMessage(new byte[] {1,2,3}), 0); channel.send(new GenericMessage(new byte[] {4,5,6}), 0); channel.send(new GenericMessage(new byte[] {7,8,9}), 0); @@ -121,8 +120,8 @@ public class ByteStreamTargetTests { dispatcherPolicy.setMaxMessagesPerTask(2); dispatcherPolicy.setReceiveTimeout(0); QueueChannel channel = new QueueChannel(5, dispatcherPolicy); - PollingDispatcherTask task = new PollingDispatcherTask(new DefaultPollingDispatcher(channel), null); - task.getDispatcher().subscribe(target); + PollingDispatcherTask task = new PollingDispatcherTask(channel, null); + task.subscribe(target); channel.send(new GenericMessage(new byte[] {1,2,3}), 0); channel.send(new GenericMessage(new byte[] {4,5,6}), 0); channel.send(new GenericMessage(new byte[] {7,8,9}), 0); @@ -145,8 +144,8 @@ public class ByteStreamTargetTests { dispatcherPolicy.setMaxMessagesPerTask(5); dispatcherPolicy.setReceiveTimeout(0); QueueChannel channel = new QueueChannel(5, dispatcherPolicy); - PollingDispatcherTask task = new PollingDispatcherTask(new DefaultPollingDispatcher(channel), null); - task.getDispatcher().subscribe(target); + PollingDispatcherTask task = new PollingDispatcherTask(channel, null); + task.subscribe(target); channel.send(new GenericMessage(new byte[] {1,2,3}), 0); channel.send(new GenericMessage(new byte[] {4,5,6}), 0); channel.send(new GenericMessage(new byte[] {7,8,9}), 0); @@ -168,8 +167,8 @@ public class ByteStreamTargetTests { dispatcherPolicy.setMaxMessagesPerTask(2); dispatcherPolicy.setReceiveTimeout(0); QueueChannel channel = new QueueChannel(5, dispatcherPolicy); - PollingDispatcherTask task = new PollingDispatcherTask(new DefaultPollingDispatcher(channel), null); - task.getDispatcher().subscribe(target); + PollingDispatcherTask task = new PollingDispatcherTask(channel, null); + task.subscribe(target); channel.send(new GenericMessage(new byte[] {1,2,3}), 0); channel.send(new GenericMessage(new byte[] {4,5,6}), 0); channel.send(new GenericMessage(new byte[] {7,8,9}), 0); @@ -191,8 +190,8 @@ public class ByteStreamTargetTests { dispatcherPolicy.setMaxMessagesPerTask(2); dispatcherPolicy.setReceiveTimeout(0); QueueChannel channel = new QueueChannel(5, dispatcherPolicy); - PollingDispatcherTask task = new PollingDispatcherTask(new DefaultPollingDispatcher(channel), null); - task.getDispatcher().subscribe(target); + PollingDispatcherTask task = new PollingDispatcherTask(channel, null); + task.subscribe(target); channel.send(new GenericMessage(new byte[] {1,2,3}), 0); channel.send(new GenericMessage(new byte[] {4,5,6}), 0); channel.send(new GenericMessage(new byte[] {7,8,9}), 0); diff --git a/org.springframework.integration.adapter/src/test/java/org/springframework/integration/adapter/stream/CharacterStreamSourceTests.java b/org.springframework.integration.adapter/src/test/java/org/springframework/integration/adapter/stream/CharacterStreamSourceTests.java index 825128fef2..3593ebfe8d 100644 --- a/org.springframework.integration.adapter/src/test/java/org/springframework/integration/adapter/stream/CharacterStreamSourceTests.java +++ b/org.springframework.integration.adapter/src/test/java/org/springframework/integration/adapter/stream/CharacterStreamSourceTests.java @@ -26,7 +26,9 @@ import org.junit.Test; import org.springframework.integration.channel.MessageChannel; import org.springframework.integration.channel.QueueChannel; import org.springframework.integration.endpoint.SourceEndpoint; +import org.springframework.integration.message.CommandMessage; import org.springframework.integration.message.Message; +import org.springframework.integration.message.PollCommand; import org.springframework.integration.scheduling.PollingSchedule; /** @@ -41,68 +43,13 @@ public class CharacterStreamSourceTests { CharacterStreamSource source = new CharacterStreamSource(reader); PollingSchedule schedule = new PollingSchedule(1000); schedule.setInitialDelay(10000); - SourceEndpoint endpoint = new SourceEndpoint(source, channel, schedule); - endpoint.run(); + SourceEndpoint endpoint = new SourceEndpoint(source, channel); + endpoint.invoke(new CommandMessage(new PollCommand())); Message message1 = channel.receive(0); assertEquals("test", message1.getPayload()); Message message2 = channel.receive(0); assertNull(message2); - endpoint.run(); - Message message3 = channel.receive(0); - assertNull(message3); - } - - @Test - public void testEndOfStreamWithMaxMessagesPerTask() { - StringReader reader = new StringReader("test"); - MessageChannel channel = new QueueChannel(); - CharacterStreamSource source = new CharacterStreamSource(reader); - PollingSchedule schedule = new PollingSchedule(1000); - schedule.setInitialDelay(10000); - SourceEndpoint endpoint = new SourceEndpoint(source, channel, schedule); - endpoint.setMaxMessagesPerTask(5); - endpoint.run(); - Message message1 = channel.receive(0); - assertEquals("test", message1.getPayload()); - Message message2 = channel.receive(0); - assertNull(message2); - } - - @Test - public void testMultipleLinesWithSingleMessagePerTask() { - String s = "test1" + System.getProperty("line.separator") + "test2"; - StringReader reader = new StringReader(s); - MessageChannel channel = new QueueChannel(); - CharacterStreamSource source = new CharacterStreamSource(reader); - PollingSchedule schedule = new PollingSchedule(1000); - schedule.setInitialDelay(10000); - SourceEndpoint endpoint = new SourceEndpoint(source, channel, schedule); - endpoint.setMaxMessagesPerTask(1); - endpoint.run(); - Message message1 = channel.receive(0); - assertEquals("test1", message1.getPayload()); - Message message2 = channel.receive(0); - assertNull(message2); - endpoint.run(); - Message message3 = channel.receive(0); - assertEquals("test2", message3.getPayload()); - } - - @Test - public void testLessThanMaxMessagesAvailable() { - String s = "test1" + System.getProperty("line.separator") + "test2"; - StringReader reader = new StringReader(s); - MessageChannel channel = new QueueChannel(); - CharacterStreamSource source = new CharacterStreamSource(reader); - PollingSchedule schedule = new PollingSchedule(1000); - schedule.setInitialDelay(5000); - SourceEndpoint endpoint = new SourceEndpoint(source, channel, schedule); - endpoint.setMaxMessagesPerTask(5); - endpoint.run(); - Message message1 = channel.receive(500); - assertEquals("test1", message1.getPayload()); - Message message2 = channel.receive(500); - assertEquals("test2", message2.getPayload()); + endpoint.invoke(new CommandMessage(new PollCommand())); Message message3 = channel.receive(0); assertNull(message3); } diff --git a/org.springframework.integration.adapter/src/test/java/org/springframework/integration/adapter/stream/CharacterStreamTargetTests.java b/org.springframework.integration.adapter/src/test/java/org/springframework/integration/adapter/stream/CharacterStreamTargetTests.java index 63a5c817b8..4ec1f806b4 100644 --- a/org.springframework.integration.adapter/src/test/java/org/springframework/integration/adapter/stream/CharacterStreamTargetTests.java +++ b/org.springframework.integration.adapter/src/test/java/org/springframework/integration/adapter/stream/CharacterStreamTargetTests.java @@ -25,7 +25,6 @@ import org.junit.Test; import org.springframework.integration.channel.DispatcherPolicy; import org.springframework.integration.channel.MessageChannel; import org.springframework.integration.channel.QueueChannel; -import org.springframework.integration.dispatcher.DefaultPollingDispatcher; import org.springframework.integration.dispatcher.PollingDispatcherTask; import org.springframework.integration.message.GenericMessage; import org.springframework.integration.message.StringMessage; @@ -48,8 +47,8 @@ public class CharacterStreamTargetTests { MessageChannel channel = new QueueChannel(); StringWriter writer = new StringWriter(); CharacterStreamTarget target = new CharacterStreamTarget(writer); - PollingDispatcherTask task = new PollingDispatcherTask(new DefaultPollingDispatcher(channel), null); - task.getDispatcher().subscribe(target); + PollingDispatcherTask task = new PollingDispatcherTask(channel, null); + task.subscribe(target); channel.send(new StringMessage("foo"), 0); channel.send(new StringMessage("bar"), 0); task.run(); @@ -64,8 +63,8 @@ public class CharacterStreamTargetTests { StringWriter writer = new StringWriter(); CharacterStreamTarget target = new CharacterStreamTarget(writer); target.setShouldAppendNewLine(true); - PollingDispatcherTask task = new PollingDispatcherTask(new DefaultPollingDispatcher(channel), null); - task.getDispatcher().subscribe(target); + PollingDispatcherTask task = new PollingDispatcherTask(channel, null); + task.subscribe(target); channel.send(new StringMessage("foo"), 0); channel.send(new StringMessage("bar"), 0); task.run(); @@ -82,8 +81,8 @@ public class CharacterStreamTargetTests { DispatcherPolicy dispatcherPolicy = new DispatcherPolicy(); dispatcherPolicy.setMaxMessagesPerTask(2); QueueChannel channel = new QueueChannel(5, dispatcherPolicy); - PollingDispatcherTask task = new PollingDispatcherTask(new DefaultPollingDispatcher(channel), null); - task.getDispatcher().subscribe(target); + PollingDispatcherTask task = new PollingDispatcherTask(channel, null); + task.subscribe(target); channel.send(new StringMessage("foo"), 0); channel.send(new StringMessage("bar"), 0); task.run(); @@ -98,8 +97,8 @@ public class CharacterStreamTargetTests { dispatcherPolicy.setMaxMessagesPerTask(10); dispatcherPolicy.setReceiveTimeout(0); QueueChannel channel = new QueueChannel(5, dispatcherPolicy); - PollingDispatcherTask task = new PollingDispatcherTask(new DefaultPollingDispatcher(channel), null); - task.getDispatcher().subscribe(target); + PollingDispatcherTask task = new PollingDispatcherTask(channel, null); + task.subscribe(target); target.setShouldAppendNewLine(true); channel.send(new StringMessage("foo"), 0); channel.send(new StringMessage("bar"), 0); @@ -113,8 +112,8 @@ public class CharacterStreamTargetTests { MessageChannel channel = new QueueChannel(); StringWriter writer = new StringWriter(); CharacterStreamTarget target = new CharacterStreamTarget(writer); - PollingDispatcherTask task = new PollingDispatcherTask(new DefaultPollingDispatcher(channel), null); - task.getDispatcher().subscribe(target); + PollingDispatcherTask task = new PollingDispatcherTask(channel, null); + task.subscribe(target); TestObject testObject = new TestObject("foo"); channel.send(new GenericMessage(testObject)); task.run(); @@ -129,8 +128,8 @@ public class CharacterStreamTargetTests { dispatcherPolicy.setReceiveTimeout(0); dispatcherPolicy.setMaxMessagesPerTask(2); QueueChannel channel = new QueueChannel(5, dispatcherPolicy); - PollingDispatcherTask task = new PollingDispatcherTask(new DefaultPollingDispatcher(channel), null); - task.getDispatcher().subscribe(target); + PollingDispatcherTask task = new PollingDispatcherTask(channel, null); + task.subscribe(target); TestObject testObject1 = new TestObject("foo"); TestObject testObject2 = new TestObject("bar"); channel.send(new GenericMessage(testObject1), 0); @@ -148,8 +147,8 @@ public class CharacterStreamTargetTests { dispatcherPolicy.setMaxMessagesPerTask(2); QueueChannel channel = new QueueChannel(5, dispatcherPolicy); target.setShouldAppendNewLine(true); - PollingDispatcherTask task = new PollingDispatcherTask(new DefaultPollingDispatcher(channel), null); - task.getDispatcher().subscribe(target); + PollingDispatcherTask task = new PollingDispatcherTask(channel, null); + task.subscribe(target); TestObject testObject1 = new TestObject("foo"); TestObject testObject2 = new TestObject("bar"); channel.send(new GenericMessage(testObject1), 0); diff --git a/org.springframework.integration/src/main/java/org/springframework/integration/bus/MessageBus.java b/org.springframework.integration/src/main/java/org/springframework/integration/bus/MessageBus.java index 0b4824bfc0..3b551d0e5d 100644 --- a/org.springframework.integration/src/main/java/org/springframework/integration/bus/MessageBus.java +++ b/org.springframework.integration/src/main/java/org/springframework/integration/bus/MessageBus.java @@ -27,6 +27,7 @@ import java.util.concurrent.ScheduledThreadPoolExecutor; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; + import org.springframework.beans.BeansException; import org.springframework.context.ApplicationContext; import org.springframework.context.ApplicationContextAware; @@ -55,8 +56,11 @@ import org.springframework.integration.endpoint.MessagingGateway; import org.springframework.integration.endpoint.SourceEndpoint; import org.springframework.integration.endpoint.TargetEndpoint; import org.springframework.integration.handler.MessageHandler; +import org.springframework.integration.message.CommandMessage; +import org.springframework.integration.message.PollCommand; import org.springframework.integration.message.Target; import org.springframework.integration.scheduling.MessagePublishingErrorHandler; +import org.springframework.integration.scheduling.MessagingTask; import org.springframework.integration.scheduling.MessagingTaskScheduler; import org.springframework.integration.scheduling.Schedule; import org.springframework.integration.scheduling.SimpleMessagingTaskScheduler; @@ -408,20 +412,27 @@ public class MessageBus implements ChannelRegistry, EndpointRegistry, Applicatio } } - private void registerSourceEndpoint(String name, SourceEndpoint endpoint) { + private void registerSourceEndpoint(String name, final SourceEndpoint endpoint) { if (!this.initialized) { this.initialize(); } - this.taskScheduler.schedule(endpoint); + final Schedule schedule = endpoint.getSchedule(); + if (schedule != null) { + this.taskScheduler.schedule(new MessagingTask() { + public Schedule getSchedule() { + return schedule; + } + public void run() { + endpoint.invoke(new CommandMessage(new PollCommand())); + } + }); + } if (endpoint instanceof Lifecycle) { this.lifecycleEndpoints.add((Lifecycle) endpoint); if (this.isRunning()) { ((Lifecycle) endpoint).start(); } } - if (logger.isInfoEnabled()) { - logger.info("registered source adapter '" + name + "'"); - } } private void registerGateway(String name, MessagingGateway gateway) { @@ -436,7 +447,7 @@ public class MessageBus implements ChannelRegistry, EndpointRegistry, Applicatio } } - private void activateSubscription(MessageChannel channel, Target target, Schedule schedule) { + private void activateSubscription(MessageChannel channel, TargetEndpoint targetEndpoint, Schedule schedule) { SubscriptionManager manager = this.subscriptionManagers.get(channel); if (manager == null) { if (logger.isWarnEnabled()) { @@ -445,7 +456,7 @@ public class MessageBus implements ChannelRegistry, EndpointRegistry, Applicatio } return; } - manager.addTarget(target, schedule); + manager.addTarget(targetEndpoint, schedule); if (this.isRunning() && !manager.isRunning()) { manager.start(); } diff --git a/org.springframework.integration/src/main/java/org/springframework/integration/bus/SubscriptionManager.java b/org.springframework.integration/src/main/java/org/springframework/integration/bus/SubscriptionManager.java index 924abaab71..e02309b274 100644 --- a/org.springframework.integration/src/main/java/org/springframework/integration/bus/SubscriptionManager.java +++ b/org.springframework.integration/src/main/java/org/springframework/integration/bus/SubscriptionManager.java @@ -28,7 +28,6 @@ import org.apache.commons.logging.LogFactory; import org.springframework.context.Lifecycle; import org.springframework.integration.ConfigurationException; import org.springframework.integration.channel.MessageChannel; -import org.springframework.integration.dispatcher.DefaultPollingDispatcher; import org.springframework.integration.dispatcher.DirectChannel; import org.springframework.integration.dispatcher.PollingDispatcherTask; import org.springframework.integration.endpoint.TargetEndpoint; @@ -122,10 +121,9 @@ public class SubscriptionManager { } PollingDispatcherTask dispatcherTask = this.dispatcherTasks.get(schedule); if (dispatcherTask == null) { - DefaultPollingDispatcher dispatcher = new DefaultPollingDispatcher(this.channel); - dispatcherTask = this.dispatcherTasks.putIfAbsent(schedule, new PollingDispatcherTask(dispatcher, schedule)); + dispatcherTask = this.dispatcherTasks.putIfAbsent(schedule, new PollingDispatcherTask(this.channel, schedule)); } - this.dispatcherTasks.get(schedule).getDispatcher().subscribe(target); + this.dispatcherTasks.get(schedule).subscribe(target); if (dispatcherTask == null && this.isRunning()) { this.scheduleDispatcherTask(schedule); } @@ -135,7 +133,7 @@ public class SubscriptionManager { boolean removed = false; Collection dispatcherTaskValues = this.dispatcherTasks.values(); for (PollingDispatcherTask dispatcherTask : dispatcherTaskValues) { - removed = (removed || dispatcherTask.getDispatcher().unsubscribe(target)); + removed = (removed || dispatcherTask.unsubscribe(target)); } return removed; } diff --git a/org.springframework.integration/src/main/java/org/springframework/integration/config/AbstractTargetEndpointParser.java b/org.springframework.integration/src/main/java/org/springframework/integration/config/AbstractTargetEndpointParser.java index 1b9d8d83a0..98ed241a03 100644 --- a/org.springframework.integration/src/main/java/org/springframework/integration/config/AbstractTargetEndpointParser.java +++ b/org.springframework.integration/src/main/java/org/springframework/integration/config/AbstractTargetEndpointParser.java @@ -24,6 +24,7 @@ import org.springframework.beans.factory.config.BeanDefinition; import org.springframework.beans.factory.config.RuntimeBeanReference; import org.springframework.beans.factory.parsing.BeanComponentDefinition; import org.springframework.beans.factory.support.BeanDefinitionBuilder; +import org.springframework.beans.factory.support.ManagedList; import org.springframework.beans.factory.support.RootBeanDefinition; import org.springframework.beans.factory.xml.AbstractSingleBeanDefinitionParser; import org.springframework.beans.factory.xml.ParserContext; @@ -61,6 +62,8 @@ public abstract class AbstractTargetEndpointParser extends AbstractSingleBeanDef private static final String CONCURRENCY_POLICY_PROPERTY = "concurrencyPolicy"; + private static final String ADVICE_CHAIN_ELEMENT = "advice-chain"; + @Override protected boolean shouldGenerateId() { @@ -98,6 +101,10 @@ public abstract class AbstractTargetEndpointParser extends AbstractSingleBeanDef else if (SCHEDULE_ELEMENT.equals(localName)) { schedule = this.parseSchedule((Element) child); } + else if (ADVICE_CHAIN_ELEMENT.equals(localName)) { + ManagedList adviceChain = IntegrationNamespaceUtils.parseEndpointAdviceChain((Element) child); + builder.addPropertyValue("adviceChain", adviceChain); + } } } if (StringUtils.hasText(inputChannel)) { diff --git a/org.springframework.integration/src/main/java/org/springframework/integration/config/IntegrationNamespaceUtils.java b/org.springframework.integration/src/main/java/org/springframework/integration/config/IntegrationNamespaceUtils.java index 2ddd6dc7d7..d24b79c7b1 100644 --- a/org.springframework.integration/src/main/java/org/springframework/integration/config/IntegrationNamespaceUtils.java +++ b/org.springframework.integration/src/main/java/org/springframework/integration/config/IntegrationNamespaceUtils.java @@ -17,8 +17,11 @@ package org.springframework.integration.config; import org.w3c.dom.Element; +import org.w3c.dom.Node; +import org.w3c.dom.NodeList; import org.springframework.beans.factory.config.RuntimeBeanReference; +import org.springframework.beans.factory.support.ManagedList; import org.springframework.beans.factory.support.RootBeanDefinition; import org.springframework.integration.endpoint.ConcurrencyPolicy; import org.springframework.util.StringUtils; @@ -61,6 +64,23 @@ public abstract class IntegrationNamespaceUtils { return policy; } + @SuppressWarnings("unchecked") + public static ManagedList parseEndpointAdviceChain(Element element) { + ManagedList adviceChain = new ManagedList(); + NodeList childNodes = element.getChildNodes(); + for (int i = 0; i < childNodes.getLength(); i++) { + Node child = childNodes.item(i); + if (child.getNodeType() == Node.ELEMENT_NODE) { + String localName = child.getLocalName(); + if ("ref".equals(localName)) { + String ref = ((Element) child).getAttribute("bean"); + adviceChain.add(new RuntimeBeanReference(ref)); + } + } + } + return adviceChain; + } + /** * Populates the property identified by propertyName on the bean definition * to the value of the attribute specified by attributeName, if that diff --git a/org.springframework.integration/src/main/java/org/springframework/integration/config/MessageBusParser.java b/org.springframework.integration/src/main/java/org/springframework/integration/config/MessageBusParser.java index 6ecfe6f874..94ea283907 100644 --- a/org.springframework.integration/src/main/java/org/springframework/integration/config/MessageBusParser.java +++ b/org.springframework.integration/src/main/java/org/springframework/integration/config/MessageBusParser.java @@ -16,13 +16,17 @@ package org.springframework.integration.config; +import org.w3c.dom.Element; +import org.w3c.dom.Node; +import org.w3c.dom.NodeList; + import org.springframework.beans.factory.BeanDefinitionStoreException; import org.springframework.beans.factory.config.BeanDefinition; import org.springframework.beans.factory.config.RuntimeBeanReference; import org.springframework.beans.factory.support.AbstractBeanDefinition; import org.springframework.beans.factory.support.BeanDefinitionBuilder; +import org.springframework.beans.factory.support.BeanDefinitionReaderUtils; import org.springframework.beans.factory.support.ManagedList; -import org.springframework.beans.factory.support.RootBeanDefinition; import org.springframework.beans.factory.xml.AbstractSimpleBeanDefinitionParser; import org.springframework.beans.factory.xml.ParserContext; import org.springframework.core.Conventions; @@ -30,9 +34,6 @@ import org.springframework.integration.ConfigurationException; import org.springframework.integration.bus.MessageBus; import org.springframework.integration.bus.MessageBusAwareBeanPostProcessor; import org.springframework.util.StringUtils; -import org.w3c.dom.Element; -import org.w3c.dom.Node; -import org.w3c.dom.NodeList; /** * Parser for the message-bus element of the integration namespace. @@ -100,6 +101,7 @@ public class MessageBusParser extends AbstractSimpleBeanDefinitionParser { this.processChildElements(beanDefinition, element); } + @SuppressWarnings("unchecked") private void processChildElements(BeanDefinitionBuilder beanDefinition, Element element) { NodeList childNodes = element.getChildNodes(); ManagedList interceptors = new ManagedList(); @@ -131,11 +133,21 @@ public class MessageBusParser extends AbstractSimpleBeanDefinitionParser { * Adds extra post-processors to the context, to inject the objects configured by the MessageBus */ private void addPostProcessors(ParserContext parserContext) { - BeanDefinition postProcessorDefinition = new RootBeanDefinition(MessageBusAwareBeanPostProcessor.class); - postProcessorDefinition.getConstructorArgumentValues().addGenericArgumentValue( - new RuntimeBeanReference(MessageBusParser.MESSAGE_BUS_BEAN_NAME)); - parserContext.getRegistry().registerBeanDefinition(MESSAGE_BUS_AWARE_POST_PROCESSOR_BEAN_NAME, - postProcessorDefinition); + this.registerMessageBusAwarePostProcessor(parserContext); + this.registerMessageEndpointPostProcessor(parserContext); + } + + private void registerMessageBusAwarePostProcessor(ParserContext parserContext) { + BeanDefinitionBuilder builder = BeanDefinitionBuilder.genericBeanDefinition(MessageBusAwareBeanPostProcessor.class); + builder.addConstructorArgReference(MessageBusParser.MESSAGE_BUS_BEAN_NAME); + builder.setRole(BeanDefinition.ROLE_INFRASTRUCTURE); + parserContext.getRegistry().registerBeanDefinition(MESSAGE_BUS_AWARE_POST_PROCESSOR_BEAN_NAME, builder.getBeanDefinition()); + } + + private void registerMessageEndpointPostProcessor(ParserContext parserContext) { + BeanDefinitionBuilder builder = BeanDefinitionBuilder.genericBeanDefinition(MessageEndpointBeanPostProcessor.class); + builder.setRole(BeanDefinition.ROLE_INFRASTRUCTURE); + BeanDefinitionReaderUtils.registerWithGeneratedName(builder.getBeanDefinition(), parserContext.getRegistry()); } } diff --git a/org.springframework.integration/src/main/java/org/springframework/integration/config/MessageEndpointBeanPostProcessor.java b/org.springframework.integration/src/main/java/org/springframework/integration/config/MessageEndpointBeanPostProcessor.java new file mode 100644 index 0000000000..bb475f1bec --- /dev/null +++ b/org.springframework.integration/src/main/java/org/springframework/integration/config/MessageEndpointBeanPostProcessor.java @@ -0,0 +1,75 @@ +/* + * Copyright 2002-2008 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.integration.config; + +import java.lang.reflect.Method; +import java.util.List; + +import org.aopalliance.aop.Advice; + +import org.springframework.aop.framework.ProxyFactory; +import org.springframework.aop.support.StaticMethodMatcherPointcutAdvisor; +import org.springframework.beans.BeansException; +import org.springframework.beans.factory.config.BeanPostProcessor; +import org.springframework.integration.endpoint.AbstractEndpoint; +import org.springframework.integration.message.Message; + +/** + * A post-processor that applies an advice-chain by creating a proxy for an endpoint. + * + * @author Mark Fisher + */ +public class MessageEndpointBeanPostProcessor implements BeanPostProcessor { + + public Object postProcessBeforeInitialization(Object bean, String beanName) throws BeansException { + return bean; + } + + public Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException { + if (bean instanceof AbstractEndpoint) { + AbstractEndpoint endpoint = (AbstractEndpoint) bean; + List adviceChain = endpoint.getAdviceChain(); + if (adviceChain.size() > 0) { + ProxyFactory proxyFactory = new ProxyFactory(endpoint); + for (Advice advice : adviceChain) { + proxyFactory.addAdvisor(new EndpointInvokeMethodAdvisor(advice)); + } + bean = proxyFactory.getProxy(); + } + } + return bean; + } + + + @SuppressWarnings("serial") + private static class EndpointInvokeMethodAdvisor extends StaticMethodMatcherPointcutAdvisor { + + EndpointInvokeMethodAdvisor(Advice advice) { + super(advice); + } + + + @SuppressWarnings("unchecked") + public boolean matches(Method method, Class clazz) { + return method.getName().equals("invoke") + && method.getParameterTypes().length == 1 + && method.getParameterTypes()[0].equals(Message.class); + } + + } + +} diff --git a/org.springframework.integration/src/main/java/org/springframework/integration/config/SourceEndpointParser.java b/org.springframework.integration/src/main/java/org/springframework/integration/config/SourceEndpointParser.java index 3854351a28..e9cce14b44 100644 --- a/org.springframework.integration/src/main/java/org/springframework/integration/config/SourceEndpointParser.java +++ b/org.springframework.integration/src/main/java/org/springframework/integration/config/SourceEndpointParser.java @@ -19,6 +19,7 @@ package org.springframework.integration.config; import org.w3c.dom.Element; import org.springframework.beans.factory.support.BeanDefinitionBuilder; +import org.springframework.beans.factory.support.ManagedList; import org.springframework.beans.factory.xml.AbstractSimpleBeanDefinitionParser; import org.springframework.beans.factory.xml.ParserContext; import org.springframework.integration.ConfigurationException; @@ -67,7 +68,12 @@ public class SourceEndpointParser extends AbstractSimpleBeanDefinitionParser { if (scheduleElement == null) { throw new ConfigurationException("The sub-element is required for a ."); } - builder.addConstructorArgValue(this.parseSchedule(scheduleElement)); + builder.addPropertyValue("schedule", this.parseSchedule(scheduleElement)); + Element adviceChainElement = DomUtils.getChildElementByTagName(element, "advice-chain"); + if (adviceChainElement != null) { + ManagedList adviceChain = IntegrationNamespaceUtils.parseEndpointAdviceChain(adviceChainElement); + builder.addPropertyValue("adviceChain", adviceChain); + } } /** diff --git a/org.springframework.integration/src/main/java/org/springframework/integration/config/annotation/SourceAnnotationPostProcessor.java b/org.springframework.integration/src/main/java/org/springframework/integration/config/annotation/SourceAnnotationPostProcessor.java index 4227731e27..fcf885d14a 100644 --- a/org.springframework.integration/src/main/java/org/springframework/integration/config/annotation/SourceAnnotationPostProcessor.java +++ b/org.springframework.integration/src/main/java/org/springframework/integration/config/annotation/SourceAnnotationPostProcessor.java @@ -76,7 +76,9 @@ public class SourceAnnotationPostProcessor extends AbstractAnnotationMethodPostP outputChannel = new DirectChannel(); this.getMessageBus().registerChannel(beanName + ".output", outputChannel); } - return new SourceEndpoint((Source) bean, outputChannel, schedule); + SourceEndpoint endpoint = new SourceEndpoint((Source) bean, outputChannel); + endpoint.setSchedule(schedule); + return endpoint; } } diff --git a/org.springframework.integration/src/main/java/org/springframework/integration/config/spring-integration-core-1.0.xsd b/org.springframework.integration/src/main/java/org/springframework/integration/config/spring-integration-core-1.0.xsd index 6719c99613..eba8db4e30 100644 --- a/org.springframework.integration/src/main/java/org/springframework/integration/config/spring-integration-core-1.0.xsd +++ b/org.springframework.integration/src/main/java/org/springframework/integration/config/spring-integration-core-1.0.xsd @@ -166,6 +166,7 @@ + @@ -415,6 +416,7 @@ + @@ -437,4 +439,19 @@ + + + + Defines a list of Advice. + + + + + + + + + + + \ No newline at end of file diff --git a/org.springframework.integration/src/main/java/org/springframework/integration/dispatcher/DefaultPollingDispatcher.java b/org.springframework.integration/src/main/java/org/springframework/integration/dispatcher/DefaultPollingDispatcher.java deleted file mode 100644 index 60cc4e639e..0000000000 --- a/org.springframework.integration/src/main/java/org/springframework/integration/dispatcher/DefaultPollingDispatcher.java +++ /dev/null @@ -1,97 +0,0 @@ -/* - * Copyright 2002-2008 the original author or authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.springframework.integration.dispatcher; - -import java.util.LinkedList; -import java.util.List; - -import org.springframework.integration.channel.DispatcherPolicy; -import org.springframework.integration.channel.MessageChannel; -import org.springframework.integration.message.BlockingSource; -import org.springframework.integration.message.Message; -import org.springframework.integration.message.MessageDeliveryAware; -import org.springframework.integration.message.MessageDeliveryException; -import org.springframework.integration.message.Source; -import org.springframework.util.Assert; - -/** - * A subclass of {@link SimpleDispatcher} that adds message retrieval - * capabilities by polling a {@link PollableSource} for {@link Message Messages}. - * The number of messages retrieved per poll is limited by the 'maxMessagesPerTask' - * property of the provided {@link DispatcherPolicy}, and the timeout for each - * receive call is determined by the policy's 'receiveTimeout' - * property. In general, it is recommended to use a value of 1 (the default) for - * 'maxMessagesPerTask' whenever a significant timeout is provided. Otherwise - * the poller may be holding on to available messages while waiting for - * additional messages. Note that the 'timeout' value is only relevant if the - * specified source is an implementation of {@link BlockingSource}. The default - * timeout value is 0 indicating that the method should return immediately - * rather than waiting for a {@link Message} to become available. - * - * @author Mark Fisher - */ -public class DefaultPollingDispatcher extends SimpleDispatcher implements PollingDispatcher { - - private final Source source; - - - public DefaultPollingDispatcher(MessageChannel channel) { - this(channel, channel.getDispatcherPolicy()); - } - - public DefaultPollingDispatcher(Source source, DispatcherPolicy dispatcherPolicy) { - super(dispatcherPolicy); - Assert.notNull(source, "source must not be null"); - this.source = source; - this.dispatcherPolicy.setReceiveTimeout(0); - } - - @Override - public boolean dispatch(Message message) { - boolean sent = super.dispatch(message); - if (this.source instanceof MessageDeliveryAware) { - if (sent) { - ((MessageDeliveryAware) this.source).onSend(message); - } - else { - ((MessageDeliveryAware) this.source).onFailure(new MessageDeliveryException(message, "failed to send message")); - } - } - return sent; - } - - public List> poll() { - List> messages = new LinkedList>(); - int limit = this.dispatcherPolicy.getMaxMessagesPerTask(); - while (messages.size() < limit) { - Message message = null; - long timeout = this.dispatcherPolicy.getReceiveTimeout(); - if (this.source instanceof BlockingSource && timeout >= 0) { - message = ((BlockingSource) this.source).receive(timeout); - } - else { - message = this.source.receive(); - } - if (message == null) { - return messages; - } - messages.add(message); - } - return messages; - } - -} diff --git a/org.springframework.integration/src/main/java/org/springframework/integration/dispatcher/PollingDispatcherTask.java b/org.springframework.integration/src/main/java/org/springframework/integration/dispatcher/PollingDispatcherTask.java index 798c5cc140..fe9adccb80 100644 --- a/org.springframework.integration/src/main/java/org/springframework/integration/dispatcher/PollingDispatcherTask.java +++ b/org.springframework.integration/src/main/java/org/springframework/integration/dispatcher/PollingDispatcherTask.java @@ -16,9 +16,10 @@ package org.springframework.integration.dispatcher; -import java.util.List; - +import org.springframework.integration.channel.MessageChannel; import org.springframework.integration.message.Message; +import org.springframework.integration.message.Subscribable; +import org.springframework.integration.message.Target; import org.springframework.integration.scheduling.MessagingTask; import org.springframework.integration.scheduling.Schedule; import org.springframework.util.Assert; @@ -28,22 +29,29 @@ import org.springframework.util.Assert; * * @author Mark Fisher */ -public class PollingDispatcherTask implements MessagingTask { +public class PollingDispatcherTask implements MessagingTask, Subscribable { - private final PollingDispatcher dispatcher; + private final MessageChannel channel; private final Schedule schedule; + private final SimpleDispatcher dispatcher; - public PollingDispatcherTask(PollingDispatcher dispatcher, Schedule schedule) { - Assert.notNull(dispatcher, "dispatcher must not be null"); - this.dispatcher = dispatcher; + + public PollingDispatcherTask(MessageChannel channel, Schedule schedule) { + Assert.notNull(channel, "channel must not be null"); + this.channel = channel; this.schedule = schedule; + this.dispatcher = new SimpleDispatcher(this.channel.getDispatcherPolicy()); } - public PollingDispatcher getDispatcher() { - return this.dispatcher; + public boolean subscribe(Target target) { + return this.dispatcher.subscribe(target); + } + + public boolean unsubscribe(Target target) { + return this.dispatcher.unsubscribe(target); } public Schedule getSchedule() { @@ -51,9 +59,16 @@ public class PollingDispatcherTask implements MessagingTask { } public void run() { - List> messages = this.dispatcher.poll(); - for (Message message : messages) { + long timeout = this.channel.getDispatcherPolicy().getReceiveTimeout(); + int limit = this.channel.getDispatcherPolicy().getMaxMessagesPerTask(); + int count = 0; + while (count < limit) { + Message message = (timeout < 0) ? this.channel.receive() : this.channel.receive(timeout); + if (message == null) { + return; + } this.dispatcher.dispatch(message); + count++; } } diff --git a/org.springframework.integration/src/main/java/org/springframework/integration/endpoint/AbstractEndpoint.java b/org.springframework.integration/src/main/java/org/springframework/integration/endpoint/AbstractEndpoint.java index a34d7a9aaf..d3406a997e 100644 --- a/org.springframework.integration/src/main/java/org/springframework/integration/endpoint/AbstractEndpoint.java +++ b/org.springframework.integration/src/main/java/org/springframework/integration/endpoint/AbstractEndpoint.java @@ -16,20 +16,32 @@ package org.springframework.integration.endpoint; +import java.util.ArrayList; +import java.util.List; + +import org.aopalliance.aop.Advice; + import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import org.springframework.beans.factory.BeanNameAware; +import org.springframework.integration.ConfigurationException; +import org.springframework.integration.message.Message; +import org.springframework.integration.message.MessageHandlingException; + /** * Base class for {@link MessageEndpoint} implementations. * * @author Mark Fisher */ -public abstract class AbstractEndpoint implements MessageEndpoint { +public abstract class AbstractEndpoint implements MessageEndpoint, BeanNameAware { protected final Log logger = LogFactory.getLog(this.getClass()); private volatile String name; + private final List adviceChain = new ArrayList(); + public String getName() { return this.name; @@ -47,4 +59,37 @@ public abstract class AbstractEndpoint implements MessageEndpoint { return (this.name != null) ? this.name : super.toString(); } + public void setAdviceChain(List adviceChain) { + for (Object advice : adviceChain) { + if (advice instanceof Advice) { + this.adviceChain.add((Advice) advice); + } + else if (advice instanceof EndpointInterceptor) { + this.adviceChain.add(new EndpointMethodInterceptor((EndpointInterceptor) advice)); + } + else { + throw new ConfigurationException("Each adviceChain element must implement either " + + "'" + Advice.class.getName() + "' or '" + EndpointInterceptor.class.getName() + "'."); + } + } + } + + public List getAdviceChain() { + return this.adviceChain; + } + + public final boolean invoke(Message message) { + if (message == null) { + throw new IllegalArgumentException("Message must not be null."); + } + if (!this.supports(message)) { + throw new MessageHandlingException(message, "unsupported message"); + } + return this.doInvoke(message); + } + + protected abstract boolean supports(Message message); + + protected abstract boolean doInvoke(Message message); + } diff --git a/org.springframework.integration/src/main/java/org/springframework/integration/endpoint/EndpointInterceptor.java b/org.springframework.integration/src/main/java/org/springframework/integration/endpoint/EndpointInterceptor.java new file mode 100644 index 0000000000..3a0283a705 --- /dev/null +++ b/org.springframework.integration/src/main/java/org/springframework/integration/endpoint/EndpointInterceptor.java @@ -0,0 +1,34 @@ +/* + * Copyright 2002-2008 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.integration.endpoint; + +import org.aopalliance.intercept.MethodInvocation; + +import org.springframework.integration.message.Message; + +/** + * @author Mark Fisher + */ +public interface EndpointInterceptor { + + boolean preInvoke(Message message); + + boolean aroundInvoke(MethodInvocation invocation) throws Throwable; + + void postInvoke(Message message, boolean result); + +} diff --git a/org.springframework.integration/src/main/java/org/springframework/integration/endpoint/EndpointInterceptorAdapter.java b/org.springframework.integration/src/main/java/org/springframework/integration/endpoint/EndpointInterceptorAdapter.java new file mode 100644 index 0000000000..db02397dd0 --- /dev/null +++ b/org.springframework.integration/src/main/java/org/springframework/integration/endpoint/EndpointInterceptorAdapter.java @@ -0,0 +1,39 @@ +/* + * Copyright 2002-2008 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.integration.endpoint; + +import org.aopalliance.intercept.MethodInvocation; + +import org.springframework.integration.message.Message; + +/** + * @author Mark Fisher + */ +public class EndpointInterceptorAdapter implements EndpointInterceptor { + + public boolean preInvoke(Message message) { + return true; + } + + public boolean aroundInvoke(MethodInvocation invocation) throws Throwable { + return (Boolean) invocation.proceed(); + } + + public void postInvoke(Message message, boolean result) { + } + +} diff --git a/org.springframework.integration/src/main/java/org/springframework/integration/endpoint/EndpointMethodInterceptor.java b/org.springframework.integration/src/main/java/org/springframework/integration/endpoint/EndpointMethodInterceptor.java new file mode 100644 index 0000000000..bcdba10c74 --- /dev/null +++ b/org.springframework.integration/src/main/java/org/springframework/integration/endpoint/EndpointMethodInterceptor.java @@ -0,0 +1,56 @@ +/* + * Copyright 2002-2008 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.integration.endpoint; + +import org.aopalliance.intercept.MethodInterceptor; +import org.aopalliance.intercept.MethodInvocation; + +import org.springframework.integration.message.Message; +import org.springframework.util.Assert; + +/** + * @author Mark Fisher + */ +public class EndpointMethodInterceptor implements MethodInterceptor { + + private final EndpointInterceptor interceptor; + + + public EndpointMethodInterceptor(EndpointInterceptor interceptor) { + Assert.notNull(interceptor, "EndpointInterceptor must not be null."); + this.interceptor = interceptor; + } + + + public Object invoke(MethodInvocation invocation) throws Throwable { + Message message = null; + try { + message = (Message) invocation.getArguments()[0]; + } + catch (Exception e) { + throw new IllegalStateException("EndpointMethodInterceptor is only applicable for the " + + "'MessageEndpoint.invoke(Message message)' method."); + } + if (!this.interceptor.preInvoke(message)) { + return Boolean.FALSE; + } + boolean returnValue = this.interceptor.aroundInvoke(invocation); + this.interceptor.postInvoke(message, returnValue); + return returnValue; + } + +} diff --git a/org.springframework.integration/src/main/java/org/springframework/integration/endpoint/MessageEndpoint.java b/org.springframework.integration/src/main/java/org/springframework/integration/endpoint/MessageEndpoint.java index c26f7b647f..b7abe24fdf 100644 --- a/org.springframework.integration/src/main/java/org/springframework/integration/endpoint/MessageEndpoint.java +++ b/org.springframework.integration/src/main/java/org/springframework/integration/endpoint/MessageEndpoint.java @@ -16,15 +16,17 @@ package org.springframework.integration.endpoint; -import org.springframework.beans.factory.BeanNameAware; +import org.springframework.integration.message.Message; /** * Base interface for message endpoints. * * @author Mark Fisher */ -public interface MessageEndpoint extends BeanNameAware { +public interface MessageEndpoint { String getName(); + boolean invoke(Message message); + } diff --git a/org.springframework.integration/src/main/java/org/springframework/integration/endpoint/SourceEndpoint.java b/org.springframework.integration/src/main/java/org/springframework/integration/endpoint/SourceEndpoint.java index 2a1f069fbc..c2a83fce9f 100644 --- a/org.springframework.integration/src/main/java/org/springframework/integration/endpoint/SourceEndpoint.java +++ b/org.springframework.integration/src/main/java/org/springframework/integration/endpoint/SourceEndpoint.java @@ -16,21 +16,14 @@ package org.springframework.integration.endpoint; -import java.lang.reflect.Method; -import java.util.List; - -import org.aopalliance.aop.Advice; - -import org.springframework.aop.framework.ProxyFactory; -import org.springframework.aop.support.StaticMethodMatcherPointcutAdvisor; -import org.springframework.beans.factory.InitializingBean; import org.springframework.integration.channel.DispatcherPolicy; import org.springframework.integration.channel.MessageChannel; -import org.springframework.integration.dispatcher.DefaultPollingDispatcher; -import org.springframework.integration.dispatcher.PollingDispatcher; -import org.springframework.integration.dispatcher.PollingDispatcherTask; +import org.springframework.integration.dispatcher.SimpleDispatcher; +import org.springframework.integration.message.Message; +import org.springframework.integration.message.MessageDeliveryAware; +import org.springframework.integration.message.MessageDeliveryException; +import org.springframework.integration.message.PollCommand; import org.springframework.integration.message.Source; -import org.springframework.integration.scheduling.MessagingTask; import org.springframework.integration.scheduling.Schedule; import org.springframework.util.Assert; @@ -40,120 +33,50 @@ import org.springframework.util.Assert; * * @author Mark Fisher */ -public class SourceEndpoint extends AbstractEndpoint implements MessagingTask, InitializingBean { +public class SourceEndpoint extends AbstractEndpoint { - private final Schedule schedule; + private final Source source; - private final DispatcherPolicy dispatcherPolicy = new DispatcherPolicy(); + private final SimpleDispatcher dispatcher = new SimpleDispatcher(new DispatcherPolicy()); - private volatile PollingDispatcher dispatcher; - - private volatile List dispatchAdviceChain; - - private volatile MessagingTask task; - - private volatile List taskAdviceChain; - - private volatile boolean taskInitialized; - - private final Object taskMonitor = new Object(); + private volatile Schedule schedule; - public SourceEndpoint(Source source, MessageChannel channel, Schedule schedule) { + public SourceEndpoint(Source source, MessageChannel channel) { Assert.notNull(source, "source must not be null"); Assert.notNull(channel, "channel must not be null"); - Assert.notNull(schedule, "schedule must not be null"); - this.dispatcher = new DefaultPollingDispatcher(source, this.dispatcherPolicy); + this.source = source; this.dispatcher.subscribe(channel); + } + + + public void setSchedule(Schedule schedule) { this.schedule = schedule; } - - public void setMaxMessagesPerTask(int maxMessagesPerTask) { - this.dispatcherPolicy.setMaxMessagesPerTask(maxMessagesPerTask); - } - - public void setSendTimeout(long sendTimeout) { - this.dispatcher.setSendTimeout(sendTimeout); - } - - public void setTaskAdviceChain(List taskAdviceChain) { - this.taskAdviceChain = taskAdviceChain; - } - - public void setDispatchAdviceChain(List dispatchAdviceChain) { - this.dispatchAdviceChain = dispatchAdviceChain; - } - public Schedule getSchedule() { return this.schedule; } - public void afterPropertiesSet() { - this.initializeTask(); + protected boolean supports(Message message) { + return (message.getPayload() instanceof PollCommand); } - public void initializeTask() { - synchronized (this.taskMonitor) { - if (this.taskInitialized) { - return; - } - this.refreshTask(); - this.taskInitialized = true; + public final boolean doInvoke(Message pollCommandMessage) { + Message message = this.source.receive(); + if (message == null) { + return false; } - } - - public void refreshTask() { - synchronized (this.taskMonitor) { - PollingDispatcher dispatcherProxy = null; - if (this.dispatchAdviceChain != null && this.dispatchAdviceChain.size() > 0) { - ProxyFactory proxyFactory = new ProxyFactory(this.dispatcher); - proxyFactory.setInterfaces(new Class[] { PollingDispatcher.class }); - for (Advice advice : this.dispatchAdviceChain) { - proxyFactory.addAdvisor(new MethodNameAdvisor(advice, "dispatch")); - } - dispatcherProxy = (PollingDispatcher) proxyFactory.getProxy(); + boolean sent = this.dispatcher.dispatch(message); + if (this.source instanceof MessageDeliveryAware) { + if (sent) { + ((MessageDeliveryAware) this.source).onSend(message); } - this.task = new PollingDispatcherTask((dispatcherProxy != null) ? dispatcherProxy : this.dispatcher, this.schedule); - if (this.taskAdviceChain != null && this.taskAdviceChain.size() > 0) { - ProxyFactory proxyFactory = new ProxyFactory(this.task); - proxyFactory.setInterfaces(new Class[] { MessagingTask.class }); - for (Advice advice : this.taskAdviceChain) { - proxyFactory.addAdvisor(new MethodNameAdvisor(advice, "run")); - } - this.task = (MessagingTask) proxyFactory.getProxy(); + else { + ((MessageDeliveryAware) this.source).onFailure(new MessageDeliveryException(message, "failed to send message")); } } - } - - private MessagingTask getTask() { - synchronized (this.taskMonitor) { - if (!this.taskInitialized) { - this.initializeTask(); - } - return this.task; - } - } - - public void run() { - this.getTask().run(); - } - - - @SuppressWarnings("serial") - private static class MethodNameAdvisor extends StaticMethodMatcherPointcutAdvisor { - - private final String methodName; - - MethodNameAdvisor(Advice advice, String methodName) { - super(advice); - this.methodName = methodName; - } - - @SuppressWarnings("unchecked") - public boolean matches(Method method, Class targetClass) { - return method.getName().equals(methodName); - } + return sent; } } diff --git a/org.springframework.integration/src/main/java/org/springframework/integration/endpoint/TargetEndpoint.java b/org.springframework.integration/src/main/java/org/springframework/integration/endpoint/TargetEndpoint.java index 3d97f44276..6d13fe45fe 100644 --- a/org.springframework.integration/src/main/java/org/springframework/integration/endpoint/TargetEndpoint.java +++ b/org.springframework.integration/src/main/java/org/springframework/integration/endpoint/TargetEndpoint.java @@ -169,7 +169,8 @@ public class TargetEndpoint extends AbstractEndpoint implements Target, ChannelR this.running = false; } - public final boolean send(Message message) { + @Override + protected final boolean doInvoke(Message message) { if (logger.isDebugEnabled()) { logger.debug("endpoint '" + this + "' handling message: " + message); } @@ -198,4 +199,13 @@ public class TargetEndpoint extends AbstractEndpoint implements Target, ChannelR } } + @Override + protected final boolean supports(Message message) { + return true; + } + + public boolean send(Message message) { + return this.invoke(message); + } + } diff --git a/org.springframework.integration/src/main/java/org/springframework/integration/message/Poller.java b/org.springframework.integration/src/main/java/org/springframework/integration/message/Command.java similarity index 90% rename from org.springframework.integration/src/main/java/org/springframework/integration/message/Poller.java rename to org.springframework.integration/src/main/java/org/springframework/integration/message/Command.java index 5c7dc44d45..ec9cbc3b3c 100644 --- a/org.springframework.integration/src/main/java/org/springframework/integration/message/Poller.java +++ b/org.springframework.integration/src/main/java/org/springframework/integration/message/Command.java @@ -16,13 +16,11 @@ package org.springframework.integration.message; -import java.util.List; - /** + * A marker interface for commands. + * * @author Mark Fisher */ -public interface Poller { - - List> poll(); +public interface Command { } diff --git a/org.springframework.integration/src/main/java/org/springframework/integration/dispatcher/PollingDispatcher.java b/org.springframework.integration/src/main/java/org/springframework/integration/message/CommandMessage.java similarity index 72% rename from org.springframework.integration/src/main/java/org/springframework/integration/dispatcher/PollingDispatcher.java rename to org.springframework.integration/src/main/java/org/springframework/integration/message/CommandMessage.java index 0a3cbf8abd..a09a0b20ce 100644 --- a/org.springframework.integration/src/main/java/org/springframework/integration/dispatcher/PollingDispatcher.java +++ b/org.springframework.integration/src/main/java/org/springframework/integration/message/CommandMessage.java @@ -14,14 +14,15 @@ * limitations under the License. */ -package org.springframework.integration.dispatcher; - -import org.springframework.integration.message.Poller; -import org.springframework.integration.message.Subscribable; +package org.springframework.integration.message; /** * @author Mark Fisher */ -public interface PollingDispatcher extends Poller, MessageDispatcher, Subscribable { +public class CommandMessage extends GenericMessage { + + public CommandMessage(Command command) { + super(command); + } } diff --git a/org.springframework.integration/src/main/java/org/springframework/integration/message/PollCommand.java b/org.springframework.integration/src/main/java/org/springframework/integration/message/PollCommand.java new file mode 100644 index 0000000000..edd2938a57 --- /dev/null +++ b/org.springframework.integration/src/main/java/org/springframework/integration/message/PollCommand.java @@ -0,0 +1,24 @@ +/* + * Copyright 2002-2008 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.integration.message; + +/** + * @author Mark Fisher + */ +public class PollCommand implements Command { + +} diff --git a/org.springframework.integration/src/test/java/org/springframework/integration/bus/MessageBusTests.java b/org.springframework.integration/src/test/java/org/springframework/integration/bus/MessageBusTests.java index c2da1503a6..1baa252635 100644 --- a/org.springframework.integration/src/test/java/org/springframework/integration/bus/MessageBusTests.java +++ b/org.springframework.integration/src/test/java/org/springframework/integration/bus/MessageBusTests.java @@ -172,7 +172,8 @@ public class MessageBusTests { public void testErrorChannelWithFailedDispatch() throws InterruptedException { MessageBus bus = new MessageBus(); CountDownLatch latch = new CountDownLatch(1); - SourceEndpoint sourceEndpoint = new SourceEndpoint(new FailingSource(latch), new QueueChannel(), new PollingSchedule(1000)); + SourceEndpoint sourceEndpoint = new SourceEndpoint(new FailingSource(latch), new QueueChannel()); + sourceEndpoint.setSchedule(new PollingSchedule(1000)); bus.registerEndpoint("testEndpoint", sourceEndpoint); bus.start(); latch.await(2000, TimeUnit.MILLISECONDS); diff --git a/org.springframework.integration/src/test/java/org/springframework/integration/config/MessageEndpointBeanPostProcessorTests.java b/org.springframework.integration/src/test/java/org/springframework/integration/config/MessageEndpointBeanPostProcessorTests.java new file mode 100644 index 0000000000..b970c65af6 --- /dev/null +++ b/org.springframework.integration/src/test/java/org/springframework/integration/config/MessageEndpointBeanPostProcessorTests.java @@ -0,0 +1,113 @@ +/* + * Copyright 2002-2008 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.integration.config; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + +import org.junit.Test; + +import org.springframework.aop.support.AopUtils; +import org.springframework.context.ApplicationContext; +import org.springframework.context.support.ClassPathXmlApplicationContext; +import org.springframework.integration.endpoint.MessageEndpoint; +import org.springframework.integration.message.CommandMessage; +import org.springframework.integration.message.PollCommand; +import org.springframework.integration.message.StringMessage; + +/** + * @author Mark Fisher + */ +public class MessageEndpointBeanPostProcessorTests { + + @Test + public void testNoProxyCreatedForHandlerEndpointWithEmptyAdviceChain() { + ApplicationContext context = new ClassPathXmlApplicationContext( + "messageEndpointBeanPostProcessorTests.xml", this.getClass()); + MessageEndpoint endpoint = (MessageEndpoint) context.getBean("handlerEndpointWithoutAdvice"); + assertFalse(AopUtils.isAopProxy(endpoint)); + } + + @Test + public void testHandlerEndpointWithAdviceChain() { + ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext( + "messageEndpointBeanPostProcessorTests.xml", this.getClass()); + context.start(); + MessageEndpoint endpoint = (MessageEndpoint) context.getBean("handlerEndpointWithAdvice"); + assertTrue(AopUtils.isAopProxy(endpoint)); + TestBeforeAdvice beforeAdvice = (TestBeforeAdvice) context.getBean("simpleAdvice"); + TestEndpointInterceptor interceptor = (TestEndpointInterceptor) context.getBean("interceptor"); + assertEquals(0, beforeAdvice.getCount()); + assertEquals(0, interceptor.getCount()); + endpoint.invoke(new StringMessage("test")); + assertEquals(1, beforeAdvice.getCount()); + assertEquals(2, interceptor.getCount()); + context.stop(); + } + + @Test + public void testNoProxyCreatedForTargetEndpointWithEmptyAdviceChain() { + ApplicationContext context = new ClassPathXmlApplicationContext( + "messageEndpointBeanPostProcessorTests.xml", this.getClass()); + MessageEndpoint endpoint = (MessageEndpoint) context.getBean("targetEndpointWithoutAdvice"); + assertFalse(AopUtils.isAopProxy(endpoint)); + } + + @Test + public void testTargetEndpointWithAdviceChain() { + ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext( + "messageEndpointBeanPostProcessorTests.xml", this.getClass()); + context.start(); + MessageEndpoint endpoint = (MessageEndpoint) context.getBean("targetEndpointWithAdvice"); + assertTrue(AopUtils.isAopProxy(endpoint)); + TestBeforeAdvice beforeAdvice = (TestBeforeAdvice) context.getBean("simpleAdvice"); + TestEndpointInterceptor interceptor = (TestEndpointInterceptor) context.getBean("interceptor"); + assertEquals(0, beforeAdvice.getCount()); + assertEquals(0, interceptor.getCount()); + endpoint.invoke(new StringMessage("test")); + assertEquals(1, beforeAdvice.getCount()); + assertEquals(2, interceptor.getCount()); + context.stop(); + } + + @Test + public void testNoProxyCreatedForSourceEndpointWithEmptyAdviceChain() { + ApplicationContext context = new ClassPathXmlApplicationContext( + "messageEndpointBeanPostProcessorTests.xml", this.getClass()); + MessageEndpoint endpoint = (MessageEndpoint) context.getBean("sourceEndpointWithoutAdvice"); + assertFalse(AopUtils.isAopProxy(endpoint)); + } + + @Test + public void testSourceEndpointWithAdviceChain() { + ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext( + "messageEndpointBeanPostProcessorTests.xml", this.getClass()); + context.start(); + MessageEndpoint endpoint = (MessageEndpoint) context.getBean("sourceEndpointWithAdvice"); + assertTrue(AopUtils.isAopProxy(endpoint)); + TestBeforeAdvice beforeAdvice = (TestBeforeAdvice) context.getBean("simpleAdvice"); + TestEndpointInterceptor interceptor = (TestEndpointInterceptor) context.getBean("interceptor"); + assertEquals(0, beforeAdvice.getCount()); + assertEquals(0, interceptor.getCount()); + endpoint.invoke(new CommandMessage(new PollCommand())); + assertEquals(1, beforeAdvice.getCount()); + assertEquals(2, interceptor.getCount()); + context.stop(); + } + +} diff --git a/org.springframework.integration/src/test/java/org/springframework/integration/config/TestBeforeAdvice.java b/org.springframework.integration/src/test/java/org/springframework/integration/config/TestBeforeAdvice.java new file mode 100644 index 0000000000..3726b4afbd --- /dev/null +++ b/org.springframework.integration/src/test/java/org/springframework/integration/config/TestBeforeAdvice.java @@ -0,0 +1,40 @@ +/* + * Copyright 2002-2008 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.integration.config; + +import java.lang.reflect.Method; +import java.util.concurrent.atomic.AtomicInteger; + +import org.springframework.aop.MethodBeforeAdvice; + +/** + * @author Mark Fisher + */ +public class TestBeforeAdvice implements MethodBeforeAdvice { + + private AtomicInteger counter = new AtomicInteger(); + + + public int getCount() { + return this.counter.get(); + } + + public void before(Method method, Object[] args, Object target) throws Throwable { + this.counter.incrementAndGet(); + } + +} diff --git a/org.springframework.integration/src/test/java/org/springframework/integration/config/TestEndpointInterceptor.java b/org.springframework.integration/src/test/java/org/springframework/integration/config/TestEndpointInterceptor.java new file mode 100644 index 0000000000..de21161317 --- /dev/null +++ b/org.springframework.integration/src/test/java/org/springframework/integration/config/TestEndpointInterceptor.java @@ -0,0 +1,44 @@ +/* + * Copyright 2002-2008 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.integration.config; + +import java.util.concurrent.atomic.AtomicInteger; + +import org.aopalliance.intercept.MethodInvocation; + +import org.springframework.integration.endpoint.EndpointInterceptorAdapter; + +/** + * @author Mark Fisher + */ +public class TestEndpointInterceptor extends EndpointInterceptorAdapter { + + private AtomicInteger counter = new AtomicInteger(); + + + public int getCount() { + return this.counter.get(); + } + + public boolean aroundInvoke(MethodInvocation invocation) throws Throwable { + this.counter.incrementAndGet(); + Boolean result = (Boolean) invocation.proceed(); + this.counter.incrementAndGet(); + return result; + } + +} diff --git a/org.springframework.integration/src/test/java/org/springframework/integration/config/TestSource.java b/org.springframework.integration/src/test/java/org/springframework/integration/config/TestSource.java new file mode 100644 index 0000000000..3e375a77e4 --- /dev/null +++ b/org.springframework.integration/src/test/java/org/springframework/integration/config/TestSource.java @@ -0,0 +1,32 @@ +/* + * Copyright 2002-2008 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.integration.config; + +import org.springframework.integration.message.Message; +import org.springframework.integration.message.Source; +import org.springframework.integration.message.StringMessage; + +/** + * @author Mark Fisher + */ +public class TestSource implements Source { + + public Message receive() { + return new StringMessage("test"); + } + +} diff --git a/org.springframework.integration/src/test/java/org/springframework/integration/config/TestTarget.java b/org.springframework.integration/src/test/java/org/springframework/integration/config/TestTarget.java new file mode 100644 index 0000000000..b7b24a8590 --- /dev/null +++ b/org.springframework.integration/src/test/java/org/springframework/integration/config/TestTarget.java @@ -0,0 +1,31 @@ +/* + * Copyright 2002-2008 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.integration.config; + +import org.springframework.integration.message.Message; +import org.springframework.integration.message.Target; + +/** + * @author Mark Fisher + */ +public class TestTarget implements Target { + + public boolean send(Message message) { + return true; + } + +} diff --git a/org.springframework.integration/src/test/java/org/springframework/integration/config/messageEndpointBeanPostProcessorTests.xml b/org.springframework.integration/src/test/java/org/springframework/integration/config/messageEndpointBeanPostProcessorTests.xml new file mode 100644 index 0000000000..62a1833d47 --- /dev/null +++ b/org.springframework.integration/src/test/java/org/springframework/integration/config/messageEndpointBeanPostProcessorTests.xml @@ -0,0 +1,76 @@ + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + diff --git a/org.springframework.integration/src/test/java/org/springframework/integration/dispatcher/DefaultPollingDispatcherTests.java b/org.springframework.integration/src/test/java/org/springframework/integration/dispatcher/DefaultPollingDispatcherTests.java deleted file mode 100644 index 496eb542d9..0000000000 --- a/org.springframework.integration/src/test/java/org/springframework/integration/dispatcher/DefaultPollingDispatcherTests.java +++ /dev/null @@ -1,101 +0,0 @@ -/* - * Copyright 2002-2008 the original author or authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.springframework.integration.dispatcher; - -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertTrue; - -import java.util.Collection; -import java.util.Iterator; - -import org.junit.Test; - -import org.springframework.integration.channel.DispatcherPolicy; -import org.springframework.integration.channel.MessageChannel; -import org.springframework.integration.channel.QueueChannel; -import org.springframework.integration.message.Message; -import org.springframework.integration.message.StringMessage; - -/** - * @author Mark Fisher - */ -public class DefaultPollingDispatcherTests { - - @Test - public void testSingleMessagePerRetrieval() { - DispatcherPolicy dispatcherPolicy = new DispatcherPolicy(); - dispatcherPolicy.setReceiveTimeout(0); - MessageChannel channel = new QueueChannel(5, dispatcherPolicy); - DefaultPollingDispatcher dispatcher = new DefaultPollingDispatcher(channel); - Collection> results = dispatcher.poll(); - assertTrue(results.isEmpty()); - channel.send(new StringMessage("test1"), 0); - channel.send(new StringMessage("test2"), 0); - results = dispatcher.poll(); - assertEquals(1, results.size()); - assertEquals("test1", results.iterator().next().getPayload()); - results = dispatcher.poll(); - assertEquals(1, results.size()); - assertEquals("test2", results.iterator().next().getPayload()); - } - - @Test - public void testMultipleMessagesPerRetrieval() { - DispatcherPolicy dispatcherPolicy = new DispatcherPolicy(); - dispatcherPolicy.setReceiveTimeout(0); - dispatcherPolicy.setMaxMessagesPerTask(2); - MessageChannel channel = new QueueChannel(5, dispatcherPolicy); - DefaultPollingDispatcher dispatcher = new DefaultPollingDispatcher(channel); - Collection> results = dispatcher.poll(); - assertTrue(results.isEmpty()); - channel.send(new StringMessage("test1"), 0); - channel.send(new StringMessage("test2"), 0); - channel.send(new StringMessage("test3"), 0); - results = dispatcher.poll(); - assertEquals(2, results.size()); - Iterator> iter = results.iterator(); - assertEquals("test1", iter.next().getPayload()); - assertEquals("test2", iter.next().getPayload()); - results = dispatcher.poll(); - assertEquals(1, results.size()); - assertEquals("test3", results.iterator().next().getPayload()); - } - - @Test - public void testMaxMessagesConfiguredDynamically() { - DispatcherPolicy dispatcherPolicy = new DispatcherPolicy(); - dispatcherPolicy.setReceiveTimeout(0); - dispatcherPolicy.setMaxMessagesPerTask(1); - MessageChannel channel = new QueueChannel(5, dispatcherPolicy); - DefaultPollingDispatcher dispatcher = new DefaultPollingDispatcher(channel); - Collection> results = dispatcher.poll(); - assertTrue(results.isEmpty()); - channel.send(new StringMessage("test1"), 0); - channel.send(new StringMessage("test2"), 0); - channel.send(new StringMessage("test3"), 0); - results = dispatcher.poll(); - assertEquals(1, results.size()); - assertEquals("test1", results.iterator().next().getPayload()); - dispatcherPolicy.setMaxMessagesPerTask(5); - results = dispatcher.poll(); - assertEquals(2, results.size()); - Iterator> iter = results.iterator(); - assertEquals("test2", iter.next().getPayload()); - assertEquals("test3", iter.next().getPayload()); - } - -} diff --git a/org.springframework.integration/src/test/java/org/springframework/integration/endpoint/SourceEndpointTests.java b/org.springframework.integration/src/test/java/org/springframework/integration/endpoint/SourceEndpointTests.java index 52f088c09f..a23a191ed4 100644 --- a/org.springframework.integration/src/test/java/org/springframework/integration/endpoint/SourceEndpointTests.java +++ b/org.springframework.integration/src/test/java/org/springframework/integration/endpoint/SourceEndpointTests.java @@ -18,25 +18,17 @@ package org.springframework.integration.endpoint; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotNull; -import static org.junit.Assert.assertNull; -import java.lang.reflect.Method; -import java.util.ArrayList; -import java.util.List; import java.util.concurrent.atomic.AtomicInteger; -import org.aopalliance.aop.Advice; -import org.aopalliance.intercept.MethodInterceptor; -import org.aopalliance.intercept.MethodInvocation; - import org.junit.Test; -import org.springframework.aop.MethodBeforeAdvice; import org.springframework.integration.channel.QueueChannel; +import org.springframework.integration.message.CommandMessage; import org.springframework.integration.message.GenericMessage; import org.springframework.integration.message.Message; +import org.springframework.integration.message.PollCommand; import org.springframework.integration.message.Source; -import org.springframework.integration.scheduling.PollingSchedule; /** * @author Mark Fisher @@ -47,387 +39,13 @@ public class SourceEndpointTests { public void testPolledSourceSendsToChannel() { TestSource source = new TestSource("testing", 1); QueueChannel channel = new QueueChannel(); - PollingSchedule schedule = new PollingSchedule(100); - SourceEndpoint endpoint = new SourceEndpoint(source, channel, schedule); - endpoint.run(); + SourceEndpoint endpoint = new SourceEndpoint(source, channel); + endpoint.invoke(new CommandMessage(new PollCommand())); Message message = channel.receive(1000); assertNotNull("message should not be null", message); assertEquals("testing.1", message.getPayload()); } - @Test - public void testSendTimeout() { - TestSource source = new TestSource("testing", 1); - QueueChannel channel = new QueueChannel(1); - PollingSchedule schedule = new PollingSchedule(1000); - schedule.setInitialDelay(10000); - SourceEndpoint endpoint = new SourceEndpoint(source, channel, schedule); - endpoint.setSendTimeout(10); - endpoint.run(); - Message message1 = channel.receive(1000); - assertNotNull("message should not be null", message1); - assertEquals("testing.1", message1.getPayload()); - Message message2 = channel.receive(0); - assertNull("second message should be null", message2); - source.resetCounter(); - endpoint.run(); - Message message3 = channel.receive(100); - assertNotNull("third message should not be null", message3); - assertEquals("testing.1", message3.getPayload()); - } - - @Test - public void testMultipleMessagesPerPoll() { - TestSource source = new TestSource("testing", 3); - QueueChannel channel = new QueueChannel(); - PollingSchedule schedule = new PollingSchedule(1000); - schedule.setInitialDelay(10000); - SourceEndpoint endpoint = new SourceEndpoint(source, channel, schedule); - endpoint.setMaxMessagesPerTask(5); - endpoint.run(); - Message message1 = channel.receive(0); - assertNotNull("message should not be null", message1); - assertEquals("testing.1", message1.getPayload()); - Message message2 = channel.receive(0); - assertNotNull("message should not be null", message2); - assertEquals("testing.2", message2.getPayload()); - Message message3 = channel.receive(0); - assertNotNull("message should not be null", message3); - assertEquals("testing.3", message3.getPayload()); - Message message4 = channel.receive(0); - assertNull("message should be null", message4); - } - - @Test - public void testTaskAdviceChain() { - TestSource source = new TestSource("testing", 3); - QueueChannel channel = new QueueChannel(); - PollingSchedule schedule = new PollingSchedule(1000); - schedule.setInitialDelay(10000); - SourceEndpoint endpoint = new SourceEndpoint(source, channel, schedule); - final StringBuffer buffer = new StringBuffer(); - List taskAdviceChain = new ArrayList(); - taskAdviceChain.add(new MethodBeforeAdvice() { - public void before(Method method, Object[] args, Object target) throws Throwable { - buffer.append(1); - } - }); - taskAdviceChain.add(new MethodInterceptor() { - public Object invoke(MethodInvocation invocation) throws Throwable { - buffer.append(2); - Object retval = invocation.proceed(); - buffer.append(4); - return retval; - } - }); - taskAdviceChain.add(new MethodBeforeAdvice() { - public void before(Method method, Object[] args, Object target) throws Throwable { - buffer.append(3); - } - }); - endpoint.setTaskAdviceChain(taskAdviceChain); - endpoint.afterPropertiesSet(); - endpoint.setMaxMessagesPerTask(5); - endpoint.run(); - assertEquals("1234", buffer.toString()); - } - - @Test - public void testDispatchAdviceChain() { - TestSource source = new TestSource("testing", 2); - QueueChannel channel = new QueueChannel(); - PollingSchedule schedule = new PollingSchedule(1000); - schedule.setInitialDelay(10000); - SourceEndpoint endpoint = new SourceEndpoint(source, channel, schedule); - final StringBuffer buffer = new StringBuffer(); - List dispatchAdviceChain = new ArrayList(); - dispatchAdviceChain.add(new MethodBeforeAdvice() { - public void before(Method method, Object[] args, Object target) throws Throwable { - buffer.append("a"); - } - }); - dispatchAdviceChain.add(new MethodInterceptor() { - public Object invoke(MethodInvocation invocation) throws Throwable { - buffer.append("b"); - Object retval = invocation.proceed(); - buffer.append("d"); - return retval; - } - }); - dispatchAdviceChain.add(new MethodBeforeAdvice() { - public void before(Method method, Object[] args, Object target) throws Throwable { - buffer.append("c"); - } - }); - endpoint.setDispatchAdviceChain(dispatchAdviceChain); - endpoint.afterPropertiesSet(); - endpoint.setMaxMessagesPerTask(5); - endpoint.run(); - assertEquals("abcdabcd", buffer.toString()); - } - - @Test - public void testTaskAndDispatchAdviceChains() { - TestSource source = new TestSource("testing", 3); - QueueChannel channel = new QueueChannel(); - PollingSchedule schedule = new PollingSchedule(1000); - schedule.setInitialDelay(10000); - SourceEndpoint endpoint = new SourceEndpoint(source, channel, schedule); - List dispatchAdviceChain = new ArrayList(); - List taskAdviceChain = new ArrayList(); - final StringBuffer buffer = new StringBuffer(); - dispatchAdviceChain.add(new MethodBeforeAdvice() { - public void before(Method method, Object[] args, Object target) throws Throwable { - buffer.append("a"); - } - }); - dispatchAdviceChain.add(new MethodInterceptor() { - public Object invoke(MethodInvocation invocation) throws Throwable { - buffer.append("b"); - Object retval = invocation.proceed(); - buffer.append("c"); - return retval; - } - }); - taskAdviceChain.add(new MethodInterceptor() { - public Object invoke(MethodInvocation invocation) throws Throwable { - buffer.append(1); - Object retval = invocation.proceed(); - buffer.append(3); - return retval; - } - }); - taskAdviceChain.add(new MethodBeforeAdvice() { - public void before(Method method, Object[] args, Object target) throws Throwable { - buffer.append(2); - } - }); - endpoint.setTaskAdviceChain(taskAdviceChain); - endpoint.setDispatchAdviceChain(dispatchAdviceChain); - endpoint.afterPropertiesSet(); - endpoint.setMaxMessagesPerTask(5); - endpoint.run(); - assertEquals("12abcabcabc3", buffer.toString()); - } - - @Test - public void testRefreshTaskAtRuntime() { - TestSource source = new TestSource("testing", 3); - QueueChannel channel = new QueueChannel(); - PollingSchedule schedule = new PollingSchedule(1000); - schedule.setInitialDelay(10000); - SourceEndpoint endpoint = new SourceEndpoint(source, channel, schedule); - List dispatchAdviceChain = new ArrayList(); - List taskAdviceChain = new ArrayList(); - final StringBuffer buffer = new StringBuffer(); - dispatchAdviceChain.add(new MethodBeforeAdvice() { - public void before(Method method, Object[] args, Object target) throws Throwable { - buffer.append("a"); - } - }); - dispatchAdviceChain.add(new MethodInterceptor() { - public Object invoke(MethodInvocation invocation) throws Throwable { - buffer.append("b"); - Object retval = invocation.proceed(); - buffer.append("c"); - return retval; - } - }); - taskAdviceChain.add(new MethodInterceptor() { - public Object invoke(MethodInvocation invocation) throws Throwable { - buffer.append(1); - Object retval = invocation.proceed(); - buffer.append(3); - return retval; - } - }); - taskAdviceChain.add(new MethodBeforeAdvice() { - public void before(Method method, Object[] args, Object target) throws Throwable { - buffer.append(2); - } - }); - endpoint.setDispatchAdviceChain(dispatchAdviceChain); - endpoint.afterPropertiesSet(); - endpoint.setMaxMessagesPerTask(5); - endpoint.run(); - assertEquals("abcabcabc", buffer.toString()); - buffer.delete(0, buffer.length()); - source.resetCounter(); - endpoint.setTaskAdviceChain(taskAdviceChain); - endpoint.refreshTask(); - endpoint.run(); - assertEquals("12abcabcabc3", buffer.toString()); - buffer.delete(0, buffer.length()); - source.resetCounter(); - endpoint.setDispatchAdviceChain(null); - endpoint.refreshTask(); - endpoint.run(); - assertEquals("123", buffer.toString()); - buffer.delete(0, buffer.length()); - source.resetCounter(); - endpoint.setTaskAdviceChain(null); - endpoint.setDispatchAdviceChain(dispatchAdviceChain); - endpoint.refreshTask(); - endpoint.run(); - assertEquals("abcabcabc", buffer.toString()); - } - - @Test - public void testInitializeTaskDoesNotRefreshWithDispatchAdviceOnly() { - TestSource source = new TestSource("testing", 3); - QueueChannel channel = new QueueChannel(); - PollingSchedule schedule = new PollingSchedule(1000); - schedule.setInitialDelay(10000); - SourceEndpoint endpoint = new SourceEndpoint(source, channel, schedule); - List dispatchAdviceChain = new ArrayList(); - List taskAdviceChain = new ArrayList(); - final StringBuffer buffer = new StringBuffer(); - dispatchAdviceChain.add(new MethodBeforeAdvice() { - public void before(Method method, Object[] args, Object target) throws Throwable { - buffer.append("a"); - } - }); - taskAdviceChain.add(new MethodBeforeAdvice() { - public void before(Method method, Object[] args, Object target) throws Throwable { - buffer.append(1); - } - }); - endpoint.setDispatchAdviceChain(dispatchAdviceChain); - endpoint.afterPropertiesSet(); - endpoint.setMaxMessagesPerTask(5); - endpoint.run(); - assertEquals("aaa", buffer.toString()); - buffer.delete(0, buffer.length()); - source.resetCounter(); - endpoint.setTaskAdviceChain(taskAdviceChain); - endpoint.initializeTask(); - endpoint.run(); - assertEquals("aaa", buffer.toString()); - buffer.delete(0, buffer.length()); - source.resetCounter(); - endpoint.setDispatchAdviceChain(null); - endpoint.initializeTask(); - endpoint.run(); - assertEquals("aaa", buffer.toString()); - } - - @Test - public void testInitializeTaskDoesNotRefreshWithTaskAdviceOnly() { - TestSource source = new TestSource("testing", 3); - QueueChannel channel = new QueueChannel(); - PollingSchedule schedule = new PollingSchedule(1000); - schedule.setInitialDelay(10000); - SourceEndpoint endpoint = new SourceEndpoint(source, channel, schedule); - List dispatchAdviceChain = new ArrayList(); - List taskAdviceChain = new ArrayList(); - final StringBuffer buffer = new StringBuffer(); - dispatchAdviceChain.add(new MethodBeforeAdvice() { - public void before(Method method, Object[] args, Object target) throws Throwable { - buffer.append("a"); - } - }); - taskAdviceChain.add(new MethodBeforeAdvice() { - public void before(Method method, Object[] args, Object target) throws Throwable { - buffer.append(1); - } - }); - endpoint.setTaskAdviceChain(taskAdviceChain); - endpoint.afterPropertiesSet(); - endpoint.setMaxMessagesPerTask(5); - endpoint.run(); - assertEquals("1", buffer.toString()); - buffer.delete(0, buffer.length()); - source.resetCounter(); - endpoint.setDispatchAdviceChain(dispatchAdviceChain); - endpoint.initializeTask(); - endpoint.run(); - assertEquals("1", buffer.toString()); - buffer.delete(0, buffer.length()); - source.resetCounter(); - endpoint.setTaskAdviceChain(null); - endpoint.initializeTask(); - endpoint.run(); - assertEquals("1", buffer.toString()); - } - - @Test - public void testInitializeTaskDoesNotRefreshWithTaskAndDispatchAdvice() { - TestSource source = new TestSource("testing", 3); - QueueChannel channel = new QueueChannel(); - PollingSchedule schedule = new PollingSchedule(1000); - schedule.setInitialDelay(10000); - SourceEndpoint endpoint = new SourceEndpoint(source, channel, schedule); - List dispatchAdviceChain = new ArrayList(); - List taskAdviceChain = new ArrayList(); - final StringBuffer buffer = new StringBuffer(); - dispatchAdviceChain.add(new MethodBeforeAdvice() { - public void before(Method method, Object[] args, Object target) throws Throwable { - buffer.append("a"); - } - }); - taskAdviceChain.add(new MethodBeforeAdvice() { - public void before(Method method, Object[] args, Object target) throws Throwable { - buffer.append(1); - } - }); - endpoint.setTaskAdviceChain(taskAdviceChain); - endpoint.setDispatchAdviceChain(dispatchAdviceChain); - endpoint.afterPropertiesSet(); - endpoint.setMaxMessagesPerTask(5); - endpoint.run(); - assertEquals("1aaa", buffer.toString()); - buffer.delete(0, buffer.length()); - source.resetCounter(); - endpoint.setDispatchAdviceChain(null); - endpoint.initializeTask(); - endpoint.run(); - assertEquals("1aaa", buffer.toString()); - buffer.delete(0, buffer.length()); - source.resetCounter(); - endpoint.setTaskAdviceChain(null); - endpoint.initializeTask(); - endpoint.run(); - assertEquals("1aaa", buffer.toString()); - } - - @Test - public void testInitializeTaskDoesNotRefreshWithNoAdvice() { - TestSource source = new TestSource("testing", 3); - QueueChannel channel = new QueueChannel(); - PollingSchedule schedule = new PollingSchedule(1000); - schedule.setInitialDelay(10000); - SourceEndpoint endpoint = new SourceEndpoint(source, channel, schedule); - List dispatchAdviceChain = new ArrayList(); - List taskAdviceChain = new ArrayList(); - final StringBuffer buffer = new StringBuffer(); - dispatchAdviceChain.add(new MethodBeforeAdvice() { - public void before(Method method, Object[] args, Object target) throws Throwable { - buffer.append("a"); - } - }); - taskAdviceChain.add(new MethodBeforeAdvice() { - public void before(Method method, Object[] args, Object target) throws Throwable { - buffer.append(1); - } - }); - endpoint.afterPropertiesSet(); - endpoint.setMaxMessagesPerTask(5); - endpoint.run(); - assertEquals("", buffer.toString()); - buffer.delete(0, buffer.length()); - source.resetCounter(); - endpoint.setDispatchAdviceChain(dispatchAdviceChain); - endpoint.initializeTask(); - endpoint.run(); - assertEquals("", buffer.toString()); - buffer.delete(0, buffer.length()); - source.resetCounter(); - endpoint.setTaskAdviceChain(taskAdviceChain); - endpoint.initializeTask(); - endpoint.run(); - assertEquals("", buffer.toString()); - } - private static class TestSource implements Source { diff --git a/org.springframework.integration/src/test/java/org/springframework/integration/handler/adapterTests.xml b/org.springframework.integration/src/test/java/org/springframework/integration/handler/adapterTests.xml index 26d98c8001..21b62427fd 100644 --- a/org.springframework.integration/src/test/java/org/springframework/integration/handler/adapterTests.xml +++ b/org.springframework.integration/src/test/java/org/springframework/integration/handler/adapterTests.xml @@ -20,11 +20,11 @@ - + - +