IN PROGRESS - issue BATCH-546: pull duplicates from TaskletStep and ItemOrientedStep into AbstractStep
pulled up listener registration to AbstractStep
This commit is contained in:
@@ -15,16 +15,20 @@
|
||||
*/
|
||||
package org.springframework.batch.core.step;
|
||||
|
||||
import org.springframework.batch.core.UnexpectedJobExecutionException;
|
||||
import org.springframework.batch.core.JobInterruptedException;
|
||||
import org.springframework.batch.core.Step;
|
||||
import org.springframework.batch.core.StepExecution;
|
||||
import org.springframework.batch.core.StepExecutionListener;
|
||||
import org.springframework.batch.core.UnexpectedJobExecutionException;
|
||||
import org.springframework.batch.core.listener.CompositeStepExecutionListener;
|
||||
|
||||
/**
|
||||
* A {@link Step} implementation that provides common behaviour to subclasses.
|
||||
* A {@link Step} implementation that provides common behavior to subclasses,
|
||||
* including registering and calling listeners.
|
||||
*
|
||||
* @author Dave Syer
|
||||
* @author Ben Hale
|
||||
* @author Robert Kasanicky
|
||||
*/
|
||||
public abstract class AbstractStep implements Step {
|
||||
|
||||
@@ -34,6 +38,8 @@ public abstract class AbstractStep implements Step {
|
||||
|
||||
protected boolean allowStartIfComplete;
|
||||
|
||||
private CompositeStepExecutionListener listener = new CompositeStepExecutionListener();
|
||||
|
||||
/**
|
||||
* Default constructor.
|
||||
*/
|
||||
@@ -90,5 +96,35 @@ public abstract class AbstractStep implements Step {
|
||||
this.name = name;
|
||||
}
|
||||
|
||||
public abstract void execute(StepExecution stepExecution) throws JobInterruptedException, UnexpectedJobExecutionException;
|
||||
public abstract void execute(StepExecution stepExecution) throws JobInterruptedException,
|
||||
UnexpectedJobExecutionException;
|
||||
|
||||
/**
|
||||
* Register a step listener for callbacks at the appropriate stages in a
|
||||
* step execution.
|
||||
*
|
||||
* @param listener a {@link StepExecutionListener}
|
||||
*/
|
||||
public void registerStepExecutionListener(StepExecutionListener listener) {
|
||||
this.listener.register(listener);
|
||||
}
|
||||
|
||||
/**
|
||||
* Register each of the objects as listeners.
|
||||
*
|
||||
* @param listeners an array of listener objects of known types.
|
||||
*/
|
||||
public void setStepExecutionListeners(StepExecutionListener[] listeners) {
|
||||
for (int i = 0; i < listeners.length; i++) {
|
||||
registerStepExecutionListener(listeners[i]);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* @return composite listener that delegates to all registered listeners.
|
||||
*/
|
||||
protected StepExecutionListener getCompositeListener() {
|
||||
return listener;
|
||||
}
|
||||
|
||||
}
|
||||
@@ -28,7 +28,6 @@ import org.springframework.batch.core.StepExecution;
|
||||
import org.springframework.batch.core.StepExecutionListener;
|
||||
import org.springframework.batch.core.UnexpectedJobExecutionException;
|
||||
import org.springframework.batch.core.launch.support.ExitCodeMapper;
|
||||
import org.springframework.batch.core.listener.CompositeStepExecutionListener;
|
||||
import org.springframework.batch.core.repository.JobRepository;
|
||||
import org.springframework.batch.core.repository.NoSuchJobException;
|
||||
import org.springframework.batch.core.step.AbstractStep;
|
||||
@@ -68,6 +67,7 @@ import org.springframework.transaction.support.DefaultTransactionDefinition;
|
||||
* @author Dave Syer
|
||||
* @author Lucas Ward
|
||||
* @author Ben Hale
|
||||
* @author Robert Kasanicky
|
||||
*/
|
||||
public class ItemOrientedStep extends AbstractStep {
|
||||
|
||||
@@ -87,8 +87,6 @@ public class ItemOrientedStep extends AbstractStep {
|
||||
|
||||
private CompositeItemStream stream = new CompositeItemStream();
|
||||
|
||||
private CompositeStepExecutionListener listener = new CompositeStepExecutionListener();
|
||||
|
||||
private JobRepository jobRepository;
|
||||
|
||||
private PlatformTransactionManager transactionManager;
|
||||
@@ -132,6 +130,21 @@ public class ItemOrientedStep extends AbstractStep {
|
||||
this.itemHandler = itemHandler;
|
||||
}
|
||||
|
||||
/**
|
||||
* Register each of the objects as listeners. If the {@link ItemReader} or
|
||||
* {@link ItemWriter} themselves implements this interface they will be
|
||||
* registered automatically, but their injected dependencies will not be.
|
||||
* This is a good way to get access to job parameters and execution context
|
||||
* if the tasklet is parameterised.
|
||||
*
|
||||
* @param listeners an array of listener objects of known types.
|
||||
*/
|
||||
public void setStepExecutionListeners(StepExecutionListener[] listeners) {
|
||||
for (int i = 0; i < listeners.length; i++) {
|
||||
registerStepExecutionListener(listeners[i]);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Register each of the streams for callbacks at the appropriate time in the
|
||||
* step. The {@link ItemReader} and {@link ItemWriter} are automatically
|
||||
@@ -159,31 +172,6 @@ public class ItemOrientedStep extends AbstractStep {
|
||||
this.stream.register(stream);
|
||||
}
|
||||
|
||||
/**
|
||||
* Register each of the objects as listeners. If the {@link ItemReader} or
|
||||
* {@link ItemWriter} themselves implements this interface they will be
|
||||
* registered automatically, but their injected dependencies will not be.
|
||||
* This is a good way to get access to job parameters and execution context
|
||||
* if the tasklet is parameterised.
|
||||
*
|
||||
* @param listeners an array of listener objects of known types.
|
||||
*/
|
||||
public void setStepExecutionListeners(StepExecutionListener[] listeners) {
|
||||
for (int i = 0; i < listeners.length; i++) {
|
||||
registerStepExecutionListener(listeners[i]);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Register a step listener for callbacks at the appropriate stages in a
|
||||
* step execution.
|
||||
*
|
||||
* @param listener a {@link StepExecutionListener}
|
||||
*/
|
||||
public void registerStepExecutionListener(StepExecutionListener listener) {
|
||||
this.listener.register(listener);
|
||||
}
|
||||
|
||||
/**
|
||||
* The {@link RepeatOperations} to use for the outer loop of the batch
|
||||
* processing. Should be set up by the caller through a factory. Defaults to
|
||||
@@ -260,7 +248,7 @@ public class ItemOrientedStep extends AbstractStep {
|
||||
// Execute step level listeners *after* the execution context is
|
||||
// fixed in the step. E.g. ItemStream instances need the the same
|
||||
// reference to the ExecutionContext as the step execution.
|
||||
listener.beforeStep(stepExecution);
|
||||
getCompositeListener().beforeStep(stepExecution);
|
||||
stream.open(stepExecution.getExecutionContext());
|
||||
stream.update(stepExecution.getExecutionContext());
|
||||
jobRepository.saveOrUpdateExecutionContext(stepExecution);
|
||||
@@ -350,8 +338,8 @@ public class ItemOrientedStep extends AbstractStep {
|
||||
|
||||
}
|
||||
});
|
||||
|
||||
status = status.and(listener.afterStep(stepExecution));
|
||||
|
||||
status = status.and(getCompositeListener().afterStep(stepExecution));
|
||||
|
||||
fatalException.setException(updateStatus(stepExecution, BatchStatus.COMPLETED));
|
||||
}
|
||||
@@ -412,7 +400,8 @@ public class ItemOrientedStep extends AbstractStep {
|
||||
|
||||
/**
|
||||
* @param stepExecution the current {@link StepExecution}
|
||||
* @param fatalException the {@link ExceptionHolder} containing information about failures in meta-data
|
||||
* @param fatalException the {@link ExceptionHolder} containing information
|
||||
* about failures in meta-data
|
||||
* @param e the cause of teh failure
|
||||
* @return an {@link ExitStatus}
|
||||
*/
|
||||
@@ -426,7 +415,7 @@ public class ItemOrientedStep extends AbstractStep {
|
||||
if (!fatalException.hasException()) {
|
||||
try {
|
||||
// classify exception so an exit code can be stored.
|
||||
status = status.and(listener.onErrorInStep(stepExecution, e));
|
||||
status = status.and(getCompositeListener().onErrorInStep(stepExecution, e));
|
||||
}
|
||||
catch (RuntimeException ex) {
|
||||
logger.error("Unexpected error in listener on error in step.", ex);
|
||||
@@ -518,12 +507,12 @@ public class ItemOrientedStep extends AbstractStep {
|
||||
|
||||
/**
|
||||
* @param stepExecution
|
||||
* @param contribution
|
||||
* @param contribution
|
||||
* @param fatalException
|
||||
* @param transaction
|
||||
*/
|
||||
private void processRollback(final StepExecution stepExecution, final StepContribution contribution, final ExceptionHolder fatalException,
|
||||
TransactionStatus transaction) {
|
||||
private void processRollback(final StepExecution stepExecution, final StepContribution contribution,
|
||||
final ExceptionHolder fatalException, TransactionStatus transaction) {
|
||||
|
||||
stepExecution.incrementSkipCountBy(contribution.getSkipCount());
|
||||
/*
|
||||
|
||||
@@ -25,7 +25,6 @@ import org.springframework.batch.core.JobInterruptedException;
|
||||
import org.springframework.batch.core.Step;
|
||||
import org.springframework.batch.core.StepExecution;
|
||||
import org.springframework.batch.core.StepExecutionListener;
|
||||
import org.springframework.batch.core.listener.CompositeStepExecutionListener;
|
||||
import org.springframework.batch.core.repository.JobRepository;
|
||||
import org.springframework.batch.core.step.AbstractStep;
|
||||
import org.springframework.batch.repeat.ExitStatus;
|
||||
@@ -38,7 +37,13 @@ import org.springframework.util.Assert;
|
||||
* manage transactions or any looping functionality. The tasklet should do this
|
||||
* on its own.
|
||||
*
|
||||
* If the {@link Tasklet} itself implements {@link StepExecutionListener} it
|
||||
* will be registered automatically, but its injected dependencies will not be.
|
||||
* This is a good way to get access to job parameters and execution context if
|
||||
* the tasklet is parameterised.
|
||||
*
|
||||
* @author Ben Hale
|
||||
* @author Robert Kasanicky
|
||||
*/
|
||||
public class TaskletStep extends AbstractStep implements Step, InitializingBean, BeanNameAware {
|
||||
|
||||
@@ -48,8 +53,6 @@ public class TaskletStep extends AbstractStep implements Step, InitializingBean,
|
||||
|
||||
private JobRepository jobRepository;
|
||||
|
||||
private CompositeStepExecutionListener listener = new CompositeStepExecutionListener();
|
||||
|
||||
/**
|
||||
* Set the name property if it is not already set. Because of the order of
|
||||
* the callbacks in a Spring container the name property will be set first
|
||||
@@ -66,17 +69,13 @@ public class TaskletStep extends AbstractStep implements Step, InitializingBean,
|
||||
}
|
||||
|
||||
/**
|
||||
* Register each of the objects as listeners. If the {@link Tasklet} itself
|
||||
* implements this interface it will be registered automatically, but its
|
||||
* injected dependencies will not be. This is a good way to get access to
|
||||
* job parameters and execution context if the tasklet is parameterised.
|
||||
* Register each of the objects as listeners.
|
||||
*
|
||||
* @param listeners an array of listener objects of known types.
|
||||
* @deprecated use
|
||||
* {@link #setStepExecutionListeners(StepExecutionListener[])} instead
|
||||
*/
|
||||
public void setStepListeners(StepExecutionListener[] listeners) {
|
||||
for (int i = 0; i < listeners.length; i++) {
|
||||
this.listener.register(listeners[i]);
|
||||
}
|
||||
setStepExecutionListeners(listeners);
|
||||
}
|
||||
|
||||
/**
|
||||
@@ -87,7 +86,7 @@ public class TaskletStep extends AbstractStep implements Step, InitializingBean,
|
||||
Assert.notNull(jobRepository, "JobRepository is mandatory for TaskletStep");
|
||||
Assert.notNull(tasklet, "Tasklet is mandatory for TaskletStep");
|
||||
if (tasklet instanceof StepExecutionListener) {
|
||||
listener.register((StepExecutionListener) tasklet);
|
||||
registerStepExecutionListener((StepExecutionListener) tasklet);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -135,9 +134,9 @@ public class TaskletStep extends AbstractStep implements Step, InitializingBean,
|
||||
Exception fatalException = null;
|
||||
try {
|
||||
|
||||
listener.beforeStep(stepExecution);
|
||||
getCompositeListener().beforeStep(stepExecution);
|
||||
exitStatus = tasklet.execute();
|
||||
exitStatus = exitStatus.and(listener.afterStep(stepExecution));
|
||||
exitStatus = exitStatus.and(getCompositeListener().afterStep(stepExecution));
|
||||
|
||||
try {
|
||||
jobRepository.saveOrUpdateExecutionContext(stepExecution);
|
||||
@@ -153,7 +152,7 @@ public class TaskletStep extends AbstractStep implements Step, InitializingBean,
|
||||
logger.error("Encountered an error running the tasklet");
|
||||
updateStatus(stepExecution, BatchStatus.FAILED);
|
||||
try {
|
||||
exitStatus = exitStatus.and(listener.onErrorInStep(stepExecution, e));
|
||||
exitStatus = exitStatus.and(getCompositeListener().onErrorInStep(stepExecution, e));
|
||||
}
|
||||
catch (Exception ex) {
|
||||
logger.error("Encountered an error on listener close.", ex);
|
||||
@@ -178,7 +177,8 @@ public class TaskletStep extends AbstractStep implements Step, InitializingBean,
|
||||
if (fatalException != null) {
|
||||
logger.error("Encountered an error saving batch meta data."
|
||||
+ "This job is now in an unknown state and should not be restarted.", fatalException);
|
||||
throw new UnexpectedJobExecutionException("Encountered an error saving batch meta data.", fatalException);
|
||||
throw new UnexpectedJobExecutionException("Encountered an error saving batch meta data.",
|
||||
fatalException);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -174,7 +174,7 @@ public class SimpleStepFactoryBeanTests extends TestCase {
|
||||
throw new RuntimeException("Foo");
|
||||
}
|
||||
});
|
||||
ItemOrientedStep step = (ItemOrientedStep) factory.getObject();
|
||||
AbstractStep step = (AbstractStep) factory.getObject();
|
||||
job.setSteps(Collections.singletonList(step));
|
||||
|
||||
JobExecution jobExecution = repository.createJobExecution(job, new JobParameters());
|
||||
@@ -210,7 +210,7 @@ public class SimpleStepFactoryBeanTests extends TestCase {
|
||||
factory.setListeners(new StepListener[]{ chunkListener });
|
||||
factory.setCommitInterval(commitInterval);
|
||||
|
||||
ItemOrientedStep step = (ItemOrientedStep) factory.getObject();
|
||||
AbstractStep step = (AbstractStep) factory.getObject();
|
||||
|
||||
job.setSteps(Collections.singletonList(step));
|
||||
|
||||
|
||||
@@ -13,6 +13,7 @@ import org.springframework.batch.core.JobInstance;
|
||||
import org.springframework.batch.core.JobParameters;
|
||||
import org.springframework.batch.core.StepExecution;
|
||||
import org.springframework.batch.core.job.JobSupport;
|
||||
import org.springframework.batch.core.step.AbstractStep;
|
||||
import org.springframework.batch.core.step.JobRepositorySupport;
|
||||
import org.springframework.batch.core.step.skip.SkipLimitExceededException;
|
||||
import org.springframework.batch.item.ClearFailedException;
|
||||
@@ -63,7 +64,7 @@ public class SkipLimitStepFactoryBeanTests extends TestCase {
|
||||
* Check items causing errors are skipped as expected.
|
||||
*/
|
||||
public void testSkip() throws Exception {
|
||||
ItemOrientedStep step = (ItemOrientedStep) factory.getObject();
|
||||
AbstractStep step = (AbstractStep) factory.getObject();
|
||||
|
||||
StepExecution stepExecution = new StepExecution(step, jobExecution);
|
||||
step.execute(stepExecution);
|
||||
@@ -91,7 +92,7 @@ public class SkipLimitStepFactoryBeanTests extends TestCase {
|
||||
}
|
||||
});
|
||||
|
||||
ItemOrientedStep step = (ItemOrientedStep) factory.getObject();
|
||||
AbstractStep step = (AbstractStep) factory.getObject();
|
||||
StepExecution stepExecution = new StepExecution(step, jobExecution);
|
||||
|
||||
try {
|
||||
@@ -110,7 +111,7 @@ public class SkipLimitStepFactoryBeanTests extends TestCase {
|
||||
|
||||
factory.setSkipLimit(1);
|
||||
|
||||
ItemOrientedStep step = (ItemOrientedStep) factory.getObject();
|
||||
AbstractStep step = (AbstractStep) factory.getObject();
|
||||
|
||||
StepExecution stepExecution = new StepExecution(step, jobExecution);
|
||||
|
||||
@@ -145,7 +146,7 @@ public class SkipLimitStepFactoryBeanTests extends TestCase {
|
||||
factory.setItemReader(reader);
|
||||
factory.setSkippableExceptionClasses(new Class[] { Exception.class });
|
||||
|
||||
ItemOrientedStep step = (ItemOrientedStep) factory.getObject();
|
||||
AbstractStep step = (AbstractStep) factory.getObject();
|
||||
|
||||
StepExecution stepExecution = new StepExecution(step, jobExecution);
|
||||
|
||||
@@ -179,7 +180,7 @@ public class SkipLimitStepFactoryBeanTests extends TestCase {
|
||||
factory.setSkipLimit(4);
|
||||
factory.setItemReader(reader);
|
||||
|
||||
ItemOrientedStep step = (ItemOrientedStep) factory.getObject();
|
||||
AbstractStep step = (AbstractStep) factory.getObject();
|
||||
|
||||
StepExecution stepExecution = jobExecution.createStepExecution(step);
|
||||
|
||||
@@ -206,7 +207,7 @@ public class SkipLimitStepFactoryBeanTests extends TestCase {
|
||||
factory.setItemReader(reader);
|
||||
factory.setItemWriter(writer);
|
||||
|
||||
ItemOrientedStep step = (ItemOrientedStep) factory.getObject();
|
||||
AbstractStep step = (AbstractStep) factory.getObject();
|
||||
|
||||
StepExecution stepExecution = jobExecution.createStepExecution(step);
|
||||
|
||||
|
||||
@@ -32,6 +32,7 @@ import org.springframework.batch.core.repository.dao.MapJobExecutionDao;
|
||||
import org.springframework.batch.core.repository.dao.MapJobInstanceDao;
|
||||
import org.springframework.batch.core.repository.dao.MapStepExecutionDao;
|
||||
import org.springframework.batch.core.repository.support.SimpleJobRepository;
|
||||
import org.springframework.batch.core.step.AbstractStep;
|
||||
import org.springframework.batch.item.AbstractItemWriter;
|
||||
import org.springframework.batch.item.ItemReader;
|
||||
import org.springframework.batch.item.ItemRecoverer;
|
||||
@@ -119,7 +120,7 @@ public class StatefulRetryStepFactoryBeanTests extends TestCase {
|
||||
};
|
||||
factory.setItemReader(provider);
|
||||
factory.setRetryLimit(10);
|
||||
ItemOrientedStep step = (ItemOrientedStep) factory.getObject();
|
||||
AbstractStep step = (AbstractStep) factory.getObject();
|
||||
|
||||
step.execute(new StepExecution(step, jobExecution));
|
||||
}
|
||||
|
||||
@@ -6,19 +6,17 @@ import java.util.List;
|
||||
import junit.framework.TestCase;
|
||||
|
||||
import org.springframework.batch.core.BatchStatus;
|
||||
import org.springframework.batch.core.UnexpectedJobExecutionException;
|
||||
import org.springframework.batch.core.JobExecution;
|
||||
import org.springframework.batch.core.JobInstance;
|
||||
import org.springframework.batch.core.JobInterruptedException;
|
||||
import org.springframework.batch.core.JobParameters;
|
||||
import org.springframework.batch.core.StepExecution;
|
||||
import org.springframework.batch.core.StepExecutionListener;
|
||||
import org.springframework.batch.core.UnexpectedJobExecutionException;
|
||||
import org.springframework.batch.core.job.JobSupport;
|
||||
import org.springframework.batch.core.listener.StepExecutionListenerSupport;
|
||||
import org.springframework.batch.core.step.JobRepositorySupport;
|
||||
import org.springframework.batch.core.step.StepSupport;
|
||||
import org.springframework.batch.core.step.tasklet.Tasklet;
|
||||
import org.springframework.batch.core.step.tasklet.TaskletStep;
|
||||
import org.springframework.batch.repeat.ExitStatus;
|
||||
|
||||
public class TaskletStepTests extends TestCase {
|
||||
@@ -109,7 +107,7 @@ public class TaskletStepTests extends TestCase {
|
||||
|
||||
public void testSuccessfulExecutionWithListener() throws Exception {
|
||||
TaskletStep step = new TaskletStep(new StubTasklet(false, false), new JobRepositorySupport());
|
||||
step.setStepListeners(new StepExecutionListener[] { new StepExecutionListenerSupport() {
|
||||
step.setStepExecutionListeners(new StepExecutionListener[] { new StepExecutionListenerSupport() {
|
||||
public void beforeStep(StepExecution context) {
|
||||
list.add("open");
|
||||
}
|
||||
@@ -186,7 +184,7 @@ public class TaskletStepTests extends TestCase {
|
||||
throw new RuntimeException("exception thrown in afterStep to signal failure");
|
||||
}
|
||||
};
|
||||
step.setStepListeners(new StepExecutionListener[] { listener });
|
||||
step.setStepExecutionListeners(new StepExecutionListener[] { listener });
|
||||
try {
|
||||
step.execute(stepExecution);
|
||||
fail();
|
||||
|
||||
Reference in New Issue
Block a user