diff --git a/org.springframework.integration/src/main/java/org/springframework/integration/endpoint/AbstractPollingEndpoint.java b/org.springframework.integration/src/main/java/org/springframework/integration/endpoint/AbstractPollingEndpoint.java index a20efe7631..350ef622f0 100644 --- a/org.springframework.integration/src/main/java/org/springframework/integration/endpoint/AbstractPollingEndpoint.java +++ b/org.springframework.integration/src/main/java/org/springframework/integration/endpoint/AbstractPollingEndpoint.java @@ -26,8 +26,12 @@ import org.springframework.aop.framework.ProxyFactory; import org.springframework.beans.factory.BeanClassLoaderAware; import org.springframework.beans.factory.InitializingBean; import org.springframework.core.task.TaskExecutor; +import org.springframework.integration.channel.BeanFactoryChannelResolver; +import org.springframework.integration.channel.MessagePublishingErrorHandler; +import org.springframework.integration.executor.ErrorHandlingTaskExecutor; import org.springframework.integration.scheduling.IntervalTrigger; import org.springframework.integration.scheduling.Trigger; +import org.springframework.integration.util.ErrorHandler; import org.springframework.transaction.PlatformTransactionManager; import org.springframework.transaction.TransactionDefinition; import org.springframework.transaction.TransactionStatus; @@ -66,6 +70,8 @@ public abstract class AbstractPollingEndpoint extends AbstractEndpoint implement private volatile Runnable poller; private volatile boolean initialized; + + private volatile ErrorHandler errorHandler; private final Object initializationMonitor = new Object(); @@ -90,6 +96,10 @@ public abstract class AbstractPollingEndpoint extends AbstractEndpoint implement public void setTaskExecutor(TaskExecutor taskExecutor) { this.taskExecutor = taskExecutor; } + + public void setErrorHandler(ErrorHandler errorHandler){ + this.errorHandler = errorHandler; + } /** * Specify a transaction manager to use for all polling operations. @@ -138,6 +148,14 @@ public abstract class AbstractPollingEndpoint extends AbstractEndpoint implement this.transactionManager, this.transactionDefinition); } this.poller = this.createPoller(); + if(this.taskExecutor != null){ + if(this.errorHandler == null){ + taskExecutor = new ErrorHandlingTaskExecutor( + new MessagePublishingErrorHandler(new BeanFactoryChannelResolver(getBeanFactory())),taskExecutor); + } else { + taskExecutor = new ErrorHandlingTaskExecutor(errorHandler, taskExecutor); + } + } this.initialized = true; } } diff --git a/org.springframework.integration/src/main/java/org/springframework/integration/executor/ErrorHandlingTaskExecutor.java b/org.springframework.integration/src/main/java/org/springframework/integration/executor/ErrorHandlingTaskExecutor.java new file mode 100644 index 0000000000..55e99f1fc3 --- /dev/null +++ b/org.springframework.integration/src/main/java/org/springframework/integration/executor/ErrorHandlingTaskExecutor.java @@ -0,0 +1,64 @@ +/* + * Copyright 2002-2007 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.springframework.integration.executor; + +import org.springframework.core.task.TaskExecutor; +import org.springframework.integration.util.ErrorHandler; + +public class ErrorHandlingTaskExecutor implements TaskExecutor { + + + private final ErrorHandler errorHandler; + + private final TaskExecutor taskExecutor; + + /** + * @param errorChannel + */ + public ErrorHandlingTaskExecutor(ErrorHandler errorHandler, TaskExecutor taskExecutor) { + this.errorHandler = errorHandler; + this.taskExecutor = taskExecutor; + } + + public void execute(Runnable task) { + taskExecutor.execute(new ErrorHandlingRunnableWrapper(task,errorHandler)); + } + + private static class ErrorHandlingRunnableWrapper implements Runnable { + + private final ErrorHandler errorHandler; + + private final Runnable runnableTarget; + + public ErrorHandlingRunnableWrapper(Runnable runnableTarget,ErrorHandler errorHandler) { + this.runnableTarget = runnableTarget; + this.errorHandler = errorHandler; + } + + public void run() { + try { + runnableTarget.run(); + } + catch (Throwable t) { + errorHandler.handle(t); + } + } + + } + + + +} diff --git a/org.springframework.integration/src/test/java/org/springframework/integration/endpoint/PollingEndpointErrorHandlingTests.java b/org.springframework.integration/src/test/java/org/springframework/integration/endpoint/PollingEndpointErrorHandlingTests.java new file mode 100644 index 0000000000..39991a585f --- /dev/null +++ b/org.springframework.integration/src/test/java/org/springframework/integration/endpoint/PollingEndpointErrorHandlingTests.java @@ -0,0 +1,44 @@ +/* + * Copyright 2002-2007 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.springframework.integration.endpoint; + +import static org.junit.Assert.*; + +import org.junit.Test; +import org.springframework.context.support.ClassPathXmlApplicationContext; +import org.springframework.integration.channel.PollableChannel; +import org.springframework.integration.core.Message; +import org.springframework.integration.message.ErrorMessage; + +/** + * + * @author Jonas Partner + * + */ +public class PollingEndpointErrorHandlingTests { + + @SuppressWarnings("unchecked") + @Test + public void checkExcpetionPlacedOnErrorChannel() { + ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext( + "pollingEndpointErrorHandlingTests.xml", this.getClass()); + PollableChannel errorChannel = (PollableChannel) context.getBean("errorChannel"); + Message errorMessage = errorChannel.receive(5000); + assertNotNull("No error message received", errorMessage); + assertEquals("Message recevied was not an ErrorMessage" ,ErrorMessage.class,errorMessage.getClass()); + } + +} diff --git a/org.springframework.integration/src/test/java/org/springframework/integration/endpoint/PollingEndpointStub.java b/org.springframework.integration/src/test/java/org/springframework/integration/endpoint/PollingEndpointStub.java new file mode 100644 index 0000000000..63ca48dcb6 --- /dev/null +++ b/org.springframework.integration/src/test/java/org/springframework/integration/endpoint/PollingEndpointStub.java @@ -0,0 +1,25 @@ +/* + * Copyright 2002-2007 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.springframework.integration.endpoint; + +public class PollingEndpointStub extends AbstractPollingEndpoint { + + @Override + protected boolean doPoll() { + throw new RuntimeException("Poll failed"); + } + +} diff --git a/org.springframework.integration/src/test/java/org/springframework/integration/endpoint/pollingEndpointErrorHandlingTests.xml b/org.springframework.integration/src/test/java/org/springframework/integration/endpoint/pollingEndpointErrorHandlingTests.xml new file mode 100644 index 0000000000..ed0ffe8f44 --- /dev/null +++ b/org.springframework.integration/src/test/java/org/springframework/integration/endpoint/pollingEndpointErrorHandlingTests.xml @@ -0,0 +1,20 @@ + + + + + + + + + + + + + +