Commit 62b9268c authored by Phillip Webb's avatar Phillip Webb

Polish "Fix Spring Batch job restart parameters handling"

See gh-14933
parent ad3c3ad3
...@@ -20,6 +20,7 @@ import java.util.Arrays; ...@@ -20,6 +20,7 @@ import java.util.Arrays;
import java.util.Collection; import java.util.Collection;
import java.util.Collections; import java.util.Collections;
import java.util.HashMap; import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map; import java.util.Map;
import java.util.Properties; import java.util.Properties;
...@@ -33,7 +34,6 @@ import org.springframework.batch.core.JobExecutionException; ...@@ -33,7 +34,6 @@ import org.springframework.batch.core.JobExecutionException;
import org.springframework.batch.core.JobParameter; import org.springframework.batch.core.JobParameter;
import org.springframework.batch.core.JobParameters; import org.springframework.batch.core.JobParameters;
import org.springframework.batch.core.JobParametersBuilder; import org.springframework.batch.core.JobParametersBuilder;
import org.springframework.batch.core.JobParametersIncrementer;
import org.springframework.batch.core.JobParametersInvalidException; import org.springframework.batch.core.JobParametersInvalidException;
import org.springframework.batch.core.configuration.JobRegistry; import org.springframework.batch.core.configuration.JobRegistry;
import org.springframework.batch.core.converter.DefaultJobParametersConverter; import org.springframework.batch.core.converter.DefaultJobParametersConverter;
...@@ -51,6 +51,7 @@ import org.springframework.boot.CommandLineRunner; ...@@ -51,6 +51,7 @@ import org.springframework.boot.CommandLineRunner;
import org.springframework.context.ApplicationEventPublisher; import org.springframework.context.ApplicationEventPublisher;
import org.springframework.context.ApplicationEventPublisherAware; import org.springframework.context.ApplicationEventPublisherAware;
import org.springframework.core.Ordered; import org.springframework.core.Ordered;
import org.springframework.util.Assert;
import org.springframework.util.PatternMatchUtils; import org.springframework.util.PatternMatchUtils;
import org.springframework.util.StringUtils; import org.springframework.util.StringUtils;
...@@ -76,13 +77,13 @@ public class JobLauncherCommandLineRunner ...@@ -76,13 +77,13 @@ public class JobLauncherCommandLineRunner
private JobParametersConverter converter = new DefaultJobParametersConverter(); private JobParametersConverter converter = new DefaultJobParametersConverter();
private JobLauncher jobLauncher; private final JobLauncher jobLauncher;
private JobRegistry jobRegistry; private final JobExplorer jobExplorer;
private JobExplorer jobExplorer; private final JobRepository jobRepository;
private JobRepository jobRepository; private JobRegistry jobRegistry;
private String jobNames; private String jobNames;
...@@ -96,17 +97,17 @@ public class JobLauncherCommandLineRunner ...@@ -96,17 +97,17 @@ public class JobLauncherCommandLineRunner
* Create a new {@link JobLauncherCommandLineRunner}. * Create a new {@link JobLauncherCommandLineRunner}.
* @param jobLauncher to launch jobs * @param jobLauncher to launch jobs
* @param jobExplorer to check the job repository for previous executions * @param jobExplorer to check the job repository for previous executions
* @deprecated This constructor is deprecated in favor of * @deprecated since 2.0.7 in favor of
* {@link JobLauncherCommandLineRunner#JobLauncherCommandLineRunner(JobLauncher, JobExplorer, JobRepository)}. * {@link #JobLauncherCommandLineRunner(JobLauncher, JobExplorer, JobRepository)}. A
* A job repository is required to check if a job instance exists with the given * job repository is required to check if a job instance exists with the given
* parameters when running a job (which is not possible with the job explorer). This * parameters when running a job (which is not possible with the job explorer).
* constructor will be removed in a future version.
*/ */
@Deprecated @Deprecated
public JobLauncherCommandLineRunner(JobLauncher jobLauncher, public JobLauncherCommandLineRunner(JobLauncher jobLauncher,
JobExplorer jobExplorer) { JobExplorer jobExplorer) {
this.jobLauncher = jobLauncher; this.jobLauncher = jobLauncher;
this.jobExplorer = jobExplorer; this.jobExplorer = jobExplorer;
this.jobRepository = null;
} }
/** /**
...@@ -118,6 +119,9 @@ public class JobLauncherCommandLineRunner ...@@ -118,6 +119,9 @@ public class JobLauncherCommandLineRunner
*/ */
public JobLauncherCommandLineRunner(JobLauncher jobLauncher, JobExplorer jobExplorer, public JobLauncherCommandLineRunner(JobLauncher jobLauncher, JobExplorer jobExplorer,
JobRepository jobRepository) { JobRepository jobRepository) {
Assert.notNull(jobLauncher, "JobLauncher must not be null");
Assert.notNull(jobExplorer, "JobExplorer must not be null");
Assert.notNull(jobRepository, "JobRepository must not be null");
this.jobLauncher = jobLauncher; this.jobLauncher = jobLauncher;
this.jobExplorer = jobExplorer; this.jobExplorer = jobExplorer;
this.jobRepository = jobRepository; this.jobRepository = jobRepository;
...@@ -169,6 +173,20 @@ public class JobLauncherCommandLineRunner ...@@ -169,6 +173,20 @@ public class JobLauncherCommandLineRunner
executeRegisteredJobs(jobParameters); executeRegisteredJobs(jobParameters);
} }
private void executeLocalJobs(JobParameters jobParameters)
throws JobExecutionException {
for (Job job : this.jobs) {
if (StringUtils.hasText(this.jobNames)) {
String[] jobsToRun = this.jobNames.split(",");
if (!PatternMatchUtils.simpleMatch(jobsToRun, job.getName())) {
logger.debug("Skipped job: " + job.getName());
continue;
}
}
execute(job, jobParameters);
}
}
private void executeRegisteredJobs(JobParameters jobParameters) private void executeRegisteredJobs(JobParameters jobParameters)
throws JobExecutionException { throws JobExecutionException {
if (this.jobRegistry != null && StringUtils.hasText(this.jobNames)) { if (this.jobRegistry != null && StringUtils.hasText(this.jobNames)) {
...@@ -192,76 +210,56 @@ public class JobLauncherCommandLineRunner ...@@ -192,76 +210,56 @@ public class JobLauncherCommandLineRunner
throws JobExecutionAlreadyRunningException, JobRestartException, throws JobExecutionAlreadyRunningException, JobRestartException,
JobInstanceAlreadyCompleteException, JobParametersInvalidException, JobInstanceAlreadyCompleteException, JobParametersInvalidException,
JobParametersNotFoundException { JobParametersNotFoundException {
String jobName = job.getName(); JobParameters parameters = getNextJobParameters(job, jobParameters);
JobParameters parameters = jobParameters;
boolean jobInstanceExists = this.jobRepository.isJobInstanceExists(jobName,
parameters);
if (jobInstanceExists) {
JobExecution lastJobExecution = this.jobRepository
.getLastJobExecution(jobName, jobParameters);
if (lastJobExecution != null && isStoppedOrFailed(lastJobExecution)
&& job.isRestartable()) {
// Retry a failed or stopped execution with previous parameters
JobParameters previousParameters = lastJobExecution.getJobParameters();
/*
* remove Non-identifying parameters from the previous execution's
* parameters since there is no way to remove them programmatically. If
* they are required (or need to be modified) on a restart, they need to
* be (re)specified.
*/
JobParameters previousIdentifyingParameters = removeNonIdentifying(
previousParameters);
// merge additional parameters with previous ones (overriding those with
// the same key)
parameters = merge(previousIdentifyingParameters, jobParameters);
}
}
else {
JobParametersIncrementer incrementer = job.getJobParametersIncrementer();
if (incrementer != null) {
JobParameters nextParameters = new JobParametersBuilder(jobParameters,
this.jobExplorer).getNextJobParameters(job).toJobParameters();
parameters = merge(nextParameters, jobParameters);
}
}
JobExecution execution = this.jobLauncher.run(job, parameters); JobExecution execution = this.jobLauncher.run(job, parameters);
if (this.publisher != null) { if (this.publisher != null) {
this.publisher.publishEvent(new JobExecutionEvent(execution)); this.publisher.publishEvent(new JobExecutionEvent(execution));
} }
} }
private void executeLocalJobs(JobParameters jobParameters) private JobParameters getNextJobParameters(Job job, JobParameters jobParameters) {
throws JobExecutionException { if (this.jobRepository != null
for (Job job : this.jobs) { && this.jobRepository.isJobInstanceExists(job.getName(), jobParameters)) {
if (StringUtils.hasText(this.jobNames)) { return getNextJobParametersForExisting(job, jobParameters);
String[] jobsToRun = this.jobNames.split(","); }
if (!PatternMatchUtils.simpleMatch(jobsToRun, job.getName())) { if (job.getJobParametersIncrementer() == null) {
logger.debug("Skipped job: " + job.getName()); return jobParameters;
continue;
}
}
execute(job, jobParameters);
} }
JobParameters nextParameters = new JobParametersBuilder(jobParameters,
this.jobExplorer).getNextJobParameters(job).toJobParameters();
return merge(nextParameters, jobParameters);
} }
private JobParameters removeNonIdentifying(JobParameters parameters) { private JobParameters getNextJobParametersForExisting(Job job,
Map<String, JobParameter> parameterMap = parameters.getParameters(); JobParameters jobParameters) {
HashMap<String, JobParameter> copy = new HashMap<>(parameterMap); JobExecution lastExecution = this.jobRepository.getLastJobExecution(job.getName(),
for (Map.Entry<String, JobParameter> parameter : copy.entrySet()) { jobParameters);
if (!parameter.getValue().isIdentifying()) { if (isStoppedOrFailed(lastExecution) && job.isRestartable()) {
parameterMap.remove(parameter.getKey()); JobParameters previousIdentifyingParameters = getGetIdentifying(
} lastExecution.getJobParameters());
return merge(previousIdentifyingParameters, jobParameters);
} }
return new JobParameters(parameterMap); return jobParameters;
} }
private boolean isStoppedOrFailed(JobExecution execution) { private boolean isStoppedOrFailed(JobExecution execution) {
BatchStatus status = execution.getStatus(); BatchStatus status = (execution != null) ? execution.getStatus() : null;
return (status == BatchStatus.STOPPED || status == BatchStatus.FAILED); return (status == BatchStatus.STOPPED || status == BatchStatus.FAILED);
} }
private JobParameters getGetIdentifying(JobParameters parameters) {
HashMap<String, JobParameter> nonIdentifying = new LinkedHashMap<>(
parameters.getParameters().size());
parameters.getParameters().forEach((key, value) -> {
if (value.isIdentifying()) {
nonIdentifying.put(key, value);
}
});
return new JobParameters(nonIdentifying);
}
private JobParameters merge(JobParameters parameters, JobParameters additionals) { private JobParameters merge(JobParameters parameters, JobParameters additionals) {
Map<String, JobParameter> merged = new HashMap<>(); Map<String, JobParameter> merged = new LinkedHashMap<>();
merged.putAll(parameters.getParameters()); merged.putAll(parameters.getParameters());
merged.putAll(additionals.getParameters()); merged.putAll(additionals.getParameters());
return new JobParameters(merged); return new JobParameters(merged);
......
...@@ -46,6 +46,7 @@ import org.springframework.core.task.SyncTaskExecutor; ...@@ -46,6 +46,7 @@ import org.springframework.core.task.SyncTaskExecutor;
import org.springframework.transaction.PlatformTransactionManager; import org.springframework.transaction.PlatformTransactionManager;
import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatExceptionOfType;
import static org.assertj.core.api.Assertions.fail; import static org.assertj.core.api.Assertions.fail;
/** /**
...@@ -150,17 +151,12 @@ public class JobLauncherCommandLineRunnerTests { ...@@ -150,17 +151,12 @@ public class JobLauncherCommandLineRunnerTests {
// A failed job that is not restartable does not re-use the job params of // A failed job that is not restartable does not re-use the job params of
// the last execution, but creates a new job instance when running it again. // the last execution, but creates a new job instance when running it again.
assertThat(this.jobExplorer.getJobInstances("job", 0, 100)).hasSize(2); assertThat(this.jobExplorer.getJobInstances("job", 0, 100)).hasSize(2);
try { assertThatExceptionOfType(JobRestartException.class).isThrownBy(() -> {
// try to re-run a failed execution // try to re-run a failed execution
this.runner.execute(this.job, this.runner.execute(this.job,
new JobParametersBuilder().addLong("run.id", 1L).toJobParameters()); new JobParametersBuilder().addLong("run.id", 1L).toJobParameters());
fail("expected JobRestartException"); fail("expected JobRestartException");
} }).withMessageContaining("JobInstance already exists and is not restartable");
catch (JobRestartException ex) {
assertThat(ex.getMessage())
.isEqualTo("JobInstance already exists and is not restartable");
// expected
}
} }
@Test @Test
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment