diff --git a/org.springframework.integration/src/main/java/org/springframework/integration/bus/DefaultMessageBus.java b/org.springframework.integration/src/main/java/org/springframework/integration/bus/DefaultMessageBus.java index dbddc475b1..c402ec5d6a 100644 --- a/org.springframework.integration/src/main/java/org/springframework/integration/bus/DefaultMessageBus.java +++ b/org.springframework.integration/src/main/java/org/springframework/integration/bus/DefaultMessageBus.java @@ -22,7 +22,6 @@ import java.util.Map; import java.util.Set; import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.CopyOnWriteArraySet; -import java.util.concurrent.ScheduledThreadPoolExecutor; import java.util.concurrent.ThreadPoolExecutor.CallerRunsPolicy; import org.apache.commons.logging.Log; @@ -48,11 +47,11 @@ import org.springframework.integration.channel.MessageChannel; import org.springframework.integration.channel.MessagePublishingErrorHandler; import org.springframework.integration.endpoint.MessageEndpoint; import org.springframework.integration.endpoint.MessagingGateway; +import org.springframework.integration.scheduling.SimpleTaskScheduler; import org.springframework.integration.scheduling.TaskScheduler; import org.springframework.integration.scheduling.TaskSchedulerAware; -import org.springframework.integration.scheduling.spi.ProviderTaskScheduler; -import org.springframework.integration.scheduling.spi.SimpleScheduleServiceProvider; import org.springframework.scheduling.concurrent.CustomizableThreadFactory; +import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; import org.springframework.util.Assert; /** @@ -167,10 +166,12 @@ public class DefaultMessageBus implements MessageBus, ApplicationContextAware, A } Assert.notNull(this.applicationContext, "ApplicationContext must not be null"); if (this.taskScheduler == null) { - ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(DEFAULT_DISPATCHER_POOL_SIZE); + ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor(); + executor.setCorePoolSize(DEFAULT_DISPATCHER_POOL_SIZE); executor.setThreadFactory(new CustomizableThreadFactory("message-bus-")); executor.setRejectedExecutionHandler(new CallerRunsPolicy()); - this.taskScheduler = new ProviderTaskScheduler(new SimpleScheduleServiceProvider(executor)); + executor.afterPropertiesSet(); + this.taskScheduler = new SimpleTaskScheduler(executor); } if (this.getErrorChannel() == null) { this.registerChannel(new DefaultErrorChannel()); @@ -305,8 +306,8 @@ public class DefaultMessageBus implements MessageBus, ApplicationContextAware, A for (Lifecycle gateway : this.lifecycleGateways) { gateway.start(); } - if (this.taskScheduler instanceof ProviderTaskScheduler) { - ((ProviderTaskScheduler) this.taskScheduler).setErrorHandler(new MessagePublishingErrorHandler(this.getErrorChannel())); + if (this.taskScheduler instanceof SimpleTaskScheduler) { + ((SimpleTaskScheduler) this.taskScheduler).setErrorHandler(new MessagePublishingErrorHandler(this.getErrorChannel())); } this.taskScheduler.start(); } diff --git a/org.springframework.integration/src/main/java/org/springframework/integration/endpoint/AbstractMessageConsumingEndpoint.java b/org.springframework.integration/src/main/java/org/springframework/integration/endpoint/AbstractMessageConsumingEndpoint.java index 3f10618b03..01f3d1d40c 100644 --- a/org.springframework.integration/src/main/java/org/springframework/integration/endpoint/AbstractMessageConsumingEndpoint.java +++ b/org.springframework.integration/src/main/java/org/springframework/integration/endpoint/AbstractMessageConsumingEndpoint.java @@ -16,6 +16,8 @@ package org.springframework.integration.endpoint; +import java.util.concurrent.ScheduledFuture; + import org.springframework.context.Lifecycle; import org.springframework.core.task.TaskExecutor; import org.springframework.integration.ConfigurationException; @@ -42,6 +44,8 @@ public abstract class AbstractMessageConsumingEndpoint extends AbstractEndpoint private volatile ChannelPoller poller; + private volatile ScheduledFuture pollerFuture; + private volatile TaskExecutor taskExecutor; private volatile int maxMessagesPerPoll = -1; @@ -110,7 +114,7 @@ public abstract class AbstractMessageConsumingEndpoint extends AbstractEndpoint if (this.getTaskScheduler() == null) { throw new ConfigurationException("failed to start endpoint, no taskScheduler available"); } - this.getTaskScheduler().schedule(poller); + this.pollerFuture = this.getTaskScheduler().schedule(this.poller, this.poller.getTrigger()); } this.running = true; } @@ -124,8 +128,8 @@ public abstract class AbstractMessageConsumingEndpoint extends AbstractEndpoint if (this.inputChannel instanceof Subscribable) { ((Subscribable) inputChannel).unsubscribe(this); } - else if (this.poller != null) { - this.getTaskScheduler().cancel(poller, true); + else if (this.pollerFuture != null) { + this.pollerFuture.cancel(true); } this.running = false; } diff --git a/org.springframework.integration/src/main/java/org/springframework/integration/endpoint/AbstractPoller.java b/org.springframework.integration/src/main/java/org/springframework/integration/endpoint/AbstractPoller.java index 646e1645d7..ea75d35644 100644 --- a/org.springframework.integration/src/main/java/org/springframework/integration/endpoint/AbstractPoller.java +++ b/org.springframework.integration/src/main/java/org/springframework/integration/endpoint/AbstractPoller.java @@ -18,8 +18,13 @@ package org.springframework.integration.endpoint; import org.springframework.beans.factory.InitializingBean; import org.springframework.core.task.TaskExecutor; +import org.springframework.integration.scheduling.CronSchedule; +import org.springframework.integration.scheduling.CronTrigger; +import org.springframework.integration.scheduling.IntervalTrigger; +import org.springframework.integration.scheduling.PollingSchedule; import org.springframework.integration.scheduling.SchedulableTask; import org.springframework.integration.scheduling.Schedule; +import org.springframework.integration.scheduling.Trigger; import org.springframework.transaction.PlatformTransactionManager; import org.springframework.transaction.TransactionStatus; import org.springframework.transaction.support.DefaultTransactionDefinition; @@ -64,8 +69,18 @@ public abstract class AbstractPoller implements SchedulableTask, InitializingBea } - public Schedule getSchedule() { - return this.schedule; + public Trigger getTrigger() { + if (schedule instanceof PollingSchedule) { + PollingSchedule pollingSchedule = (PollingSchedule) schedule; + IntervalTrigger trigger = new IntervalTrigger(pollingSchedule.getPeriod(), pollingSchedule.getTimeUnit()); + trigger.setInitialDelay(pollingSchedule.getInitialDelay()); + trigger.setFixedRate(pollingSchedule.getFixedRate()); + return trigger; + } + if (schedule instanceof CronSchedule ) { + return new CronTrigger(((CronSchedule) schedule).getCronExpression()); + } + return null; } /** diff --git a/org.springframework.integration/src/main/java/org/springframework/integration/endpoint/SourcePollingChannelAdapter.java b/org.springframework.integration/src/main/java/org/springframework/integration/endpoint/SourcePollingChannelAdapter.java index 112635365f..45c2c6b7c2 100644 --- a/org.springframework.integration/src/main/java/org/springframework/integration/endpoint/SourcePollingChannelAdapter.java +++ b/org.springframework.integration/src/main/java/org/springframework/integration/endpoint/SourcePollingChannelAdapter.java @@ -16,6 +16,8 @@ package org.springframework.integration.endpoint; +import java.util.concurrent.ScheduledFuture; + import org.springframework.context.Lifecycle; import org.springframework.integration.channel.MessageChannel; import org.springframework.integration.message.MethodInvokingSource; @@ -38,6 +40,8 @@ public class SourcePollingChannelAdapter extends AbstractMessageProducingEndpoin private volatile SourcePoller poller; + private volatile ScheduledFuture pollerFuture; + private volatile int maxMessagesPerPoll = -1; private volatile boolean running; @@ -79,8 +83,9 @@ public class SourcePollingChannelAdapter extends AbstractMessageProducingEndpoin this.poller.setMaxMessagesPerPoll(maxMessagesPerPoll); TaskScheduler taskScheduler = this.getTaskScheduler(); if (taskScheduler != null) { - taskScheduler.schedule(this.poller); + this.pollerFuture = taskScheduler.schedule(this.poller, this.poller.getTrigger()); } + this.running = true; } } @@ -89,10 +94,10 @@ public class SourcePollingChannelAdapter extends AbstractMessageProducingEndpoin if (!this.running) { return; } - TaskScheduler taskScheduler = this.getTaskScheduler(); - if (taskScheduler != null) { - taskScheduler.cancel(this.poller, true); + if (this.pollerFuture != null) { + this.pollerFuture.cancel(true); } + this.running = false; } } diff --git a/org.springframework.integration/src/main/java/org/springframework/integration/scheduling/SchedulableTask.java b/org.springframework.integration/src/main/java/org/springframework/integration/scheduling/SchedulableTask.java index 1e1d3038f8..bb2522f9b9 100644 --- a/org.springframework.integration/src/main/java/org/springframework/integration/scheduling/SchedulableTask.java +++ b/org.springframework.integration/src/main/java/org/springframework/integration/scheduling/SchedulableTask.java @@ -23,6 +23,6 @@ package org.springframework.integration.scheduling; */ public interface SchedulableTask extends Runnable { - Schedule getSchedule(); + Trigger getTrigger(); } diff --git a/org.springframework.integration/src/main/java/org/springframework/integration/scheduling/SimpleTaskScheduler.java b/org.springframework.integration/src/main/java/org/springframework/integration/scheduling/SimpleTaskScheduler.java new file mode 100644 index 0000000000..94d3c3227e --- /dev/null +++ b/org.springframework.integration/src/main/java/org/springframework/integration/scheduling/SimpleTaskScheduler.java @@ -0,0 +1,248 @@ +/* + * Copyright 2002-2008 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.integration.scheduling; + +import java.util.Collections; +import java.util.Date; +import java.util.Set; +import java.util.TreeSet; +import java.util.concurrent.DelayQueue; +import java.util.concurrent.Delayed; +import java.util.concurrent.FutureTask; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; +import java.util.concurrent.locks.ReentrantLock; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; + +import org.springframework.core.task.TaskExecutor; +import org.springframework.integration.util.ErrorHandler; +import org.springframework.scheduling.SchedulingException; +import org.springframework.util.Assert; + +/** + * An implementation of {@link TaskScheduler} that delegates to a {@link TaskExecutor}. + * + * @author Mark Fisher + * @author Marius Bogoevici + */ +public class SimpleTaskScheduler implements TaskScheduler { + + private final Log logger = LogFactory.getLog(this.getClass()); + + private final TaskExecutor executor; + + private volatile ErrorHandler errorHandler; + + private volatile SchedulerTask schedulerTask = new SchedulerTask(); + + private final DelayQueue> scheduledTasks = new DelayQueue>(); + + private final Set> executingTasks = Collections.synchronizedSet(new TreeSet>()); + + private volatile boolean running; + + private final ReentrantLock lifecycleLock = new ReentrantLock(); + + + public SimpleTaskScheduler(TaskExecutor executor) { + Assert.notNull(executor, "executor must not be null"); + this.executor = executor; + } + + + public void setErrorHandler(ErrorHandler errorHandler) { + this.errorHandler = errorHandler; + } + + public final ScheduledFuture schedule(Runnable task, Trigger trigger) { + TriggeredTask triggeredTask = new TriggeredTask(task, trigger); + return this.schedule(triggeredTask, null, null); + } + + private ScheduledFuture schedule(TriggeredTask triggeredTask, Date lastScheduledRunTime, Date lastCompleteTime) { + Date nextRunTime = triggeredTask.trigger.getNextRunTime(lastScheduledRunTime, lastCompleteTime); + if (nextRunTime != null) { + triggeredTask.setScheduledTime(nextRunTime); + this.scheduledTasks.offer(triggeredTask); + } + return triggeredTask; + } + + + // Lifecycle implementation + + public boolean isRunning() { + return this.running; + } + + public void start() { + this.lifecycleLock.lock(); + try { + if (this.running) { + return; + } + this.executor.execute(this.schedulerTask); + this.running = true; + } + finally { + this.lifecycleLock.unlock(); + } + } + + public void stop() { + this.lifecycleLock.lock(); + try { + if (!this.running) { + return; + } + Thread executingThread = this.schedulerTask.executingThread.getAndSet(null); + if (executingThread != null) { + executingThread.interrupt(); + } + for (TriggeredTask task : this.executingTasks) { + task.cancel(true); + } + this.executingTasks.clear(); + this.running = false; + } + finally { + this.lifecycleLock.unlock(); + } + } + + public boolean prefersShortLivedTasks() { + return true; + } + + public void execute(Runnable task) { + this.executor.execute(task); + } + + + private class SchedulerTask implements Runnable { + + private final AtomicReference executingThread = new AtomicReference(); + + + public void run() { + if (!this.executingThread.compareAndSet(null, Thread.currentThread())) { + throw new SchedulingException("The SchedulerTask is already running."); + } + while (SimpleTaskScheduler.this.isRunning()) { + try { + TriggeredTask task = SimpleTaskScheduler.this.scheduledTasks.take(); + SimpleTaskScheduler.this.executor.execute(task); + } + catch (InterruptedException e) { + Thread.currentThread().interrupt(); + break; + } + } + } + } + + + /** + * Wrapper class that enables rescheduling of a task based on a Trigger. + */ + private class TriggeredTask extends FutureTask implements Delayed, ScheduledFuture { + + private final Trigger trigger; + + private volatile Date scheduledTime; + + + public TriggeredTask(Runnable task, Trigger trigger) { + super(new ErrorHandlingRunnableWrapper(task), null); + this.trigger = trigger; + } + + + public void setScheduledTime(Date scheduledTime) { + this.scheduledTime = scheduledTime; + } + + public void run() { + SimpleTaskScheduler.this.executingTasks.add(this); + super.runAndReset(); + SimpleTaskScheduler.this.executingTasks.remove(this); + if (isRunning() && !isCancelled()) { + SimpleTaskScheduler.this.schedule(this, this.scheduledTime, new Date()); + } + } + + public int compareTo(Delayed other) { + long thisDelay = this.getDelay(TimeUnit.MILLISECONDS); + long otherDelay = other.getDelay(TimeUnit.MILLISECONDS); + if (thisDelay < otherDelay) { + return -1; + } + if (thisDelay == otherDelay) { + return 0; + } + return 1; + } + + public long getDelay(TimeUnit unit) { + long now = new Date().getTime(); + long scheduled = this.scheduledTime.getTime(); + return (scheduled > now) ? unit.convert(scheduled - now, TimeUnit.MILLISECONDS) : 0; + } + + public synchronized boolean cancel(boolean mayInterruptIfRunning) { + if (!isCancelled()) { + SimpleTaskScheduler.this.scheduledTasks.remove(this); + } + return super.cancel(mayInterruptIfRunning); + } + } + + + /** + * Wrapper that catches any Throwable thrown by a target task and + * delegates to the {@link ErrorHandler} if available. If no error handler + * has been configured, the error will be logged at warn-level. + */ + private class ErrorHandlingRunnableWrapper implements Runnable { + + private final Runnable target; + + + public ErrorHandlingRunnableWrapper(Runnable target) { + this.target = target; + } + + + public void run() { + try { + this.target.run(); + } + catch (Throwable t) { + if (SimpleTaskScheduler.this.errorHandler != null) { + SimpleTaskScheduler.this.errorHandler.handle(t); + } + else if (logger.isWarnEnabled()) { + SimpleTaskScheduler.this.logger.warn("Error occurred in task but no 'errorHandler' is available.", t); + } + } + } + } + +} diff --git a/org.springframework.integration/src/main/java/org/springframework/integration/scheduling/TaskScheduler.java b/org.springframework.integration/src/main/java/org/springframework/integration/scheduling/TaskScheduler.java index 947e0480f9..2990ba4d3d 100644 --- a/org.springframework.integration/src/main/java/org/springframework/integration/scheduling/TaskScheduler.java +++ b/org.springframework.integration/src/main/java/org/springframework/integration/scheduling/TaskScheduler.java @@ -19,18 +19,16 @@ package org.springframework.integration.scheduling; import java.util.concurrent.ScheduledFuture; import org.springframework.context.Lifecycle; -import org.springframework.integration.util.ErrorHandler; import org.springframework.scheduling.SchedulingTaskExecutor; /** * Base interface for scheduling messaging tasks. * * @author Mark Fisher + * @author Marius Bogoevici */ public interface TaskScheduler extends SchedulingTaskExecutor, Lifecycle { - ScheduledFuture schedule(SchedulableTask task); - - boolean cancel(Runnable task, boolean mayInterruptIfRunning); + ScheduledFuture schedule(Runnable task, Trigger trigger); } diff --git a/org.springframework.integration/src/main/java/org/springframework/integration/scheduling/spi/ProviderTaskScheduler.java b/org.springframework.integration/src/main/java/org/springframework/integration/scheduling/spi/ProviderTaskScheduler.java deleted file mode 100644 index 888ec792c3..0000000000 --- a/org.springframework.integration/src/main/java/org/springframework/integration/scheduling/spi/ProviderTaskScheduler.java +++ /dev/null @@ -1,239 +0,0 @@ -/* - * Copyright 2002-2008 the original author or authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.springframework.integration.scheduling.spi; - -import java.util.Map; -import java.util.Set; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.CopyOnWriteArraySet; -import java.util.concurrent.ScheduledFuture; -import java.util.concurrent.TimeUnit; - -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; - -import org.springframework.beans.factory.DisposableBean; -import org.springframework.integration.scheduling.CronSchedule; -import org.springframework.integration.scheduling.PollingSchedule; -import org.springframework.integration.scheduling.SchedulableTask; -import org.springframework.integration.scheduling.TaskScheduler; -import org.springframework.integration.util.ErrorHandler; -import org.springframework.util.Assert; - -/** - * An implementation of {@link org.springframework.integration.scheduling.TaskScheduler} that understands - * {@link org.springframework.integration.scheduling.PollingSchedule PollingSchedules} and delegates to - * a {@link ScheduleServiceProvider} instance. - * - * @author Mark Fisher - * @author Marius Bogoevici - */ -public class ProviderTaskScheduler implements TaskScheduler, DisposableBean { - - private final Log logger = LogFactory.getLog(this.getClass()); - - private ScheduleServiceProvider scheduleServiceProvider; - - private volatile boolean waitForTasksToCompleteOnShutdown = true; - - private volatile ErrorHandler errorHandler; - - private final Set pendingTasks = new CopyOnWriteArraySet(); - - private final Map> scheduledTasks = - new ConcurrentHashMap>(); - - private volatile boolean running; - - private final Object lifecycleMonitor = new Object(); - - - public ProviderTaskScheduler(ScheduleServiceProvider scheduleServiceProvider) { - Assert.notNull(scheduleServiceProvider, "'executor' must not be null"); - this.scheduleServiceProvider = scheduleServiceProvider; - } - - - public void setWaitForTasksToCompleteOnShutdown(boolean waitForTasksToCompleteOnShutdown) { - this.waitForTasksToCompleteOnShutdown = waitForTasksToCompleteOnShutdown; - } - - public void setErrorHandler(ErrorHandler errorHandler) { - this.errorHandler = errorHandler; - } - - public boolean isRunning() { - return this.running; - } - - public void start() { - synchronized (this.lifecycleMonitor) { - if (this.running) { - return; - } - if (logger.isInfoEnabled()) { - logger.info("task scheduler starting"); - } - this.running = true; - for (Runnable task : this.pendingTasks) { - if (task instanceof SchedulableTask) { - this.schedule((SchedulableTask)task); - } - else { - this.execute(task); - } - } - this.pendingTasks.clear(); - if (logger.isInfoEnabled()) { - logger.info("task scheduler started successfully"); - } - } - } - - public void stop() { - synchronized (this.lifecycleMonitor) { - if (this.running) { - if (logger.isInfoEnabled()) { - logger.info("task scheduler stopping"); - } - this.running = false; - for (Runnable task : this.scheduledTasks.keySet()) { - this.cancel(task, true); - } - if (logger.isInfoEnabled()) { - logger.info("task scheduler stopped successfully"); - } - } - } - } - - public void destroy() { - synchronized (this.lifecycleMonitor) { - this.stop(); - scheduleServiceProvider.shutdown(this.waitForTasksToCompleteOnShutdown); - } - } - - public boolean prefersShortLivedTasks() { - return true; - } - - public void execute(Runnable task) { - this.scheduleServiceProvider.execute(task); - } - - public ScheduledFuture schedule(SchedulableTask task) { - synchronized (this.lifecycleMonitor) { - if (!this.running) { - if (logger.isDebugEnabled()) { - logger.debug("scheduler is not running, adding task to pending list: " + task); - } - this.pendingTasks.add(task); - return null; - } - if (logger.isDebugEnabled()) { - logger.debug("scheduling task: " + task); - } - try { - TaskRunner runner = new TaskRunner(task); - ScheduledFuture future = null; - if (task.getSchedule() == null) { - future = this.scheduleServiceProvider.scheduleWithInitialDelay(runner, 0, TimeUnit.MILLISECONDS); - } - else if (task.getSchedule() instanceof PollingSchedule) { - PollingSchedule ps = (PollingSchedule) task.getSchedule(); - if (ps.getPeriod() <= 0) { - runner.setShouldRepeat(true); - future = this.scheduleServiceProvider.scheduleWithInitialDelay(runner, ps.getInitialDelay(), - ps.getTimeUnit()); - } - else if (ps.getFixedRate()) { - future = this.scheduleServiceProvider.scheduleAtFixedRate(runner, ps.getInitialDelay(), - ps.getPeriod(), ps.getTimeUnit()); - } - else { - future = this.scheduleServiceProvider.scheduleWithFixedDelay(runner, ps.getInitialDelay(), - ps.getPeriod(), ps.getTimeUnit()); - } - } - else if (task.getSchedule() instanceof CronSchedule) { - future = this.scheduleServiceProvider.scheduleWithCronExpression(runner, - ((CronSchedule) task.getSchedule()).getCronExpression()); - } - this.scheduledTasks.put(task, future); - if (logger.isDebugEnabled()) { - logger.debug("scheduled task: " + task); - } - return future; - } - catch (Exception e) { - throw new UnschedulableTaskException(e, task.getSchedule()); - } - } - } - - public boolean cancel(Runnable task, boolean mayInterruptIfRunning) { - synchronized (this.lifecycleMonitor) { - ScheduledFuture future = this.scheduledTasks.get(task); - if (future != null) { - if (logger.isDebugEnabled()) { - logger.debug("cancelling task: " + task); - } - return future.cancel(mayInterruptIfRunning); - } - return this.pendingTasks.remove(task); - } - } - - - private class TaskRunner implements Runnable { - - private final Runnable task; - - private volatile boolean shouldRepeat; - - - public TaskRunner(Runnable task) { - this.task = task; - } - - - public void setShouldRepeat(boolean shouldRepeat) { - this.shouldRepeat = shouldRepeat; - } - - public void run() { - try { - this.task.run(); - } - catch (Throwable t) { - if (errorHandler != null) { - errorHandler.handle(t); - } - else if (logger.isWarnEnabled()) { - logger.warn("error occurred in task but no 'errorHandler' is available", t); - } - } - if (this.shouldRepeat && isRunning()) { - TaskRunner runner = new TaskRunner(this.task); - runner.setShouldRepeat(true); - scheduleServiceProvider.execute(runner); - } - } - } - -} diff --git a/org.springframework.integration/src/main/java/org/springframework/integration/scheduling/spi/ScheduleServiceProvider.java b/org.springframework.integration/src/main/java/org/springframework/integration/scheduling/spi/ScheduleServiceProvider.java deleted file mode 100644 index 8a97982c1b..0000000000 --- a/org.springframework.integration/src/main/java/org/springframework/integration/scheduling/spi/ScheduleServiceProvider.java +++ /dev/null @@ -1,46 +0,0 @@ -/* - * Copyright 2002-2008 the original author or authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.springframework.integration.scheduling.spi; - -import java.util.concurrent.ScheduledFuture; -import java.util.concurrent.TimeUnit; - -/** - * An SPI interface providing an abstraction of thread pooling and scheduling operations, allowing to switch between - * different platforms (such as Java 5's java.util.concurrent, Quartz, etc) at runtime. - * - * @author Marius Bogoevici - */ -public interface ScheduleServiceProvider { - - void execute(Runnable runnable); - - void shutdown(boolean waitForTasksToCompleteOnShutdown); - - ScheduledFuture scheduleWithInitialDelay(Runnable runnable, long initialDelay, TimeUnit timeUnit) - throws Exception; - - ScheduledFuture scheduleAtFixedRate(Runnable runnable, long initialDelay, long period, TimeUnit timeUnit) - throws Exception; - - ScheduledFuture scheduleWithFixedDelay(Runnable runnable, long initialDelay, long delay, TimeUnit timeUnit) - throws Exception; - - ScheduledFuture scheduleWithCronExpression(Runnable runnable, String cronExpression) - throws Exception; - -} diff --git a/org.springframework.integration/src/main/java/org/springframework/integration/scheduling/spi/SimpleScheduleServiceProvider.java b/org.springframework.integration/src/main/java/org/springframework/integration/scheduling/spi/SimpleScheduleServiceProvider.java deleted file mode 100644 index c477820ddf..0000000000 --- a/org.springframework.integration/src/main/java/org/springframework/integration/scheduling/spi/SimpleScheduleServiceProvider.java +++ /dev/null @@ -1,70 +0,0 @@ -/* - * Copyright 2002-2008 the original author or authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.springframework.integration.scheduling.spi; - -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.ScheduledFuture; -import java.util.concurrent.TimeUnit; - -/** - * A {@link ScheduleServiceProvider} implementation using an underlying {@link ScheduledExecutorService}. - * @author Marius Bogoevici - */ -public class SimpleScheduleServiceProvider implements ScheduleServiceProvider { - - private final ScheduledExecutorService executor; - - public SimpleScheduleServiceProvider(ScheduledExecutorService executor) { - this.executor = executor; - } - - public void execute(Runnable runnable) { - this.executor.execute(runnable); - } - - public void shutdown(boolean waitForTasksToCompleteOnShutdown) { - if (this.executor.isShutdown()) { - return; - } - if (waitForTasksToCompleteOnShutdown) { - this.executor.shutdown(); - } - else { - this.executor.shutdownNow(); - } - } - - public ScheduledFuture scheduleWithInitialDelay(Runnable runnable, long initialDelay, TimeUnit timeUnit) { - return this.executor.schedule(runnable, initialDelay, timeUnit); - } - - public ScheduledFuture scheduleAtFixedRate(Runnable runnable, long initialDelay, long period, - TimeUnit timeUnit) { - return this.executor.scheduleAtFixedRate(runnable, initialDelay, period, timeUnit); - } - - public ScheduledFuture scheduleWithFixedDelay(Runnable runnable, long initialDelay, long delay, - TimeUnit timeUnit) { - return this.executor.scheduleWithFixedDelay(runnable, initialDelay, delay, timeUnit); - } - - public ScheduledFuture scheduleWithCronExpression(Runnable runnable, String cronExpression) - throws Exception { - throw new UnsupportedOperationException("Cron scheduling not supported"); - } - -} diff --git a/org.springframework.integration/src/main/java/org/springframework/integration/scheduling/spi/UnschedulableTaskException.java b/org.springframework.integration/src/main/java/org/springframework/integration/scheduling/spi/UnschedulableTaskException.java deleted file mode 100644 index c92ae329d2..0000000000 --- a/org.springframework.integration/src/main/java/org/springframework/integration/scheduling/spi/UnschedulableTaskException.java +++ /dev/null @@ -1,46 +0,0 @@ -/* - * Copyright 2002-2008 the original author or authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.springframework.integration.scheduling.spi; - -import org.springframework.integration.scheduling.Schedule; - -/** - * An exception indicating that a {@link org.springframework.integration.scheduling.SchedulableTask} - * could not be scheduled. The typical reason is that the {@link ScheduleServiceProvider} does not - * understand the {@link org.springframework.integration.scheduling.Schedule}. - * - * @author Marius Bogoevici - */ -public class UnschedulableTaskException extends RuntimeException { - - private Schedule schedule; - - public UnschedulableTaskException(Throwable t, Schedule schedule) { - super(t); - this.schedule = schedule; - } - - public UnschedulableTaskException(String message, Schedule schedule) { - super(message); - this.schedule = schedule; - } - - public Schedule getSchedule() { - return schedule; - } - -} diff --git a/org.springframework.integration/src/test/java/org/springframework/integration/config/MessageBusParserTests.java b/org.springframework.integration/src/test/java/org/springframework/integration/config/MessageBusParserTests.java index 1fb069513e..bd1e642bde 100644 --- a/org.springframework.integration/src/test/java/org/springframework/integration/config/MessageBusParserTests.java +++ b/org.springframework.integration/src/test/java/org/springframework/integration/config/MessageBusParserTests.java @@ -37,8 +37,8 @@ import org.springframework.integration.bus.MessageBusInterceptorTests; import org.springframework.integration.bus.TestMessageBusAwareImpl; import org.springframework.integration.bus.TestMessageBusStartInterceptor; import org.springframework.integration.bus.TestMessageBusStopInterceptor; +import org.springframework.integration.scheduling.SimpleTaskScheduler; import org.springframework.integration.scheduling.TaskScheduler; -import org.springframework.integration.scheduling.spi.ProviderTaskScheduler; /** * @author Mark Fisher @@ -144,7 +144,7 @@ public class MessageBusParserTests { context.getBean(AbstractApplicationContext.APPLICATION_EVENT_MULTICASTER_BEAN_NAME); DirectFieldAccessor accessor = new DirectFieldAccessor(multicaster); Object taskExecutor = accessor.getPropertyValue("taskExecutor"); - assertEquals(ProviderTaskScheduler.class, taskExecutor.getClass()); + assertEquals(SimpleTaskScheduler.class, taskExecutor.getClass()); } @Test diff --git a/org.springframework.integration/src/test/java/org/springframework/integration/config/StubTaskScheduler.java b/org.springframework.integration/src/test/java/org/springframework/integration/config/StubTaskScheduler.java index e14ed49446..19945943e0 100644 --- a/org.springframework.integration/src/test/java/org/springframework/integration/config/StubTaskScheduler.java +++ b/org.springframework.integration/src/test/java/org/springframework/integration/config/StubTaskScheduler.java @@ -18,31 +18,23 @@ package org.springframework.integration.config; import java.util.concurrent.ScheduledFuture; -import org.springframework.integration.scheduling.SchedulableTask; import org.springframework.integration.scheduling.TaskScheduler; -import org.springframework.integration.util.ErrorHandler; +import org.springframework.integration.scheduling.Trigger; /** * @author Mark Fisher */ public class StubTaskScheduler implements TaskScheduler { - public boolean cancel(Runnable task, boolean mayInterruptIfRunning) { - return false; - } - - public ScheduledFuture schedule(SchedulableTask task) { + public ScheduledFuture schedule(Runnable task, Trigger trigger) { return null; } - public void setErrorHandler(ErrorHandler errorHandler) { + public void execute(Runnable task) { } public boolean prefersShortLivedTasks() { - return false; - } - - public void execute(Runnable task) { + return true; } public boolean isRunning() {