Removed the aroundHandle() method from EndpointInterceptor now that the endpoints are simplified ("normal" AOP interception can easily be applied to the invoked target object). Removed the @Concurrency annotation since it was being applied by the endpointInterceptor instead of the poller and since the thread-pool executor is only one of the available taskExecutor options (may provide a 'taskExecutor' reference for @Poller instead). Removed the ConcurrencyInterceptor and TransactionInterceptor implementations.
This commit is contained in:
@@ -25,22 +25,18 @@ import java.lang.annotation.ElementType;
|
||||
import java.lang.annotation.Retention;
|
||||
import java.lang.annotation.RetentionPolicy;
|
||||
import java.lang.annotation.Target;
|
||||
import java.util.List;
|
||||
import java.util.concurrent.CountDownLatch;
|
||||
import java.util.concurrent.ThreadPoolExecutor;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.atomic.AtomicReference;
|
||||
|
||||
import org.junit.Test;
|
||||
|
||||
import org.springframework.aop.framework.ProxyFactory;
|
||||
import org.springframework.beans.DirectFieldAccessor;
|
||||
import org.springframework.context.support.AbstractApplicationContext;
|
||||
import org.springframework.context.support.ClassPathXmlApplicationContext;
|
||||
import org.springframework.core.annotation.Order;
|
||||
import org.springframework.integration.ConfigurationException;
|
||||
import org.springframework.integration.annotation.ChannelAdapter;
|
||||
import org.springframework.integration.annotation.Concurrency;
|
||||
import org.springframework.integration.annotation.Handler;
|
||||
import org.springframework.integration.annotation.MessageEndpoint;
|
||||
import org.springframework.integration.annotation.MessageTarget;
|
||||
@@ -56,14 +52,12 @@ import org.springframework.integration.channel.DirectChannel;
|
||||
import org.springframework.integration.channel.MessageChannel;
|
||||
import org.springframework.integration.channel.PollableChannel;
|
||||
import org.springframework.integration.channel.QueueChannel;
|
||||
import org.springframework.integration.endpoint.EndpointInterceptor;
|
||||
import org.springframework.integration.endpoint.DefaultEndpoint;
|
||||
import org.springframework.integration.handler.MessageHandler;
|
||||
import org.springframework.integration.message.Message;
|
||||
import org.springframework.integration.message.StringMessage;
|
||||
import org.springframework.integration.scheduling.PollingSchedule;
|
||||
import org.springframework.integration.scheduling.Schedule;
|
||||
import org.springframework.scheduling.concurrent.ConcurrentTaskExecutor;
|
||||
|
||||
/**
|
||||
* @author Mark Fisher
|
||||
@@ -165,28 +159,6 @@ public class MessagingAnnotationPostProcessorTests {
|
||||
messageBus.stop();
|
||||
}
|
||||
|
||||
@Test
|
||||
@SuppressWarnings("unchecked")
|
||||
public void testConcurrencyAnnotationWithValues() {
|
||||
MessageBus messageBus = new DefaultMessageBus();
|
||||
MessagingAnnotationPostProcessor postProcessor = new MessagingAnnotationPostProcessor(messageBus);
|
||||
postProcessor.afterPropertiesSet();
|
||||
ConcurrencyAnnotationTestBean testBean = new ConcurrencyAnnotationTestBean();
|
||||
postProcessor.postProcessAfterInitialization(testBean, "testBean");
|
||||
DefaultEndpoint<?> endpoint = (DefaultEndpoint<?>) messageBus.lookupEndpoint("testBean.MessageHandler.endpoint");
|
||||
DirectFieldAccessor accessor = new DirectFieldAccessor(endpoint);
|
||||
List<EndpointInterceptor> interceptors = (List<EndpointInterceptor>) accessor.getPropertyValue("interceptors");
|
||||
assertEquals(1, interceptors.size());
|
||||
EndpointInterceptor interceptor = interceptors.get(0);
|
||||
accessor = new DirectFieldAccessor(interceptor);
|
||||
ConcurrentTaskExecutor cte = (ConcurrentTaskExecutor) accessor.getPropertyValue("executor");
|
||||
ThreadPoolExecutor executor = (ThreadPoolExecutor) cte.getConcurrentExecutor();
|
||||
assertEquals(17, executor.getCorePoolSize());
|
||||
assertEquals(42, executor.getMaximumPoolSize());
|
||||
assertEquals(123, executor.getKeepAliveTime(TimeUnit.SECONDS));
|
||||
assertEquals(11, executor.getQueue().remainingCapacity());
|
||||
}
|
||||
|
||||
@Test(expected=IllegalArgumentException.class)
|
||||
public void testPostProcessorWithNullMessageBus() {
|
||||
new MessagingAnnotationPostProcessor(null);
|
||||
@@ -450,17 +422,6 @@ public class MessagingAnnotationPostProcessorTests {
|
||||
}
|
||||
|
||||
|
||||
@MessageEndpoint(input="inputChannel")
|
||||
@Concurrency(coreSize=17, maxSize=42, keepAliveSeconds=123, queueCapacity=11)
|
||||
private static class ConcurrencyAnnotationTestBean {
|
||||
|
||||
@Handler
|
||||
public Message<?> handle(Message<?> message) {
|
||||
return null;
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@MessageEndpoint(input="inputChannel")
|
||||
private static class ChannelRegistryAwareTestBean implements ChannelRegistryAware {
|
||||
|
||||
|
||||
Reference in New Issue
Block a user