Added support for interceptors on MessageEndpoints.

This commit is contained in:
Mark Fisher
2008-06-30 22:26:04 +00:00
parent f9ad394850
commit 07b2179baa
35 changed files with 836 additions and 908 deletions

View File

@@ -172,7 +172,8 @@ public class MessageBusTests {
public void testErrorChannelWithFailedDispatch() throws InterruptedException {
MessageBus bus = new MessageBus();
CountDownLatch latch = new CountDownLatch(1);
SourceEndpoint sourceEndpoint = new SourceEndpoint(new FailingSource(latch), new QueueChannel(), new PollingSchedule(1000));
SourceEndpoint sourceEndpoint = new SourceEndpoint(new FailingSource(latch), new QueueChannel());
sourceEndpoint.setSchedule(new PollingSchedule(1000));
bus.registerEndpoint("testEndpoint", sourceEndpoint);
bus.start();
latch.await(2000, TimeUnit.MILLISECONDS);

View File

@@ -0,0 +1,113 @@
/*
* Copyright 2002-2008 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.integration.config;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import org.junit.Test;
import org.springframework.aop.support.AopUtils;
import org.springframework.context.ApplicationContext;
import org.springframework.context.support.ClassPathXmlApplicationContext;
import org.springframework.integration.endpoint.MessageEndpoint;
import org.springframework.integration.message.CommandMessage;
import org.springframework.integration.message.PollCommand;
import org.springframework.integration.message.StringMessage;
/**
* @author Mark Fisher
*/
public class MessageEndpointBeanPostProcessorTests {
@Test
public void testNoProxyCreatedForHandlerEndpointWithEmptyAdviceChain() {
ApplicationContext context = new ClassPathXmlApplicationContext(
"messageEndpointBeanPostProcessorTests.xml", this.getClass());
MessageEndpoint endpoint = (MessageEndpoint) context.getBean("handlerEndpointWithoutAdvice");
assertFalse(AopUtils.isAopProxy(endpoint));
}
@Test
public void testHandlerEndpointWithAdviceChain() {
ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext(
"messageEndpointBeanPostProcessorTests.xml", this.getClass());
context.start();
MessageEndpoint endpoint = (MessageEndpoint) context.getBean("handlerEndpointWithAdvice");
assertTrue(AopUtils.isAopProxy(endpoint));
TestBeforeAdvice beforeAdvice = (TestBeforeAdvice) context.getBean("simpleAdvice");
TestEndpointInterceptor interceptor = (TestEndpointInterceptor) context.getBean("interceptor");
assertEquals(0, beforeAdvice.getCount());
assertEquals(0, interceptor.getCount());
endpoint.invoke(new StringMessage("test"));
assertEquals(1, beforeAdvice.getCount());
assertEquals(2, interceptor.getCount());
context.stop();
}
@Test
public void testNoProxyCreatedForTargetEndpointWithEmptyAdviceChain() {
ApplicationContext context = new ClassPathXmlApplicationContext(
"messageEndpointBeanPostProcessorTests.xml", this.getClass());
MessageEndpoint endpoint = (MessageEndpoint) context.getBean("targetEndpointWithoutAdvice");
assertFalse(AopUtils.isAopProxy(endpoint));
}
@Test
public void testTargetEndpointWithAdviceChain() {
ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext(
"messageEndpointBeanPostProcessorTests.xml", this.getClass());
context.start();
MessageEndpoint endpoint = (MessageEndpoint) context.getBean("targetEndpointWithAdvice");
assertTrue(AopUtils.isAopProxy(endpoint));
TestBeforeAdvice beforeAdvice = (TestBeforeAdvice) context.getBean("simpleAdvice");
TestEndpointInterceptor interceptor = (TestEndpointInterceptor) context.getBean("interceptor");
assertEquals(0, beforeAdvice.getCount());
assertEquals(0, interceptor.getCount());
endpoint.invoke(new StringMessage("test"));
assertEquals(1, beforeAdvice.getCount());
assertEquals(2, interceptor.getCount());
context.stop();
}
@Test
public void testNoProxyCreatedForSourceEndpointWithEmptyAdviceChain() {
ApplicationContext context = new ClassPathXmlApplicationContext(
"messageEndpointBeanPostProcessorTests.xml", this.getClass());
MessageEndpoint endpoint = (MessageEndpoint) context.getBean("sourceEndpointWithoutAdvice");
assertFalse(AopUtils.isAopProxy(endpoint));
}
@Test
public void testSourceEndpointWithAdviceChain() {
ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext(
"messageEndpointBeanPostProcessorTests.xml", this.getClass());
context.start();
MessageEndpoint endpoint = (MessageEndpoint) context.getBean("sourceEndpointWithAdvice");
assertTrue(AopUtils.isAopProxy(endpoint));
TestBeforeAdvice beforeAdvice = (TestBeforeAdvice) context.getBean("simpleAdvice");
TestEndpointInterceptor interceptor = (TestEndpointInterceptor) context.getBean("interceptor");
assertEquals(0, beforeAdvice.getCount());
assertEquals(0, interceptor.getCount());
endpoint.invoke(new CommandMessage(new PollCommand()));
assertEquals(1, beforeAdvice.getCount());
assertEquals(2, interceptor.getCount());
context.stop();
}
}

View File

@@ -0,0 +1,40 @@
/*
* Copyright 2002-2008 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.integration.config;
import java.lang.reflect.Method;
import java.util.concurrent.atomic.AtomicInteger;
import org.springframework.aop.MethodBeforeAdvice;
/**
* @author Mark Fisher
*/
public class TestBeforeAdvice implements MethodBeforeAdvice {
private AtomicInteger counter = new AtomicInteger();
public int getCount() {
return this.counter.get();
}
public void before(Method method, Object[] args, Object target) throws Throwable {
this.counter.incrementAndGet();
}
}

View File

@@ -0,0 +1,44 @@
/*
* Copyright 2002-2008 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.integration.config;
import java.util.concurrent.atomic.AtomicInteger;
import org.aopalliance.intercept.MethodInvocation;
import org.springframework.integration.endpoint.EndpointInterceptorAdapter;
/**
* @author Mark Fisher
*/
public class TestEndpointInterceptor extends EndpointInterceptorAdapter {
private AtomicInteger counter = new AtomicInteger();
public int getCount() {
return this.counter.get();
}
public boolean aroundInvoke(MethodInvocation invocation) throws Throwable {
this.counter.incrementAndGet();
Boolean result = (Boolean) invocation.proceed();
this.counter.incrementAndGet();
return result;
}
}

View File

@@ -0,0 +1,32 @@
/*
* Copyright 2002-2008 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.integration.config;
import org.springframework.integration.message.Message;
import org.springframework.integration.message.Source;
import org.springframework.integration.message.StringMessage;
/**
* @author Mark Fisher
*/
public class TestSource implements Source {
public Message<?> receive() {
return new StringMessage("test");
}
}

View File

@@ -0,0 +1,31 @@
/*
* Copyright 2002-2008 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.integration.config;
import org.springframework.integration.message.Message;
import org.springframework.integration.message.Target;
/**
* @author Mark Fisher
*/
public class TestTarget implements Target {
public boolean send(Message<?> message) {
return true;
}
}

View File

@@ -0,0 +1,76 @@
<?xml version="1.0" encoding="UTF-8"?>
<beans:beans xmlns="http://www.springframework.org/schema/integration"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:beans="http://www.springframework.org/schema/beans"
xsi:schemaLocation="http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans-2.5.xsd
http://www.springframework.org/schema/integration
http://www.springframework.org/schema/integration/spring-integration-core-1.0.xsd">
<message-bus/>
<channel id="testChannel"/>
<channel id="replyChannel"/>
<handler-endpoint id="handlerEndpointWithoutAdvice"
input-channel="testChannel"
handler="testHandler"
output-channel="replyChannel">
<schedule period="100"/>
</handler-endpoint>
<handler-endpoint id="handlerEndpointWithAdvice"
input-channel="testChannel"
handler="testHandler"
output-channel="replyChannel">
<schedule period="100"/>
<advice-chain>
<ref bean="simpleAdvice"/>
<ref bean="interceptor"/>
</advice-chain>
</handler-endpoint>
<target-endpoint id="targetEndpointWithoutAdvice"
input-channel="testChannel"
target="testTarget">
<schedule period="100"/>
</target-endpoint>
<target-endpoint id="targetEndpointWithAdvice"
input-channel="testChannel"
target="testTarget">
<schedule period="100"/>
<advice-chain>
<ref bean="simpleAdvice"/>
<ref bean="interceptor"/>
</advice-chain>
</target-endpoint>
<source-endpoint id="sourceEndpointWithoutAdvice"
source="testSource"
channel="replyChannel">
<schedule period="100"/>
</source-endpoint>
<source-endpoint id="sourceEndpointWithAdvice"
source="testSource"
channel="testChannel">
<schedule period="100"/>
<advice-chain>
<ref bean="simpleAdvice"/>
<ref bean="interceptor"/>
</advice-chain>
</source-endpoint>
<beans:bean id="testHandler" class="org.springframework.integration.config.TestHandler"/>
<beans:bean id="testSource" class="org.springframework.integration.config.TestSource"/>
<beans:bean id="testTarget" class="org.springframework.integration.config.TestTarget"/>
<beans:bean id="simpleAdvice" class="org.springframework.integration.config.TestBeforeAdvice"/>
<beans:bean id="interceptor" class="org.springframework.integration.config.TestEndpointInterceptor"/>
</beans:beans>

View File

@@ -1,101 +0,0 @@
/*
* Copyright 2002-2008 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.integration.dispatcher;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import java.util.Collection;
import java.util.Iterator;
import org.junit.Test;
import org.springframework.integration.channel.DispatcherPolicy;
import org.springframework.integration.channel.MessageChannel;
import org.springframework.integration.channel.QueueChannel;
import org.springframework.integration.message.Message;
import org.springframework.integration.message.StringMessage;
/**
* @author Mark Fisher
*/
public class DefaultPollingDispatcherTests {
@Test
public void testSingleMessagePerRetrieval() {
DispatcherPolicy dispatcherPolicy = new DispatcherPolicy();
dispatcherPolicy.setReceiveTimeout(0);
MessageChannel channel = new QueueChannel(5, dispatcherPolicy);
DefaultPollingDispatcher dispatcher = new DefaultPollingDispatcher(channel);
Collection<Message<?>> results = dispatcher.poll();
assertTrue(results.isEmpty());
channel.send(new StringMessage("test1"), 0);
channel.send(new StringMessage("test2"), 0);
results = dispatcher.poll();
assertEquals(1, results.size());
assertEquals("test1", results.iterator().next().getPayload());
results = dispatcher.poll();
assertEquals(1, results.size());
assertEquals("test2", results.iterator().next().getPayload());
}
@Test
public void testMultipleMessagesPerRetrieval() {
DispatcherPolicy dispatcherPolicy = new DispatcherPolicy();
dispatcherPolicy.setReceiveTimeout(0);
dispatcherPolicy.setMaxMessagesPerTask(2);
MessageChannel channel = new QueueChannel(5, dispatcherPolicy);
DefaultPollingDispatcher dispatcher = new DefaultPollingDispatcher(channel);
Collection<Message<?>> results = dispatcher.poll();
assertTrue(results.isEmpty());
channel.send(new StringMessage("test1"), 0);
channel.send(new StringMessage("test2"), 0);
channel.send(new StringMessage("test3"), 0);
results = dispatcher.poll();
assertEquals(2, results.size());
Iterator<Message<?>> iter = results.iterator();
assertEquals("test1", iter.next().getPayload());
assertEquals("test2", iter.next().getPayload());
results = dispatcher.poll();
assertEquals(1, results.size());
assertEquals("test3", results.iterator().next().getPayload());
}
@Test
public void testMaxMessagesConfiguredDynamically() {
DispatcherPolicy dispatcherPolicy = new DispatcherPolicy();
dispatcherPolicy.setReceiveTimeout(0);
dispatcherPolicy.setMaxMessagesPerTask(1);
MessageChannel channel = new QueueChannel(5, dispatcherPolicy);
DefaultPollingDispatcher dispatcher = new DefaultPollingDispatcher(channel);
Collection<Message<?>> results = dispatcher.poll();
assertTrue(results.isEmpty());
channel.send(new StringMessage("test1"), 0);
channel.send(new StringMessage("test2"), 0);
channel.send(new StringMessage("test3"), 0);
results = dispatcher.poll();
assertEquals(1, results.size());
assertEquals("test1", results.iterator().next().getPayload());
dispatcherPolicy.setMaxMessagesPerTask(5);
results = dispatcher.poll();
assertEquals(2, results.size());
Iterator<Message<?>> iter = results.iterator();
assertEquals("test2", iter.next().getPayload());
assertEquals("test3", iter.next().getPayload());
}
}

View File

@@ -18,25 +18,17 @@ package org.springframework.integration.endpoint;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import org.aopalliance.aop.Advice;
import org.aopalliance.intercept.MethodInterceptor;
import org.aopalliance.intercept.MethodInvocation;
import org.junit.Test;
import org.springframework.aop.MethodBeforeAdvice;
import org.springframework.integration.channel.QueueChannel;
import org.springframework.integration.message.CommandMessage;
import org.springframework.integration.message.GenericMessage;
import org.springframework.integration.message.Message;
import org.springframework.integration.message.PollCommand;
import org.springframework.integration.message.Source;
import org.springframework.integration.scheduling.PollingSchedule;
/**
* @author Mark Fisher
@@ -47,387 +39,13 @@ public class SourceEndpointTests {
public void testPolledSourceSendsToChannel() {
TestSource source = new TestSource("testing", 1);
QueueChannel channel = new QueueChannel();
PollingSchedule schedule = new PollingSchedule(100);
SourceEndpoint endpoint = new SourceEndpoint(source, channel, schedule);
endpoint.run();
SourceEndpoint endpoint = new SourceEndpoint(source, channel);
endpoint.invoke(new CommandMessage(new PollCommand()));
Message<?> message = channel.receive(1000);
assertNotNull("message should not be null", message);
assertEquals("testing.1", message.getPayload());
}
@Test
public void testSendTimeout() {
TestSource source = new TestSource("testing", 1);
QueueChannel channel = new QueueChannel(1);
PollingSchedule schedule = new PollingSchedule(1000);
schedule.setInitialDelay(10000);
SourceEndpoint endpoint = new SourceEndpoint(source, channel, schedule);
endpoint.setSendTimeout(10);
endpoint.run();
Message<?> message1 = channel.receive(1000);
assertNotNull("message should not be null", message1);
assertEquals("testing.1", message1.getPayload());
Message<?> message2 = channel.receive(0);
assertNull("second message should be null", message2);
source.resetCounter();
endpoint.run();
Message<?> message3 = channel.receive(100);
assertNotNull("third message should not be null", message3);
assertEquals("testing.1", message3.getPayload());
}
@Test
public void testMultipleMessagesPerPoll() {
TestSource source = new TestSource("testing", 3);
QueueChannel channel = new QueueChannel();
PollingSchedule schedule = new PollingSchedule(1000);
schedule.setInitialDelay(10000);
SourceEndpoint endpoint = new SourceEndpoint(source, channel, schedule);
endpoint.setMaxMessagesPerTask(5);
endpoint.run();
Message<?> message1 = channel.receive(0);
assertNotNull("message should not be null", message1);
assertEquals("testing.1", message1.getPayload());
Message<?> message2 = channel.receive(0);
assertNotNull("message should not be null", message2);
assertEquals("testing.2", message2.getPayload());
Message<?> message3 = channel.receive(0);
assertNotNull("message should not be null", message3);
assertEquals("testing.3", message3.getPayload());
Message<?> message4 = channel.receive(0);
assertNull("message should be null", message4);
}
@Test
public void testTaskAdviceChain() {
TestSource source = new TestSource("testing", 3);
QueueChannel channel = new QueueChannel();
PollingSchedule schedule = new PollingSchedule(1000);
schedule.setInitialDelay(10000);
SourceEndpoint endpoint = new SourceEndpoint(source, channel, schedule);
final StringBuffer buffer = new StringBuffer();
List<Advice> taskAdviceChain = new ArrayList<Advice>();
taskAdviceChain.add(new MethodBeforeAdvice() {
public void before(Method method, Object[] args, Object target) throws Throwable {
buffer.append(1);
}
});
taskAdviceChain.add(new MethodInterceptor() {
public Object invoke(MethodInvocation invocation) throws Throwable {
buffer.append(2);
Object retval = invocation.proceed();
buffer.append(4);
return retval;
}
});
taskAdviceChain.add(new MethodBeforeAdvice() {
public void before(Method method, Object[] args, Object target) throws Throwable {
buffer.append(3);
}
});
endpoint.setTaskAdviceChain(taskAdviceChain);
endpoint.afterPropertiesSet();
endpoint.setMaxMessagesPerTask(5);
endpoint.run();
assertEquals("1234", buffer.toString());
}
@Test
public void testDispatchAdviceChain() {
TestSource source = new TestSource("testing", 2);
QueueChannel channel = new QueueChannel();
PollingSchedule schedule = new PollingSchedule(1000);
schedule.setInitialDelay(10000);
SourceEndpoint endpoint = new SourceEndpoint(source, channel, schedule);
final StringBuffer buffer = new StringBuffer();
List<Advice> dispatchAdviceChain = new ArrayList<Advice>();
dispatchAdviceChain.add(new MethodBeforeAdvice() {
public void before(Method method, Object[] args, Object target) throws Throwable {
buffer.append("a");
}
});
dispatchAdviceChain.add(new MethodInterceptor() {
public Object invoke(MethodInvocation invocation) throws Throwable {
buffer.append("b");
Object retval = invocation.proceed();
buffer.append("d");
return retval;
}
});
dispatchAdviceChain.add(new MethodBeforeAdvice() {
public void before(Method method, Object[] args, Object target) throws Throwable {
buffer.append("c");
}
});
endpoint.setDispatchAdviceChain(dispatchAdviceChain);
endpoint.afterPropertiesSet();
endpoint.setMaxMessagesPerTask(5);
endpoint.run();
assertEquals("abcdabcd", buffer.toString());
}
@Test
public void testTaskAndDispatchAdviceChains() {
TestSource source = new TestSource("testing", 3);
QueueChannel channel = new QueueChannel();
PollingSchedule schedule = new PollingSchedule(1000);
schedule.setInitialDelay(10000);
SourceEndpoint endpoint = new SourceEndpoint(source, channel, schedule);
List<Advice> dispatchAdviceChain = new ArrayList<Advice>();
List<Advice> taskAdviceChain = new ArrayList<Advice>();
final StringBuffer buffer = new StringBuffer();
dispatchAdviceChain.add(new MethodBeforeAdvice() {
public void before(Method method, Object[] args, Object target) throws Throwable {
buffer.append("a");
}
});
dispatchAdviceChain.add(new MethodInterceptor() {
public Object invoke(MethodInvocation invocation) throws Throwable {
buffer.append("b");
Object retval = invocation.proceed();
buffer.append("c");
return retval;
}
});
taskAdviceChain.add(new MethodInterceptor() {
public Object invoke(MethodInvocation invocation) throws Throwable {
buffer.append(1);
Object retval = invocation.proceed();
buffer.append(3);
return retval;
}
});
taskAdviceChain.add(new MethodBeforeAdvice() {
public void before(Method method, Object[] args, Object target) throws Throwable {
buffer.append(2);
}
});
endpoint.setTaskAdviceChain(taskAdviceChain);
endpoint.setDispatchAdviceChain(dispatchAdviceChain);
endpoint.afterPropertiesSet();
endpoint.setMaxMessagesPerTask(5);
endpoint.run();
assertEquals("12abcabcabc3", buffer.toString());
}
@Test
public void testRefreshTaskAtRuntime() {
TestSource source = new TestSource("testing", 3);
QueueChannel channel = new QueueChannel();
PollingSchedule schedule = new PollingSchedule(1000);
schedule.setInitialDelay(10000);
SourceEndpoint endpoint = new SourceEndpoint(source, channel, schedule);
List<Advice> dispatchAdviceChain = new ArrayList<Advice>();
List<Advice> taskAdviceChain = new ArrayList<Advice>();
final StringBuffer buffer = new StringBuffer();
dispatchAdviceChain.add(new MethodBeforeAdvice() {
public void before(Method method, Object[] args, Object target) throws Throwable {
buffer.append("a");
}
});
dispatchAdviceChain.add(new MethodInterceptor() {
public Object invoke(MethodInvocation invocation) throws Throwable {
buffer.append("b");
Object retval = invocation.proceed();
buffer.append("c");
return retval;
}
});
taskAdviceChain.add(new MethodInterceptor() {
public Object invoke(MethodInvocation invocation) throws Throwable {
buffer.append(1);
Object retval = invocation.proceed();
buffer.append(3);
return retval;
}
});
taskAdviceChain.add(new MethodBeforeAdvice() {
public void before(Method method, Object[] args, Object target) throws Throwable {
buffer.append(2);
}
});
endpoint.setDispatchAdviceChain(dispatchAdviceChain);
endpoint.afterPropertiesSet();
endpoint.setMaxMessagesPerTask(5);
endpoint.run();
assertEquals("abcabcabc", buffer.toString());
buffer.delete(0, buffer.length());
source.resetCounter();
endpoint.setTaskAdviceChain(taskAdviceChain);
endpoint.refreshTask();
endpoint.run();
assertEquals("12abcabcabc3", buffer.toString());
buffer.delete(0, buffer.length());
source.resetCounter();
endpoint.setDispatchAdviceChain(null);
endpoint.refreshTask();
endpoint.run();
assertEquals("123", buffer.toString());
buffer.delete(0, buffer.length());
source.resetCounter();
endpoint.setTaskAdviceChain(null);
endpoint.setDispatchAdviceChain(dispatchAdviceChain);
endpoint.refreshTask();
endpoint.run();
assertEquals("abcabcabc", buffer.toString());
}
@Test
public void testInitializeTaskDoesNotRefreshWithDispatchAdviceOnly() {
TestSource source = new TestSource("testing", 3);
QueueChannel channel = new QueueChannel();
PollingSchedule schedule = new PollingSchedule(1000);
schedule.setInitialDelay(10000);
SourceEndpoint endpoint = new SourceEndpoint(source, channel, schedule);
List<Advice> dispatchAdviceChain = new ArrayList<Advice>();
List<Advice> taskAdviceChain = new ArrayList<Advice>();
final StringBuffer buffer = new StringBuffer();
dispatchAdviceChain.add(new MethodBeforeAdvice() {
public void before(Method method, Object[] args, Object target) throws Throwable {
buffer.append("a");
}
});
taskAdviceChain.add(new MethodBeforeAdvice() {
public void before(Method method, Object[] args, Object target) throws Throwable {
buffer.append(1);
}
});
endpoint.setDispatchAdviceChain(dispatchAdviceChain);
endpoint.afterPropertiesSet();
endpoint.setMaxMessagesPerTask(5);
endpoint.run();
assertEquals("aaa", buffer.toString());
buffer.delete(0, buffer.length());
source.resetCounter();
endpoint.setTaskAdviceChain(taskAdviceChain);
endpoint.initializeTask();
endpoint.run();
assertEquals("aaa", buffer.toString());
buffer.delete(0, buffer.length());
source.resetCounter();
endpoint.setDispatchAdviceChain(null);
endpoint.initializeTask();
endpoint.run();
assertEquals("aaa", buffer.toString());
}
@Test
public void testInitializeTaskDoesNotRefreshWithTaskAdviceOnly() {
TestSource source = new TestSource("testing", 3);
QueueChannel channel = new QueueChannel();
PollingSchedule schedule = new PollingSchedule(1000);
schedule.setInitialDelay(10000);
SourceEndpoint endpoint = new SourceEndpoint(source, channel, schedule);
List<Advice> dispatchAdviceChain = new ArrayList<Advice>();
List<Advice> taskAdviceChain = new ArrayList<Advice>();
final StringBuffer buffer = new StringBuffer();
dispatchAdviceChain.add(new MethodBeforeAdvice() {
public void before(Method method, Object[] args, Object target) throws Throwable {
buffer.append("a");
}
});
taskAdviceChain.add(new MethodBeforeAdvice() {
public void before(Method method, Object[] args, Object target) throws Throwable {
buffer.append(1);
}
});
endpoint.setTaskAdviceChain(taskAdviceChain);
endpoint.afterPropertiesSet();
endpoint.setMaxMessagesPerTask(5);
endpoint.run();
assertEquals("1", buffer.toString());
buffer.delete(0, buffer.length());
source.resetCounter();
endpoint.setDispatchAdviceChain(dispatchAdviceChain);
endpoint.initializeTask();
endpoint.run();
assertEquals("1", buffer.toString());
buffer.delete(0, buffer.length());
source.resetCounter();
endpoint.setTaskAdviceChain(null);
endpoint.initializeTask();
endpoint.run();
assertEquals("1", buffer.toString());
}
@Test
public void testInitializeTaskDoesNotRefreshWithTaskAndDispatchAdvice() {
TestSource source = new TestSource("testing", 3);
QueueChannel channel = new QueueChannel();
PollingSchedule schedule = new PollingSchedule(1000);
schedule.setInitialDelay(10000);
SourceEndpoint endpoint = new SourceEndpoint(source, channel, schedule);
List<Advice> dispatchAdviceChain = new ArrayList<Advice>();
List<Advice> taskAdviceChain = new ArrayList<Advice>();
final StringBuffer buffer = new StringBuffer();
dispatchAdviceChain.add(new MethodBeforeAdvice() {
public void before(Method method, Object[] args, Object target) throws Throwable {
buffer.append("a");
}
});
taskAdviceChain.add(new MethodBeforeAdvice() {
public void before(Method method, Object[] args, Object target) throws Throwable {
buffer.append(1);
}
});
endpoint.setTaskAdviceChain(taskAdviceChain);
endpoint.setDispatchAdviceChain(dispatchAdviceChain);
endpoint.afterPropertiesSet();
endpoint.setMaxMessagesPerTask(5);
endpoint.run();
assertEquals("1aaa", buffer.toString());
buffer.delete(0, buffer.length());
source.resetCounter();
endpoint.setDispatchAdviceChain(null);
endpoint.initializeTask();
endpoint.run();
assertEquals("1aaa", buffer.toString());
buffer.delete(0, buffer.length());
source.resetCounter();
endpoint.setTaskAdviceChain(null);
endpoint.initializeTask();
endpoint.run();
assertEquals("1aaa", buffer.toString());
}
@Test
public void testInitializeTaskDoesNotRefreshWithNoAdvice() {
TestSource source = new TestSource("testing", 3);
QueueChannel channel = new QueueChannel();
PollingSchedule schedule = new PollingSchedule(1000);
schedule.setInitialDelay(10000);
SourceEndpoint endpoint = new SourceEndpoint(source, channel, schedule);
List<Advice> dispatchAdviceChain = new ArrayList<Advice>();
List<Advice> taskAdviceChain = new ArrayList<Advice>();
final StringBuffer buffer = new StringBuffer();
dispatchAdviceChain.add(new MethodBeforeAdvice() {
public void before(Method method, Object[] args, Object target) throws Throwable {
buffer.append("a");
}
});
taskAdviceChain.add(new MethodBeforeAdvice() {
public void before(Method method, Object[] args, Object target) throws Throwable {
buffer.append(1);
}
});
endpoint.afterPropertiesSet();
endpoint.setMaxMessagesPerTask(5);
endpoint.run();
assertEquals("", buffer.toString());
buffer.delete(0, buffer.length());
source.resetCounter();
endpoint.setDispatchAdviceChain(dispatchAdviceChain);
endpoint.initializeTask();
endpoint.run();
assertEquals("", buffer.toString());
buffer.delete(0, buffer.length());
source.resetCounter();
endpoint.setTaskAdviceChain(taskAdviceChain);
endpoint.initializeTask();
endpoint.run();
assertEquals("", buffer.toString());
}
private static class TestSource implements Source<String> {

View File

@@ -20,11 +20,11 @@
</bean>
</constructor-arg>
<constructor-arg ref="channel"/>
<constructor-arg>
<property name="schedule">
<bean class="org.springframework.integration.scheduling.PollingSchedule">
<constructor-arg value="1000"/>
</bean>
</constructor-arg>
</property>
</bean>
<bean id="target" class="org.springframework.integration.handler.MethodInvokingTarget">