From 986baed2327f2ade77a01278004b186b6b8dee13 Mon Sep 17 00:00:00 2001 From: robokaso Date: Thu, 3 Apr 2008 09:08:41 +0000 Subject: [PATCH] IN PROGRESS - issue BATCH-546: pull duplicates from TaskletStep and ItemOrientedStep into AbstractStep pulled up listener registration to AbstractStep --- .../batch/core/step/AbstractStep.java | 42 ++++++++++++- .../core/step/item/ItemOrientedStep.java | 61 ++++++++----------- .../batch/core/step/tasklet/TaskletStep.java | 32 +++++----- .../step/item/SimpleStepFactoryBeanTests.java | 4 +- .../item/SkipLimitStepFactoryBeanTests.java | 13 ++-- .../StatefulRetryStepFactoryBeanTests.java | 3 +- .../core/step/tasklet/TaskletStepTests.java | 8 +-- 7 files changed, 94 insertions(+), 69 deletions(-) diff --git a/spring-batch-core/src/main/java/org/springframework/batch/core/step/AbstractStep.java b/spring-batch-core/src/main/java/org/springframework/batch/core/step/AbstractStep.java index 66af15187..ffd95d431 100644 --- a/spring-batch-core/src/main/java/org/springframework/batch/core/step/AbstractStep.java +++ b/spring-batch-core/src/main/java/org/springframework/batch/core/step/AbstractStep.java @@ -15,16 +15,20 @@ */ package org.springframework.batch.core.step; -import org.springframework.batch.core.UnexpectedJobExecutionException; import org.springframework.batch.core.JobInterruptedException; import org.springframework.batch.core.Step; import org.springframework.batch.core.StepExecution; +import org.springframework.batch.core.StepExecutionListener; +import org.springframework.batch.core.UnexpectedJobExecutionException; +import org.springframework.batch.core.listener.CompositeStepExecutionListener; /** - * A {@link Step} implementation that provides common behaviour to subclasses. + * A {@link Step} implementation that provides common behavior to subclasses, + * including registering and calling listeners. * * @author Dave Syer * @author Ben Hale + * @author Robert Kasanicky */ public abstract class AbstractStep implements Step { @@ -34,6 +38,8 @@ public abstract class AbstractStep implements Step { protected boolean allowStartIfComplete; + private CompositeStepExecutionListener listener = new CompositeStepExecutionListener(); + /** * Default constructor. */ @@ -90,5 +96,35 @@ public abstract class AbstractStep implements Step { this.name = name; } - public abstract void execute(StepExecution stepExecution) throws JobInterruptedException, UnexpectedJobExecutionException; + public abstract void execute(StepExecution stepExecution) throws JobInterruptedException, + UnexpectedJobExecutionException; + + /** + * Register a step listener for callbacks at the appropriate stages in a + * step execution. + * + * @param listener a {@link StepExecutionListener} + */ + public void registerStepExecutionListener(StepExecutionListener listener) { + this.listener.register(listener); + } + + /** + * Register each of the objects as listeners. + * + * @param listeners an array of listener objects of known types. + */ + public void setStepExecutionListeners(StepExecutionListener[] listeners) { + for (int i = 0; i < listeners.length; i++) { + registerStepExecutionListener(listeners[i]); + } + } + + /** + * @return composite listener that delegates to all registered listeners. + */ + protected StepExecutionListener getCompositeListener() { + return listener; + } + } \ No newline at end of file diff --git a/spring-batch-core/src/main/java/org/springframework/batch/core/step/item/ItemOrientedStep.java b/spring-batch-core/src/main/java/org/springframework/batch/core/step/item/ItemOrientedStep.java index 2689b6c4c..a63c24693 100644 --- a/spring-batch-core/src/main/java/org/springframework/batch/core/step/item/ItemOrientedStep.java +++ b/spring-batch-core/src/main/java/org/springframework/batch/core/step/item/ItemOrientedStep.java @@ -28,7 +28,6 @@ import org.springframework.batch.core.StepExecution; import org.springframework.batch.core.StepExecutionListener; import org.springframework.batch.core.UnexpectedJobExecutionException; import org.springframework.batch.core.launch.support.ExitCodeMapper; -import org.springframework.batch.core.listener.CompositeStepExecutionListener; import org.springframework.batch.core.repository.JobRepository; import org.springframework.batch.core.repository.NoSuchJobException; import org.springframework.batch.core.step.AbstractStep; @@ -68,6 +67,7 @@ import org.springframework.transaction.support.DefaultTransactionDefinition; * @author Dave Syer * @author Lucas Ward * @author Ben Hale + * @author Robert Kasanicky */ public class ItemOrientedStep extends AbstractStep { @@ -87,8 +87,6 @@ public class ItemOrientedStep extends AbstractStep { private CompositeItemStream stream = new CompositeItemStream(); - private CompositeStepExecutionListener listener = new CompositeStepExecutionListener(); - private JobRepository jobRepository; private PlatformTransactionManager transactionManager; @@ -132,6 +130,21 @@ public class ItemOrientedStep extends AbstractStep { this.itemHandler = itemHandler; } + /** + * Register each of the objects as listeners. If the {@link ItemReader} or + * {@link ItemWriter} themselves implements this interface they will be + * registered automatically, but their injected dependencies will not be. + * This is a good way to get access to job parameters and execution context + * if the tasklet is parameterised. + * + * @param listeners an array of listener objects of known types. + */ + public void setStepExecutionListeners(StepExecutionListener[] listeners) { + for (int i = 0; i < listeners.length; i++) { + registerStepExecutionListener(listeners[i]); + } + } + /** * Register each of the streams for callbacks at the appropriate time in the * step. The {@link ItemReader} and {@link ItemWriter} are automatically @@ -159,31 +172,6 @@ public class ItemOrientedStep extends AbstractStep { this.stream.register(stream); } - /** - * Register each of the objects as listeners. If the {@link ItemReader} or - * {@link ItemWriter} themselves implements this interface they will be - * registered automatically, but their injected dependencies will not be. - * This is a good way to get access to job parameters and execution context - * if the tasklet is parameterised. - * - * @param listeners an array of listener objects of known types. - */ - public void setStepExecutionListeners(StepExecutionListener[] listeners) { - for (int i = 0; i < listeners.length; i++) { - registerStepExecutionListener(listeners[i]); - } - } - - /** - * Register a step listener for callbacks at the appropriate stages in a - * step execution. - * - * @param listener a {@link StepExecutionListener} - */ - public void registerStepExecutionListener(StepExecutionListener listener) { - this.listener.register(listener); - } - /** * The {@link RepeatOperations} to use for the outer loop of the batch * processing. Should be set up by the caller through a factory. Defaults to @@ -260,7 +248,7 @@ public class ItemOrientedStep extends AbstractStep { // Execute step level listeners *after* the execution context is // fixed in the step. E.g. ItemStream instances need the the same // reference to the ExecutionContext as the step execution. - listener.beforeStep(stepExecution); + getCompositeListener().beforeStep(stepExecution); stream.open(stepExecution.getExecutionContext()); stream.update(stepExecution.getExecutionContext()); jobRepository.saveOrUpdateExecutionContext(stepExecution); @@ -350,8 +338,8 @@ public class ItemOrientedStep extends AbstractStep { } }); - - status = status.and(listener.afterStep(stepExecution)); + + status = status.and(getCompositeListener().afterStep(stepExecution)); fatalException.setException(updateStatus(stepExecution, BatchStatus.COMPLETED)); } @@ -412,7 +400,8 @@ public class ItemOrientedStep extends AbstractStep { /** * @param stepExecution the current {@link StepExecution} - * @param fatalException the {@link ExceptionHolder} containing information about failures in meta-data + * @param fatalException the {@link ExceptionHolder} containing information + * about failures in meta-data * @param e the cause of teh failure * @return an {@link ExitStatus} */ @@ -426,7 +415,7 @@ public class ItemOrientedStep extends AbstractStep { if (!fatalException.hasException()) { try { // classify exception so an exit code can be stored. - status = status.and(listener.onErrorInStep(stepExecution, e)); + status = status.and(getCompositeListener().onErrorInStep(stepExecution, e)); } catch (RuntimeException ex) { logger.error("Unexpected error in listener on error in step.", ex); @@ -518,12 +507,12 @@ public class ItemOrientedStep extends AbstractStep { /** * @param stepExecution - * @param contribution + * @param contribution * @param fatalException * @param transaction */ - private void processRollback(final StepExecution stepExecution, final StepContribution contribution, final ExceptionHolder fatalException, - TransactionStatus transaction) { + private void processRollback(final StepExecution stepExecution, final StepContribution contribution, + final ExceptionHolder fatalException, TransactionStatus transaction) { stepExecution.incrementSkipCountBy(contribution.getSkipCount()); /* diff --git a/spring-batch-core/src/main/java/org/springframework/batch/core/step/tasklet/TaskletStep.java b/spring-batch-core/src/main/java/org/springframework/batch/core/step/tasklet/TaskletStep.java index d4c5f0e9e..10b3e31bf 100644 --- a/spring-batch-core/src/main/java/org/springframework/batch/core/step/tasklet/TaskletStep.java +++ b/spring-batch-core/src/main/java/org/springframework/batch/core/step/tasklet/TaskletStep.java @@ -25,7 +25,6 @@ import org.springframework.batch.core.JobInterruptedException; import org.springframework.batch.core.Step; import org.springframework.batch.core.StepExecution; import org.springframework.batch.core.StepExecutionListener; -import org.springframework.batch.core.listener.CompositeStepExecutionListener; import org.springframework.batch.core.repository.JobRepository; import org.springframework.batch.core.step.AbstractStep; import org.springframework.batch.repeat.ExitStatus; @@ -38,7 +37,13 @@ import org.springframework.util.Assert; * manage transactions or any looping functionality. The tasklet should do this * on its own. * + * If the {@link Tasklet} itself implements {@link StepExecutionListener} it + * will be registered automatically, but its injected dependencies will not be. + * This is a good way to get access to job parameters and execution context if + * the tasklet is parameterised. + * * @author Ben Hale + * @author Robert Kasanicky */ public class TaskletStep extends AbstractStep implements Step, InitializingBean, BeanNameAware { @@ -48,8 +53,6 @@ public class TaskletStep extends AbstractStep implements Step, InitializingBean, private JobRepository jobRepository; - private CompositeStepExecutionListener listener = new CompositeStepExecutionListener(); - /** * Set the name property if it is not already set. Because of the order of * the callbacks in a Spring container the name property will be set first @@ -66,17 +69,13 @@ public class TaskletStep extends AbstractStep implements Step, InitializingBean, } /** - * Register each of the objects as listeners. If the {@link Tasklet} itself - * implements this interface it will be registered automatically, but its - * injected dependencies will not be. This is a good way to get access to - * job parameters and execution context if the tasklet is parameterised. + * Register each of the objects as listeners. * - * @param listeners an array of listener objects of known types. + * @deprecated use + * {@link #setStepExecutionListeners(StepExecutionListener[])} instead */ public void setStepListeners(StepExecutionListener[] listeners) { - for (int i = 0; i < listeners.length; i++) { - this.listener.register(listeners[i]); - } + setStepExecutionListeners(listeners); } /** @@ -87,7 +86,7 @@ public class TaskletStep extends AbstractStep implements Step, InitializingBean, Assert.notNull(jobRepository, "JobRepository is mandatory for TaskletStep"); Assert.notNull(tasklet, "Tasklet is mandatory for TaskletStep"); if (tasklet instanceof StepExecutionListener) { - listener.register((StepExecutionListener) tasklet); + registerStepExecutionListener((StepExecutionListener) tasklet); } } @@ -135,9 +134,9 @@ public class TaskletStep extends AbstractStep implements Step, InitializingBean, Exception fatalException = null; try { - listener.beforeStep(stepExecution); + getCompositeListener().beforeStep(stepExecution); exitStatus = tasklet.execute(); - exitStatus = exitStatus.and(listener.afterStep(stepExecution)); + exitStatus = exitStatus.and(getCompositeListener().afterStep(stepExecution)); try { jobRepository.saveOrUpdateExecutionContext(stepExecution); @@ -153,7 +152,7 @@ public class TaskletStep extends AbstractStep implements Step, InitializingBean, logger.error("Encountered an error running the tasklet"); updateStatus(stepExecution, BatchStatus.FAILED); try { - exitStatus = exitStatus.and(listener.onErrorInStep(stepExecution, e)); + exitStatus = exitStatus.and(getCompositeListener().onErrorInStep(stepExecution, e)); } catch (Exception ex) { logger.error("Encountered an error on listener close.", ex); @@ -178,7 +177,8 @@ public class TaskletStep extends AbstractStep implements Step, InitializingBean, if (fatalException != null) { logger.error("Encountered an error saving batch meta data." + "This job is now in an unknown state and should not be restarted.", fatalException); - throw new UnexpectedJobExecutionException("Encountered an error saving batch meta data.", fatalException); + throw new UnexpectedJobExecutionException("Encountered an error saving batch meta data.", + fatalException); } } diff --git a/spring-batch-core/src/test/java/org/springframework/batch/core/step/item/SimpleStepFactoryBeanTests.java b/spring-batch-core/src/test/java/org/springframework/batch/core/step/item/SimpleStepFactoryBeanTests.java index 7c26bddd6..dba18094b 100644 --- a/spring-batch-core/src/test/java/org/springframework/batch/core/step/item/SimpleStepFactoryBeanTests.java +++ b/spring-batch-core/src/test/java/org/springframework/batch/core/step/item/SimpleStepFactoryBeanTests.java @@ -174,7 +174,7 @@ public class SimpleStepFactoryBeanTests extends TestCase { throw new RuntimeException("Foo"); } }); - ItemOrientedStep step = (ItemOrientedStep) factory.getObject(); + AbstractStep step = (AbstractStep) factory.getObject(); job.setSteps(Collections.singletonList(step)); JobExecution jobExecution = repository.createJobExecution(job, new JobParameters()); @@ -210,7 +210,7 @@ public class SimpleStepFactoryBeanTests extends TestCase { factory.setListeners(new StepListener[]{ chunkListener }); factory.setCommitInterval(commitInterval); - ItemOrientedStep step = (ItemOrientedStep) factory.getObject(); + AbstractStep step = (AbstractStep) factory.getObject(); job.setSteps(Collections.singletonList(step)); diff --git a/spring-batch-core/src/test/java/org/springframework/batch/core/step/item/SkipLimitStepFactoryBeanTests.java b/spring-batch-core/src/test/java/org/springframework/batch/core/step/item/SkipLimitStepFactoryBeanTests.java index 43cb5a394..15f5bbb0a 100644 --- a/spring-batch-core/src/test/java/org/springframework/batch/core/step/item/SkipLimitStepFactoryBeanTests.java +++ b/spring-batch-core/src/test/java/org/springframework/batch/core/step/item/SkipLimitStepFactoryBeanTests.java @@ -13,6 +13,7 @@ import org.springframework.batch.core.JobInstance; import org.springframework.batch.core.JobParameters; import org.springframework.batch.core.StepExecution; import org.springframework.batch.core.job.JobSupport; +import org.springframework.batch.core.step.AbstractStep; import org.springframework.batch.core.step.JobRepositorySupport; import org.springframework.batch.core.step.skip.SkipLimitExceededException; import org.springframework.batch.item.ClearFailedException; @@ -63,7 +64,7 @@ public class SkipLimitStepFactoryBeanTests extends TestCase { * Check items causing errors are skipped as expected. */ public void testSkip() throws Exception { - ItemOrientedStep step = (ItemOrientedStep) factory.getObject(); + AbstractStep step = (AbstractStep) factory.getObject(); StepExecution stepExecution = new StepExecution(step, jobExecution); step.execute(stepExecution); @@ -91,7 +92,7 @@ public class SkipLimitStepFactoryBeanTests extends TestCase { } }); - ItemOrientedStep step = (ItemOrientedStep) factory.getObject(); + AbstractStep step = (AbstractStep) factory.getObject(); StepExecution stepExecution = new StepExecution(step, jobExecution); try { @@ -110,7 +111,7 @@ public class SkipLimitStepFactoryBeanTests extends TestCase { factory.setSkipLimit(1); - ItemOrientedStep step = (ItemOrientedStep) factory.getObject(); + AbstractStep step = (AbstractStep) factory.getObject(); StepExecution stepExecution = new StepExecution(step, jobExecution); @@ -145,7 +146,7 @@ public class SkipLimitStepFactoryBeanTests extends TestCase { factory.setItemReader(reader); factory.setSkippableExceptionClasses(new Class[] { Exception.class }); - ItemOrientedStep step = (ItemOrientedStep) factory.getObject(); + AbstractStep step = (AbstractStep) factory.getObject(); StepExecution stepExecution = new StepExecution(step, jobExecution); @@ -179,7 +180,7 @@ public class SkipLimitStepFactoryBeanTests extends TestCase { factory.setSkipLimit(4); factory.setItemReader(reader); - ItemOrientedStep step = (ItemOrientedStep) factory.getObject(); + AbstractStep step = (AbstractStep) factory.getObject(); StepExecution stepExecution = jobExecution.createStepExecution(step); @@ -206,7 +207,7 @@ public class SkipLimitStepFactoryBeanTests extends TestCase { factory.setItemReader(reader); factory.setItemWriter(writer); - ItemOrientedStep step = (ItemOrientedStep) factory.getObject(); + AbstractStep step = (AbstractStep) factory.getObject(); StepExecution stepExecution = jobExecution.createStepExecution(step); diff --git a/spring-batch-core/src/test/java/org/springframework/batch/core/step/item/StatefulRetryStepFactoryBeanTests.java b/spring-batch-core/src/test/java/org/springframework/batch/core/step/item/StatefulRetryStepFactoryBeanTests.java index 1995ca6ef..fc5fb4436 100644 --- a/spring-batch-core/src/test/java/org/springframework/batch/core/step/item/StatefulRetryStepFactoryBeanTests.java +++ b/spring-batch-core/src/test/java/org/springframework/batch/core/step/item/StatefulRetryStepFactoryBeanTests.java @@ -32,6 +32,7 @@ import org.springframework.batch.core.repository.dao.MapJobExecutionDao; import org.springframework.batch.core.repository.dao.MapJobInstanceDao; import org.springframework.batch.core.repository.dao.MapStepExecutionDao; import org.springframework.batch.core.repository.support.SimpleJobRepository; +import org.springframework.batch.core.step.AbstractStep; import org.springframework.batch.item.AbstractItemWriter; import org.springframework.batch.item.ItemReader; import org.springframework.batch.item.ItemRecoverer; @@ -119,7 +120,7 @@ public class StatefulRetryStepFactoryBeanTests extends TestCase { }; factory.setItemReader(provider); factory.setRetryLimit(10); - ItemOrientedStep step = (ItemOrientedStep) factory.getObject(); + AbstractStep step = (AbstractStep) factory.getObject(); step.execute(new StepExecution(step, jobExecution)); } diff --git a/spring-batch-core/src/test/java/org/springframework/batch/core/step/tasklet/TaskletStepTests.java b/spring-batch-core/src/test/java/org/springframework/batch/core/step/tasklet/TaskletStepTests.java index 879268df7..8b8335970 100644 --- a/spring-batch-core/src/test/java/org/springframework/batch/core/step/tasklet/TaskletStepTests.java +++ b/spring-batch-core/src/test/java/org/springframework/batch/core/step/tasklet/TaskletStepTests.java @@ -6,19 +6,17 @@ import java.util.List; import junit.framework.TestCase; import org.springframework.batch.core.BatchStatus; -import org.springframework.batch.core.UnexpectedJobExecutionException; import org.springframework.batch.core.JobExecution; import org.springframework.batch.core.JobInstance; import org.springframework.batch.core.JobInterruptedException; import org.springframework.batch.core.JobParameters; import org.springframework.batch.core.StepExecution; import org.springframework.batch.core.StepExecutionListener; +import org.springframework.batch.core.UnexpectedJobExecutionException; import org.springframework.batch.core.job.JobSupport; import org.springframework.batch.core.listener.StepExecutionListenerSupport; import org.springframework.batch.core.step.JobRepositorySupport; import org.springframework.batch.core.step.StepSupport; -import org.springframework.batch.core.step.tasklet.Tasklet; -import org.springframework.batch.core.step.tasklet.TaskletStep; import org.springframework.batch.repeat.ExitStatus; public class TaskletStepTests extends TestCase { @@ -109,7 +107,7 @@ public class TaskletStepTests extends TestCase { public void testSuccessfulExecutionWithListener() throws Exception { TaskletStep step = new TaskletStep(new StubTasklet(false, false), new JobRepositorySupport()); - step.setStepListeners(new StepExecutionListener[] { new StepExecutionListenerSupport() { + step.setStepExecutionListeners(new StepExecutionListener[] { new StepExecutionListenerSupport() { public void beforeStep(StepExecution context) { list.add("open"); } @@ -186,7 +184,7 @@ public class TaskletStepTests extends TestCase { throw new RuntimeException("exception thrown in afterStep to signal failure"); } }; - step.setStepListeners(new StepExecutionListener[] { listener }); + step.setStepExecutionListeners(new StepExecutionListener[] { listener }); try { step.execute(stepExecution); fail();