PublishSubscribeChannel now wraps a provided TaskExecutor with the ErrorHandlingTaskExecutor if necessary (INT-440). Also, added namespace support for the publish-subscribe-channel's 'error-handler' reference (INT-483).
This commit is contained in:
@@ -18,6 +18,7 @@ package org.springframework.integration.channel.config;
|
||||
|
||||
import static org.junit.Assert.assertEquals;
|
||||
import static org.junit.Assert.assertFalse;
|
||||
import static org.junit.Assert.assertNotNull;
|
||||
import static org.junit.Assert.assertTrue;
|
||||
|
||||
import org.junit.Test;
|
||||
@@ -26,6 +27,7 @@ import org.springframework.beans.DirectFieldAccessor;
|
||||
import org.springframework.beans.FatalBeanException;
|
||||
import org.springframework.context.ApplicationContext;
|
||||
import org.springframework.context.support.ClassPathXmlApplicationContext;
|
||||
import org.springframework.core.task.TaskExecutor;
|
||||
import org.springframework.integration.channel.DirectChannel;
|
||||
import org.springframework.integration.channel.PollableChannel;
|
||||
import org.springframework.integration.channel.PublishSubscribeChannel;
|
||||
@@ -33,6 +35,7 @@ import org.springframework.integration.config.TestChannelInterceptor;
|
||||
import org.springframework.integration.core.Message;
|
||||
import org.springframework.integration.core.MessageChannel;
|
||||
import org.springframework.integration.core.MessagePriority;
|
||||
import org.springframework.integration.executor.ErrorHandlingTaskExecutor;
|
||||
import org.springframework.integration.message.GenericMessage;
|
||||
import org.springframework.integration.message.MessageBuilder;
|
||||
import org.springframework.integration.message.MessageDeliveryException;
|
||||
@@ -85,8 +88,12 @@ public class ChannelParserTests {
|
||||
DirectFieldAccessor accessor = new DirectFieldAccessor(channel);
|
||||
accessor = new DirectFieldAccessor(accessor.getPropertyValue("dispatcher"));
|
||||
Object taskExecutorProperty = accessor.getPropertyValue("taskExecutor");
|
||||
assertNotNull(taskExecutorProperty);
|
||||
assertEquals(ErrorHandlingTaskExecutor.class, taskExecutorProperty.getClass());
|
||||
DirectFieldAccessor executorAccessor = new DirectFieldAccessor(taskExecutorProperty);
|
||||
TaskExecutor innerExecutor = (TaskExecutor) executorAccessor.getPropertyValue("taskExecutor");
|
||||
Object taskExecutorBean = context.getBean("taskExecutor");
|
||||
assertEquals(taskExecutorBean, taskExecutorProperty);
|
||||
assertEquals(taskExecutorBean, innerExecutor);
|
||||
}
|
||||
|
||||
@Test
|
||||
|
||||
@@ -29,6 +29,8 @@ import org.springframework.context.support.ClassPathXmlApplicationContext;
|
||||
import org.springframework.core.task.TaskExecutor;
|
||||
import org.springframework.integration.channel.PublishSubscribeChannel;
|
||||
import org.springframework.integration.dispatcher.BroadcastingDispatcher;
|
||||
import org.springframework.integration.executor.ErrorHandlingTaskExecutor;
|
||||
import org.springframework.integration.util.ErrorHandler;
|
||||
|
||||
/**
|
||||
* @author Mark Fisher
|
||||
@@ -73,7 +75,22 @@ public class PublishSubscribeChannelParserTests {
|
||||
DirectFieldAccessor dispatcherAccessor = new DirectFieldAccessor(dispatcher);
|
||||
TaskExecutor executor = (TaskExecutor) dispatcherAccessor.getPropertyValue("taskExecutor");
|
||||
assertNotNull(executor);
|
||||
assertEquals(context.getBean("pool"), executor);
|
||||
assertEquals(ErrorHandlingTaskExecutor.class, executor.getClass());
|
||||
DirectFieldAccessor executorAccessor = new DirectFieldAccessor(executor);
|
||||
TaskExecutor innerExecutor = (TaskExecutor) executorAccessor.getPropertyValue("taskExecutor");
|
||||
assertEquals(context.getBean("pool"), innerExecutor);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void channelWithErrorHandler() {
|
||||
ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext(
|
||||
"publishSubscribeChannelParserTests.xml", this.getClass());
|
||||
PublishSubscribeChannel channel = (PublishSubscribeChannel)
|
||||
context.getBean("channelWithErrorHandler");
|
||||
DirectFieldAccessor accessor = new DirectFieldAccessor(channel);
|
||||
ErrorHandler errorHandler = (ErrorHandler) accessor.getPropertyValue("errorHandler");
|
||||
assertNotNull(errorHandler);
|
||||
assertEquals(context.getBean("testErrorHandler"), errorHandler);
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@@ -13,6 +13,10 @@
|
||||
|
||||
<publish-subscribe-channel id="channelWithTaskExecutor" task-executor="pool"/>
|
||||
|
||||
<publish-subscribe-channel id="channelWithErrorHandler" error-handler="testErrorHandler"/>
|
||||
|
||||
<thread-pool-task-executor id="pool"/>
|
||||
|
||||
<beans:bean id="testErrorHandler" class="org.springframework.integration.config.TestErrorHandler"/>
|
||||
|
||||
</beans:beans>
|
||||
|
||||
Reference in New Issue
Block a user