Provided the ability to configure if properites are sent to the worker
via command line arguments or environment variables.
This commit is contained in:
@@ -113,6 +113,8 @@ public class DeployerPartitionHandler implements PartitionHandler, EnvironmentAw
|
||||
|
||||
private CommandLineArgsProvider commandLineArgsProvider;
|
||||
|
||||
private boolean defaultArgsAsEnvironmentVars = false;
|
||||
|
||||
public DeployerPartitionHandler(TaskLauncher taskLauncher,
|
||||
JobExplorer jobExplorer,
|
||||
Resource resource,
|
||||
@@ -137,6 +139,16 @@ public class DeployerPartitionHandler implements PartitionHandler, EnvironmentAw
|
||||
this.environmentVariablesProvider = environmentVariablesProvider;
|
||||
}
|
||||
|
||||
/**
|
||||
* If set to true, the default args that are used internally by Spring Cloud Task and
|
||||
* Spring Batch are passed as environment variables instead of command line arguments.
|
||||
*
|
||||
* @param defaultArgsAsEnvironmentVars defaults to false
|
||||
*/
|
||||
public void setDefaultArgsAsEnvironmentVars(boolean defaultArgsAsEnvironmentVars) {
|
||||
this.defaultArgsAsEnvironmentVars = defaultArgsAsEnvironmentVars;
|
||||
}
|
||||
|
||||
/**
|
||||
* Used to provide any command line arguements to be passed to each worker launched.
|
||||
*
|
||||
@@ -262,22 +274,38 @@ public class DeployerPartitionHandler implements PartitionHandler, EnvironmentAw
|
||||
this.commandLineArgsProvider
|
||||
.getCommandLineArgs(copyContext));
|
||||
|
||||
arguments.add(formatArgument(SPRING_CLOUD_TASK_JOB_EXECUTION_ID,
|
||||
String.valueOf(workerStepExecution.getJobExecution().getId())));
|
||||
arguments.add(formatArgument(SPRING_CLOUD_TASK_STEP_EXECUTION_ID,
|
||||
String.valueOf(workerStepExecution.getId())));
|
||||
arguments.add(formatArgument(SPRING_CLOUD_TASK_STEP_NAME, this.stepName));
|
||||
arguments.add(formatArgument(SPRING_CLOUD_TASK_NAME, String.format("%s_%s_%s",
|
||||
taskExecution.getTaskName(),
|
||||
workerStepExecution.getJobExecution().getJobInstance().getJobName(),
|
||||
workerStepExecution.getStepName())));
|
||||
arguments.add(formatArgument(SPRING_CLOUD_TASK_PARENT_EXECUTION_ID,
|
||||
String.valueOf(taskExecution.getExecutionId())));
|
||||
if(!this.defaultArgsAsEnvironmentVars) {
|
||||
arguments.add(formatArgument(SPRING_CLOUD_TASK_JOB_EXECUTION_ID,
|
||||
String.valueOf(workerStepExecution.getJobExecution().getId())));
|
||||
arguments.add(formatArgument(SPRING_CLOUD_TASK_STEP_EXECUTION_ID,
|
||||
String.valueOf(workerStepExecution.getId())));
|
||||
arguments.add(formatArgument(SPRING_CLOUD_TASK_STEP_NAME, this.stepName));
|
||||
arguments.add(formatArgument(SPRING_CLOUD_TASK_NAME, String.format("%s_%s_%s",
|
||||
taskExecution.getTaskName(),
|
||||
workerStepExecution.getJobExecution().getJobInstance().getJobName(),
|
||||
workerStepExecution.getStepName())));
|
||||
arguments.add(formatArgument(SPRING_CLOUD_TASK_PARENT_EXECUTION_ID,
|
||||
String.valueOf(taskExecution.getExecutionId())));
|
||||
}
|
||||
|
||||
copyContext = new ExecutionContext(workerStepExecution.getExecutionContext());
|
||||
|
||||
Map<String, String> environmentVariables = this.environmentVariablesProvider.getEnvironmentVariables(copyContext);
|
||||
|
||||
if(this.defaultArgsAsEnvironmentVars) {
|
||||
environmentVariables.put(SPRING_CLOUD_TASK_JOB_EXECUTION_ID,
|
||||
String.valueOf(workerStepExecution.getJobExecution().getId()));
|
||||
environmentVariables.put(SPRING_CLOUD_TASK_STEP_EXECUTION_ID,
|
||||
String.valueOf(workerStepExecution.getId()));
|
||||
environmentVariables.put(SPRING_CLOUD_TASK_STEP_NAME, this.stepName);
|
||||
environmentVariables.put(SPRING_CLOUD_TASK_NAME, String.format("%s_%s_%s",
|
||||
taskExecution.getTaskName(),
|
||||
workerStepExecution.getJobExecution().getJobInstance().getJobName(),
|
||||
workerStepExecution.getStepName()));
|
||||
environmentVariables.put(SPRING_CLOUD_TASK_PARENT_EXECUTION_ID,
|
||||
String.valueOf(taskExecution.getExecutionId()));
|
||||
}
|
||||
|
||||
AppDefinition definition =
|
||||
new AppDefinition(resolveApplicationName(),
|
||||
environmentVariables);
|
||||
|
||||
@@ -161,6 +161,58 @@ public class DeployerPartitionHandlerTests {
|
||||
assertEquals("step1:partition1", resultStepExecution.getStepName());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testSinglePartitionAsEnvVars() throws Exception {
|
||||
|
||||
StepExecution masterStepExecution = createMasterStepExecution();
|
||||
JobExecution jobExecution = masterStepExecution.getJobExecution();
|
||||
|
||||
StepExecution workerStepExecutionStart = getStepExecutionStart(jobExecution, 4L);
|
||||
StepExecution workerStepExecutionFinish = getStepExecutionFinish(workerStepExecutionStart, BatchStatus.COMPLETED);
|
||||
|
||||
DeployerPartitionHandler handler = new DeployerPartitionHandler(this.taskLauncher, this.jobExplorer, this.resource, "step1");
|
||||
handler.setEnvironment(this.environment);
|
||||
handler.setDefaultArgsAsEnvironmentVars(true);
|
||||
|
||||
TaskExecution taskExecution = new TaskExecution(55, null, null, null,
|
||||
null, null, new ArrayList<String>(), null, null);
|
||||
taskExecution.setTaskName("partitionedJobTask");
|
||||
|
||||
Set<StepExecution> stepExecutions = new HashSet<>();
|
||||
stepExecutions.add(workerStepExecutionStart);
|
||||
when(this.splitter.split(masterStepExecution, 1)).thenReturn(stepExecutions);
|
||||
|
||||
when(this.jobExplorer.getStepExecution(1L, 4L)).thenReturn(workerStepExecutionFinish);
|
||||
|
||||
handler.afterPropertiesSet();
|
||||
|
||||
handler.beforeTask(taskExecution);
|
||||
|
||||
Collection<StepExecution> results = handler.handle(this.splitter, masterStepExecution);
|
||||
|
||||
verify(this.taskLauncher).launch(this.appDeploymentRequestArgumentCaptor.capture());
|
||||
|
||||
AppDeploymentRequest request = this.appDeploymentRequestArgumentCaptor.getValue();
|
||||
|
||||
assertEquals(this.resource, request.getResource());
|
||||
assertEquals(0, request.getDeploymentProperties().size());
|
||||
|
||||
AppDefinition appDefinition = request.getDefinition();
|
||||
|
||||
assertEquals("partitionedJobTask", appDefinition.getName());
|
||||
assertTrue(request.getCommandlineArguments().isEmpty());
|
||||
assertEquals("1", request.getDefinition().getProperties().get(DeployerPartitionHandler.SPRING_CLOUD_TASK_JOB_EXECUTION_ID));
|
||||
assertEquals("4", request.getDefinition().getProperties().get(DeployerPartitionHandler.SPRING_CLOUD_TASK_STEP_EXECUTION_ID));
|
||||
assertEquals("step1", request.getDefinition().getProperties().get(DeployerPartitionHandler.SPRING_CLOUD_TASK_STEP_NAME));
|
||||
assertEquals("partitionedJobTask_partitionedJob_step1:partition1", request.getDefinition().getProperties().get(DeployerPartitionHandler.SPRING_CLOUD_TASK_NAME));
|
||||
assertEquals("55", request.getDefinition().getProperties().get(DeployerPartitionHandler.SPRING_CLOUD_TASK_PARENT_EXECUTION_ID));
|
||||
|
||||
assertEquals(1, results.size());
|
||||
StepExecution resultStepExecution = results.iterator().next();
|
||||
assertEquals(BatchStatus.COMPLETED, resultStepExecution.getStatus());
|
||||
assertEquals("step1:partition1", resultStepExecution.getStepName());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testParentExecutionId() throws Exception {
|
||||
|
||||
@@ -409,10 +461,6 @@ public class DeployerPartitionHandlerTests {
|
||||
assertEquals("step1:partition1", resultStepExecution.getStepName());
|
||||
}
|
||||
|
||||
private String formatArgs(String key, String value) {
|
||||
return String.format("--%s=%s", key, value);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testOverridingEnvironmentProperties() throws Exception {
|
||||
|
||||
@@ -655,6 +703,10 @@ public class DeployerPartitionHandlerTests {
|
||||
assertEquals("step1:partition1", resultStepExecution.getStepName());
|
||||
}
|
||||
|
||||
private String formatArgs(String key, String value) {
|
||||
return String.format("--%s=%s", key, value);
|
||||
}
|
||||
|
||||
private StepExecution getStepExecutionFinish(StepExecution stepExecutionStart, BatchStatus status) {
|
||||
StepExecution workerStepExecutionFinish = new StepExecution(stepExecutionStart.getStepName(), stepExecutionStart.getJobExecution());
|
||||
workerStepExecutionFinish.setId(stepExecutionStart.getId());
|
||||
|
||||
Reference in New Issue
Block a user