Support asynchronous launch of partitions.
Resolves #785 Updated to allow user to set ThreadPoolTaskExecutor. updated based on code review
This commit is contained in:
@@ -62,7 +62,7 @@ most cloud infrastructures. The `DeployerPartitionHandler` and
|
||||
Cloud Deployer.
|
||||
|
||||
To configure the `DeployerStepExecutionHandler`, you must provide a `Resource`
|
||||
representing the Spring Boot über-jar to be executed, a `TaskLauncher`, and a
|
||||
representing the Spring Boot über-jar to be executed, a `TaskLauncherHandler`, and a
|
||||
`JobExplorer`. You can configure any environment properties as well as the max number of
|
||||
workers to be executing at once, the interval to poll for the results (defaults to 10
|
||||
seconds), and a timeout (defaults to -1 or no timeout). The following example shows how
|
||||
@@ -135,6 +135,40 @@ NOTE: You can find a sample remote partition application in the samples module o
|
||||
Spring Cloud Task project,
|
||||
https://github.com/spring-cloud/spring-cloud-task/tree/master/spring-cloud-task-samples/partitioned-batch-job[here].
|
||||
|
||||
=== Asynchronously launch remote batch partitions
|
||||
|
||||
By default batch partitions are launched sequentially. However, in some cases this may affect performance as each launch will block until the resource (For example: provisioning a pod in Kubernetes) is provisioned.
|
||||
In these cases you can provide a `ThreadPoolTaskExecutor` to the `DeployerPartitionHandler`. This will launch the remote batch partitions based on the configuration of the `ThreadPoolTaskExecutor`.
|
||||
For example:
|
||||
|
||||
[source,java]
|
||||
----
|
||||
@Bean
|
||||
public ThreadPoolTaskExecutor threadPoolTaskExecutor() {
|
||||
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
|
||||
executor.setCorePoolSize(4);
|
||||
executor.setThreadNamePrefix("default_task_executor_thread");
|
||||
executor.setWaitForTasksToCompleteOnShutdown(true);
|
||||
executor.initialize();
|
||||
return executor;
|
||||
}
|
||||
|
||||
@Bean
|
||||
public PartitionHandler partitionHandler(TaskLauncher taskLauncher, JobExplorer jobExplorer,
|
||||
TaskRepository taskRepository, ThreadPoolTaskExecutor executor) throws Exception {
|
||||
Resource resource = this.resourceLoader
|
||||
.getResource("maven://io.spring.cloud:partitioned-batch-job:2.2.0.BUILD-SNAPSHOT");
|
||||
|
||||
DeployerPartitionHandler partitionHandler =
|
||||
new DeployerPartitionHandler(taskLauncher, jobExplorer, resource,
|
||||
"workerStep", taskRepository, executor);
|
||||
...
|
||||
}
|
||||
----
|
||||
|
||||
NOTE: We need to close the context since the use of `ThreadPoolTaskExecutor` leaves a thread active thus the app will not terminate. To close the application appropriately, we will need to set `spring.cloud.task.closecontextEnabled` property to `true`.
|
||||
|
||||
|
||||
=== Notes on Developing a Batch-partitioned application for the Kubernetes Platform
|
||||
|
||||
* When deploying partitioned apps on the Kubernetes platform, you must use the following
|
||||
|
||||
@@ -190,9 +190,7 @@ following property:
|
||||
=== External Task Id
|
||||
|
||||
Spring Cloud Task lets you store an external task ID for each
|
||||
`TaskExecution`. An example of this would be a task ID provided by
|
||||
Cloud Foundry when a task is launched on the platform.
|
||||
In order to configure your Task to use a generated `TaskExecutionId`, add the
|
||||
`TaskExecution`. In order to configure your Task to use a generated `TaskExecutionId`, add the
|
||||
following property:
|
||||
|
||||
`spring.cloud.task.external-execution-id=<externalTaskId>`
|
||||
|
||||
4
pom.xml
4
pom.xml
@@ -102,8 +102,8 @@
|
||||
<properties>
|
||||
|
||||
<spring-cloud-stream.version>4.0.0-SNAPSHOT</spring-cloud-stream.version>
|
||||
<spring-cloud-deployer.version>2.7.4</spring-cloud-deployer.version>
|
||||
<spring-cloud-deployer-local.version>2.7.4
|
||||
<spring-cloud-deployer.version>2.8.0-SNAPSHOT</spring-cloud-deployer.version>
|
||||
<spring-cloud-deployer-local.version>2.8.0-SNAPSHOT
|
||||
</spring-cloud-deployer-local.version>
|
||||
<spring-cloud-stream-binder-rabbit.version>${spring-cloud-stream.version}
|
||||
</spring-cloud-stream-binder-rabbit.version>
|
||||
|
||||
@@ -20,7 +20,6 @@ import java.util.ArrayList;
|
||||
import java.util.Collection;
|
||||
import java.util.Collections;
|
||||
import java.util.HashSet;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.Callable;
|
||||
@@ -35,13 +34,10 @@ import org.springframework.batch.core.StepExecution;
|
||||
import org.springframework.batch.core.explore.JobExplorer;
|
||||
import org.springframework.batch.core.partition.PartitionHandler;
|
||||
import org.springframework.batch.core.partition.StepExecutionSplitter;
|
||||
import org.springframework.batch.item.ExecutionContext;
|
||||
import org.springframework.batch.poller.DirectPoller;
|
||||
import org.springframework.batch.poller.Poller;
|
||||
import org.springframework.beans.factory.InitializingBean;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.cloud.deployer.spi.core.AppDefinition;
|
||||
import org.springframework.cloud.deployer.spi.core.AppDeploymentRequest;
|
||||
import org.springframework.cloud.deployer.spi.task.TaskLauncher;
|
||||
import org.springframework.cloud.task.listener.annotation.BeforeTask;
|
||||
import org.springframework.cloud.task.repository.TaskExecution;
|
||||
@@ -49,9 +45,11 @@ import org.springframework.cloud.task.repository.TaskRepository;
|
||||
import org.springframework.context.EnvironmentAware;
|
||||
import org.springframework.core.env.Environment;
|
||||
import org.springframework.core.io.Resource;
|
||||
import org.springframework.core.task.SyncTaskExecutor;
|
||||
import org.springframework.core.task.TaskExecutor;
|
||||
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
|
||||
import org.springframework.util.Assert;
|
||||
import org.springframework.util.CollectionUtils;
|
||||
import org.springframework.util.StringUtils;
|
||||
|
||||
/**
|
||||
* <p>
|
||||
@@ -145,19 +143,23 @@ public class DeployerPartitionHandler
|
||||
|
||||
private boolean defaultArgsAsEnvironmentVars = false;
|
||||
|
||||
private TaskExecutor taskExecutor;
|
||||
|
||||
|
||||
@Autowired
|
||||
private TaskRepository taskRepository;
|
||||
|
||||
/**
|
||||
* Constructor initializing the DeployerPartitionHandler instance.
|
||||
* @param taskLauncher the launcher used to execute partitioned tasks.
|
||||
* @param jobExplorer used to acquire the status of the job.
|
||||
* @param resource the url to the app to be launched.
|
||||
* @param stepName the name of the step.
|
||||
* @param taskLauncher The {@link org.springframework.cloud.deployer.spi.task.TaskLauncher} used to execute partitioned tasks.
|
||||
* @param jobExplorer The {@link JobExplorer} to acquire the status of the job.
|
||||
* @param resource The {@link Resource} to the app to be launched.
|
||||
* @param stepName The name of the step.
|
||||
* @param taskExecutor If task launches should occur asynchronously then provide a {@link ThreadPoolTaskExecutor}. Default is null.
|
||||
*/
|
||||
|
||||
public DeployerPartitionHandler(TaskLauncher taskLauncher, JobExplorer jobExplorer,
|
||||
Resource resource, String stepName, TaskRepository taskRepository) {
|
||||
public DeployerPartitionHandler(org.springframework.cloud.deployer.spi.task.TaskLauncher taskLauncher, JobExplorer jobExplorer,
|
||||
Resource resource, String stepName, TaskRepository taskRepository,
|
||||
TaskExecutor taskExecutor) {
|
||||
Assert.notNull(taskLauncher, "A taskLauncher is required");
|
||||
Assert.notNull(jobExplorer, "A jobExplorer is required");
|
||||
Assert.notNull(resource, "A resource is required");
|
||||
@@ -169,6 +171,19 @@ public class DeployerPartitionHandler
|
||||
this.resource = resource;
|
||||
this.stepName = stepName;
|
||||
this.taskRepository = taskRepository;
|
||||
this.taskExecutor = taskExecutor;
|
||||
}
|
||||
|
||||
/**
|
||||
* Constructor initializing the DeployerPartitionHandler instance.
|
||||
* @param taskLauncher The {@link org.springframework.cloud.deployer.spi.task.TaskLauncher} used to execute partitioned tasks.
|
||||
* @param jobExplorer The {@link JobExplorer} to acquire the status of the job.
|
||||
* @param resource The {@link Resource} to the app to be launched.
|
||||
* @param stepName The name of the step.
|
||||
*/
|
||||
public DeployerPartitionHandler(org.springframework.cloud.deployer.spi.task.TaskLauncher taskLauncher, JobExplorer jobExplorer,
|
||||
Resource resource, String stepName, TaskRepository taskRepository) {
|
||||
this(taskLauncher, jobExplorer, resource, stepName, taskRepository, new SyncTaskExecutor());
|
||||
}
|
||||
|
||||
/**
|
||||
@@ -235,8 +250,8 @@ public class DeployerPartitionHandler
|
||||
}
|
||||
|
||||
/**
|
||||
* Map of deployment properties to be used by the {@link TaskLauncher}.
|
||||
* @param deploymentProperties properties to be used by the {@link TaskLauncher}
|
||||
* Map of deployment properties to be used by the {@link org.springframework.cloud.deployer.spi.task.TaskLauncher}.
|
||||
* @param deploymentProperties properties to be used by the {@link org.springframework.cloud.deployer.spi.task.TaskLauncher}
|
||||
*/
|
||||
public void setDeploymentProperties(Map<String, String> deploymentProperties) {
|
||||
this.deploymentProperties = deploymentProperties;
|
||||
@@ -293,110 +308,32 @@ public class DeployerPartitionHandler
|
||||
|
||||
private void launchWorkers(Set<StepExecution> candidates,
|
||||
Set<StepExecution> executed) {
|
||||
TaskLauncherHandler taskLauncherHandler = new TaskLauncherHandler(this.commandLineArgsProvider,
|
||||
this.taskRepository, this.defaultArgsAsEnvironmentVars,
|
||||
this.stepName, this.taskExecution, this.environmentVariablesProvider,
|
||||
this.resource, this.deploymentProperties,
|
||||
this.taskLauncher,
|
||||
this.applicationName);
|
||||
for (StepExecution execution : candidates) {
|
||||
if (this.currentWorkers < this.maxWorkers || this.maxWorkers < 0) {
|
||||
launchWorker(execution);
|
||||
if (this.taskExecutor != null) {
|
||||
TaskLauncherHandler taskLauncherThread = new TaskLauncherHandler(this.commandLineArgsProvider,
|
||||
this.taskRepository, this.defaultArgsAsEnvironmentVars,
|
||||
this.stepName, this.taskExecution, this.environmentVariablesProvider,
|
||||
this.resource, this.deploymentProperties,
|
||||
this.taskLauncher,
|
||||
this.applicationName, execution);
|
||||
this.taskExecutor.execute(taskLauncherThread);
|
||||
}
|
||||
else {
|
||||
taskLauncherHandler.launchWorker(execution);
|
||||
}
|
||||
this.currentWorkers++;
|
||||
|
||||
executed.add(execution);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private void launchWorker(StepExecution workerStepExecution) {
|
||||
List<String> arguments = new ArrayList<>();
|
||||
|
||||
ExecutionContext copyContext = new ExecutionContext(
|
||||
workerStepExecution.getExecutionContext());
|
||||
|
||||
arguments.addAll(this.commandLineArgsProvider.getCommandLineArgs(copyContext));
|
||||
|
||||
TaskExecution partitionTaskExecution = null;
|
||||
|
||||
if (this.taskRepository != null) {
|
||||
partitionTaskExecution = this.taskRepository.createTaskExecution();
|
||||
}
|
||||
else {
|
||||
logger.warn(
|
||||
"TaskRepository was not set so external execution id will not be recorded.");
|
||||
}
|
||||
|
||||
if (!this.defaultArgsAsEnvironmentVars) {
|
||||
arguments.add(formatArgument(SPRING_CLOUD_TASK_JOB_EXECUTION_ID,
|
||||
String.valueOf(workerStepExecution.getJobExecution().getId())));
|
||||
arguments.add(formatArgument(SPRING_CLOUD_TASK_STEP_EXECUTION_ID,
|
||||
String.valueOf(workerStepExecution.getId())));
|
||||
arguments.add(formatArgument(SPRING_CLOUD_TASK_STEP_NAME, this.stepName));
|
||||
arguments
|
||||
.add(formatArgument(SPRING_CLOUD_TASK_NAME,
|
||||
String.format("%s_%s_%s", this.taskExecution.getTaskName(),
|
||||
workerStepExecution.getJobExecution().getJobInstance()
|
||||
.getJobName(),
|
||||
workerStepExecution.getStepName())));
|
||||
arguments.add(formatArgument(SPRING_CLOUD_TASK_PARENT_EXECUTION_ID,
|
||||
String.valueOf(this.taskExecution.getExecutionId())));
|
||||
|
||||
if (partitionTaskExecution != null) {
|
||||
arguments.add(formatArgument(SPRING_CLOUD_TASK_EXECUTION_ID,
|
||||
String.valueOf(partitionTaskExecution.getExecutionId())));
|
||||
}
|
||||
}
|
||||
|
||||
copyContext = new ExecutionContext(workerStepExecution.getExecutionContext());
|
||||
|
||||
Map<String, String> environmentVariables = this.environmentVariablesProvider
|
||||
.getEnvironmentVariables(copyContext);
|
||||
|
||||
if (this.defaultArgsAsEnvironmentVars) {
|
||||
environmentVariables.put(SPRING_CLOUD_TASK_JOB_EXECUTION_ID,
|
||||
String.valueOf(workerStepExecution.getJobExecution().getId()));
|
||||
environmentVariables.put(SPRING_CLOUD_TASK_STEP_EXECUTION_ID,
|
||||
String.valueOf(workerStepExecution.getId()));
|
||||
environmentVariables.put(SPRING_CLOUD_TASK_STEP_NAME, this.stepName);
|
||||
environmentVariables
|
||||
.put(SPRING_CLOUD_TASK_NAME,
|
||||
String.format("%s_%s_%s", this.taskExecution.getTaskName(),
|
||||
workerStepExecution.getJobExecution().getJobInstance()
|
||||
.getJobName(),
|
||||
workerStepExecution.getStepName()));
|
||||
environmentVariables.put(SPRING_CLOUD_TASK_PARENT_EXECUTION_ID,
|
||||
String.valueOf(this.taskExecution.getExecutionId()));
|
||||
environmentVariables.put(SPRING_CLOUD_TASK_EXECUTION_ID,
|
||||
String.valueOf(partitionTaskExecution.getExecutionId()));
|
||||
}
|
||||
|
||||
AppDefinition definition = new AppDefinition(resolveApplicationName(),
|
||||
environmentVariables);
|
||||
|
||||
AppDeploymentRequest request = new AppDeploymentRequest(definition, this.resource,
|
||||
this.deploymentProperties, arguments);
|
||||
|
||||
if (logger.isDebugEnabled()) {
|
||||
logger.debug(
|
||||
"Requesting the launch of the following application: " + request);
|
||||
}
|
||||
|
||||
String externalExecutionId = this.taskLauncher.launch(request);
|
||||
|
||||
if (this.taskRepository != null) {
|
||||
this.taskRepository.updateExternalExecutionId(
|
||||
partitionTaskExecution.getExecutionId(), externalExecutionId);
|
||||
}
|
||||
}
|
||||
|
||||
private String resolveApplicationName() {
|
||||
if (StringUtils.hasText(this.applicationName)) {
|
||||
return this.applicationName;
|
||||
}
|
||||
else {
|
||||
return this.taskExecution.getTaskName();
|
||||
}
|
||||
}
|
||||
|
||||
private String formatArgument(String key, String value) {
|
||||
return String.format("--%s=%s", key, value);
|
||||
}
|
||||
|
||||
private Collection<StepExecution> pollReplies(final StepExecution masterStepExecution,
|
||||
final Set<StepExecution> executed, final Set<StepExecution> candidates,
|
||||
final int size) throws Exception {
|
||||
@@ -468,5 +405,4 @@ public class DeployerPartitionHandler
|
||||
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@@ -0,0 +1,235 @@
|
||||
/*
|
||||
* Copyright 2022-2022 the original author or authors.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* https://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.springframework.cloud.task.batch.partition;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
|
||||
import org.springframework.batch.core.StepExecution;
|
||||
import org.springframework.batch.item.ExecutionContext;
|
||||
import org.springframework.cloud.deployer.spi.core.AppDefinition;
|
||||
import org.springframework.cloud.deployer.spi.core.AppDeploymentRequest;
|
||||
import org.springframework.cloud.task.repository.TaskExecution;
|
||||
import org.springframework.cloud.task.repository.TaskRepository;
|
||||
import org.springframework.core.io.Resource;
|
||||
import org.springframework.util.StringUtils;
|
||||
|
||||
/**
|
||||
* Supports the launching of partitions.
|
||||
*
|
||||
* @author Glenn Renfro
|
||||
*/
|
||||
public class TaskLauncherHandler implements Runnable {
|
||||
|
||||
private CommandLineArgsProvider commandLineArgsProvider;
|
||||
|
||||
private TaskRepository taskRepository;
|
||||
|
||||
private boolean defaultArgsAsEnvironmentVars;
|
||||
|
||||
private String stepName;
|
||||
|
||||
private TaskExecution taskExecution;
|
||||
|
||||
private EnvironmentVariablesProvider environmentVariablesProvider;
|
||||
|
||||
private Resource resource;
|
||||
|
||||
private Map<String, String> deploymentProperties;
|
||||
|
||||
private org.springframework.cloud.deployer.spi.task.TaskLauncher taskLauncher;
|
||||
|
||||
private String applicationName;
|
||||
|
||||
private StepExecution workerStepExecution;
|
||||
|
||||
private Log logger = LogFactory.getLog(TaskLauncherHandler.class);
|
||||
|
||||
/**
|
||||
* @param commandLineArgsProvider The {@link CommandLineArgsProvider} that provides command line
|
||||
* arguments passed to each partition's execution.
|
||||
* @param taskRepository The {@link TaskRepository} task repository for launching the partition.
|
||||
* @param defaultArgsAsEnvironmentVars - If set to true, the default args that are used
|
||||
* internally by Spring Cloud Task and Spring Batch are passed as environment variables instead of command line arguments.
|
||||
* @param stepName The name of the step.
|
||||
* @param taskExecution The {@link TaskExecution} to be associated with the partition.
|
||||
* @param environmentVariablesProvider {@link EnvironmentVariablesProvider} that provides the environmennt variables.
|
||||
* @param resource The {@link Resource} to be launched.
|
||||
* @param deploymentProperties The {@link Map} containing the deployment properties for the partition.
|
||||
* @param taskLauncher {@link org.springframework.cloud.deployer.spi.task.TaskLauncher} that is used to launch the partition.
|
||||
* @param applicationName The name to be associated with task.
|
||||
* @param workerStepExecution The {@link StepExecution} for the paritition.
|
||||
*/
|
||||
public TaskLauncherHandler(CommandLineArgsProvider commandLineArgsProvider,
|
||||
TaskRepository taskRepository, boolean defaultArgsAsEnvironmentVars,
|
||||
String stepName, TaskExecution taskExecution, EnvironmentVariablesProvider
|
||||
environmentVariablesProvider, Resource resource, Map<String, String> deploymentProperties,
|
||||
org.springframework.cloud.deployer.spi.task.TaskLauncher taskLauncher,
|
||||
String applicationName, StepExecution workerStepExecution) {
|
||||
this.commandLineArgsProvider = commandLineArgsProvider;
|
||||
this.taskRepository = taskRepository;
|
||||
this.defaultArgsAsEnvironmentVars = defaultArgsAsEnvironmentVars;
|
||||
this.stepName = stepName;
|
||||
this.taskExecution = taskExecution;
|
||||
this.environmentVariablesProvider = environmentVariablesProvider;
|
||||
this.resource = resource;
|
||||
this.deploymentProperties = deploymentProperties;
|
||||
this.taskLauncher = taskLauncher;
|
||||
this.applicationName = applicationName;
|
||||
this.workerStepExecution = workerStepExecution;
|
||||
}
|
||||
|
||||
/**
|
||||
* @param commandLineArgsProvider The {@link CommandLineArgsProvider} that provides command line
|
||||
* arguments passed to each partition's execution.
|
||||
* @param taskRepository The {@link TaskRepository} task repository for launching the partition.
|
||||
* @param defaultArgsAsEnvironmentVars - If set to true, the default args that are used
|
||||
* internally by Spring Cloud Task and Spring Batch are passed as environment variables instead of command line arguments.
|
||||
* @param stepName The name of the step.
|
||||
* @param taskExecution The {@link TaskExecution} to be associated with the partition.
|
||||
* @param environmentVariablesProvider {@link EnvironmentVariablesProvider} that provides the environmennt variables.
|
||||
* @param resource The {@link Resource} to be launched.
|
||||
* @param deploymentProperties The {@link Map} containing the deployment properties for the partition.
|
||||
* @param taskLauncher {@link org.springframework.cloud.deployer.spi.task.TaskLauncher} that is used to launch the partition.
|
||||
* @param applicationName The name to be associated with task.
|
||||
*/
|
||||
public TaskLauncherHandler(CommandLineArgsProvider commandLineArgsProvider,
|
||||
TaskRepository taskRepository, boolean defaultArgsAsEnvironmentVars,
|
||||
String stepName, TaskExecution taskExecution, EnvironmentVariablesProvider
|
||||
environmentVariablesProvider, Resource resource, Map<String, String> deploymentProperties,
|
||||
org.springframework.cloud.deployer.spi.task.TaskLauncher taskLauncher,
|
||||
String applicationName) {
|
||||
this.commandLineArgsProvider = commandLineArgsProvider;
|
||||
this.taskRepository = taskRepository;
|
||||
this.defaultArgsAsEnvironmentVars = defaultArgsAsEnvironmentVars;
|
||||
this.stepName = stepName;
|
||||
this.taskExecution = taskExecution;
|
||||
this.environmentVariablesProvider = environmentVariablesProvider;
|
||||
this.resource = resource;
|
||||
this.deploymentProperties = deploymentProperties;
|
||||
this.taskLauncher = taskLauncher;
|
||||
this.applicationName = applicationName;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void run() {
|
||||
launchWorker(this.workerStepExecution);
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* Launches the partition for the StepExecution.
|
||||
* @param workerStepExecution The {@link StepExecution}
|
||||
*/
|
||||
public void launchWorker(StepExecution workerStepExecution) {
|
||||
List<String> arguments = new ArrayList<>();
|
||||
|
||||
ExecutionContext copyContext = new ExecutionContext(
|
||||
workerStepExecution.getExecutionContext());
|
||||
|
||||
arguments.addAll(this.commandLineArgsProvider.getCommandLineArgs(copyContext));
|
||||
|
||||
TaskExecution partitionTaskExecution = null;
|
||||
|
||||
if (this.taskRepository != null) {
|
||||
partitionTaskExecution = this.taskRepository.createTaskExecution();
|
||||
}
|
||||
else {
|
||||
logger.warn(
|
||||
"TaskRepository was not set so external execution id will not be recorded.");
|
||||
}
|
||||
|
||||
if (!this.defaultArgsAsEnvironmentVars) {
|
||||
arguments.add(formatArgument(DeployerPartitionHandler.SPRING_CLOUD_TASK_JOB_EXECUTION_ID,
|
||||
String.valueOf(workerStepExecution.getJobExecution().getId())));
|
||||
arguments.add(formatArgument(DeployerPartitionHandler.SPRING_CLOUD_TASK_STEP_EXECUTION_ID,
|
||||
String.valueOf(workerStepExecution.getId())));
|
||||
arguments.add(formatArgument(DeployerPartitionHandler.SPRING_CLOUD_TASK_STEP_NAME, this.stepName));
|
||||
arguments
|
||||
.add(formatArgument(DeployerPartitionHandler.SPRING_CLOUD_TASK_NAME,
|
||||
String.format("%s_%s_%s", this.taskExecution.getTaskName(),
|
||||
workerStepExecution.getJobExecution().getJobInstance()
|
||||
.getJobName(),
|
||||
workerStepExecution.getStepName())));
|
||||
arguments.add(formatArgument(DeployerPartitionHandler.SPRING_CLOUD_TASK_PARENT_EXECUTION_ID,
|
||||
String.valueOf(this.taskExecution.getExecutionId())));
|
||||
|
||||
if (partitionTaskExecution != null) {
|
||||
arguments.add(formatArgument(DeployerPartitionHandler.SPRING_CLOUD_TASK_EXECUTION_ID,
|
||||
String.valueOf(partitionTaskExecution.getExecutionId())));
|
||||
}
|
||||
}
|
||||
|
||||
copyContext = new ExecutionContext(workerStepExecution.getExecutionContext());
|
||||
|
||||
Map<String, String> environmentVariables = this.environmentVariablesProvider
|
||||
.getEnvironmentVariables(copyContext);
|
||||
|
||||
if (this.defaultArgsAsEnvironmentVars) {
|
||||
environmentVariables.put(DeployerPartitionHandler.SPRING_CLOUD_TASK_JOB_EXECUTION_ID,
|
||||
String.valueOf(workerStepExecution.getJobExecution().getId()));
|
||||
environmentVariables.put(DeployerPartitionHandler.SPRING_CLOUD_TASK_STEP_EXECUTION_ID,
|
||||
String.valueOf(workerStepExecution.getId()));
|
||||
environmentVariables.put(DeployerPartitionHandler.SPRING_CLOUD_TASK_STEP_NAME, this.stepName);
|
||||
environmentVariables
|
||||
.put(DeployerPartitionHandler.SPRING_CLOUD_TASK_NAME,
|
||||
String.format("%s_%s_%s", this.taskExecution.getTaskName(),
|
||||
workerStepExecution.getJobExecution().getJobInstance()
|
||||
.getJobName(),
|
||||
workerStepExecution.getStepName()));
|
||||
environmentVariables.put(DeployerPartitionHandler.SPRING_CLOUD_TASK_PARENT_EXECUTION_ID,
|
||||
String.valueOf(this.taskExecution.getExecutionId()));
|
||||
environmentVariables.put(DeployerPartitionHandler.SPRING_CLOUD_TASK_EXECUTION_ID,
|
||||
String.valueOf(partitionTaskExecution.getExecutionId()));
|
||||
}
|
||||
|
||||
AppDefinition definition = new AppDefinition(resolveApplicationName(),
|
||||
environmentVariables);
|
||||
|
||||
AppDeploymentRequest request = new AppDeploymentRequest(definition, this.resource,
|
||||
this.deploymentProperties, arguments);
|
||||
|
||||
if (logger.isDebugEnabled()) {
|
||||
logger.debug(
|
||||
"Requesting the launch of the following application: " + request);
|
||||
}
|
||||
String externalExecutionId = this.taskLauncher.launch(request);
|
||||
|
||||
if (this.taskRepository != null) {
|
||||
this.taskRepository.updateExternalExecutionId(
|
||||
partitionTaskExecution.getExecutionId(), externalExecutionId);
|
||||
}
|
||||
}
|
||||
|
||||
private String formatArgument(String key, String value) {
|
||||
return String.format("--%s=%s", key, value);
|
||||
}
|
||||
|
||||
private String resolveApplicationName() {
|
||||
if (StringUtils.hasText(this.applicationName)) {
|
||||
return this.applicationName;
|
||||
}
|
||||
else {
|
||||
return this.taskExecution.getTaskName();
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
@@ -51,6 +51,7 @@ import org.springframework.cloud.task.repository.TaskRepository;
|
||||
import org.springframework.core.env.Environment;
|
||||
import org.springframework.core.io.Resource;
|
||||
import org.springframework.mock.env.MockEnvironment;
|
||||
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
|
||||
|
||||
import static org.assertj.core.api.Assertions.assertThat;
|
||||
import static org.assertj.core.api.Assertions.assertThatExceptionOfType;
|
||||
@@ -315,25 +316,39 @@ public class DeployerPartitionHandlerTests {
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testThreePartitions() throws Exception {
|
||||
public void testThreePartitionsSequential() throws Exception {
|
||||
testThreePartitions(null);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testThreePartitionsAsynchronous() throws Exception {
|
||||
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
|
||||
executor.setCorePoolSize(4);
|
||||
executor.setThreadNamePrefix("default_task_executor_thread");
|
||||
executor.setWaitForTasksToCompleteOnShutdown(true);
|
||||
executor.initialize();
|
||||
testThreePartitions(executor);
|
||||
}
|
||||
|
||||
private void testThreePartitions(ThreadPoolTaskExecutor threadPoolTaskExecutor) throws Exception {
|
||||
|
||||
StepExecution masterStepExecution = createMasterStepExecution();
|
||||
JobExecution jobExecution = masterStepExecution.getJobExecution();
|
||||
|
||||
StepExecution workerStepExecutionStart1 = getStepExecutionStart(jobExecution, 4L);
|
||||
StepExecution workerStepExecutionFinish1 = getStepExecutionFinish(
|
||||
workerStepExecutionStart1, BatchStatus.COMPLETED);
|
||||
workerStepExecutionStart1, BatchStatus.COMPLETED);
|
||||
|
||||
StepExecution workerStepExecutionStart2 = getStepExecutionStart(jobExecution, 5L);
|
||||
StepExecution workerStepExecutionFinish2 = getStepExecutionFinish(
|
||||
workerStepExecutionStart2, BatchStatus.COMPLETED);
|
||||
workerStepExecutionStart2, BatchStatus.COMPLETED);
|
||||
|
||||
StepExecution workerStepExecutionStart3 = getStepExecutionStart(jobExecution, 6L);
|
||||
StepExecution workerStepExecutionFinish3 = getStepExecutionFinish(
|
||||
workerStepExecutionStart3, BatchStatus.COMPLETED);
|
||||
workerStepExecutionStart3, BatchStatus.COMPLETED);
|
||||
|
||||
DeployerPartitionHandler handler = new DeployerPartitionHandler(this.taskLauncher,
|
||||
this.jobExplorer, this.resource, "step1", this.taskRepository);
|
||||
this.jobExplorer, this.resource, "step1", this.taskRepository, threadPoolTaskExecutor);
|
||||
handler.setEnvironment(this.environment);
|
||||
|
||||
TaskExecution taskExecution = new TaskExecution();
|
||||
@@ -348,23 +363,23 @@ public class DeployerPartitionHandlerTests {
|
||||
when(this.splitter.split(masterStepExecution, 1)).thenReturn(stepExecutions);
|
||||
|
||||
when(this.jobExplorer.getStepExecution(1L, 4L))
|
||||
.thenReturn(workerStepExecutionFinish1);
|
||||
.thenReturn(workerStepExecutionFinish1);
|
||||
when(this.jobExplorer.getStepExecution(1L, 5L))
|
||||
.thenReturn(workerStepExecutionFinish2);
|
||||
.thenReturn(workerStepExecutionFinish2);
|
||||
when(this.jobExplorer.getStepExecution(1L, 6L))
|
||||
.thenReturn(workerStepExecutionFinish3);
|
||||
.thenReturn(workerStepExecutionFinish3);
|
||||
|
||||
handler.afterPropertiesSet();
|
||||
|
||||
handler.beforeTask(taskExecution);
|
||||
Collection<StepExecution> results = handler.handle(this.splitter,
|
||||
masterStepExecution);
|
||||
|
||||
masterStepExecution);
|
||||
Thread.sleep(5000);
|
||||
verify(this.taskLauncher, times(3))
|
||||
.launch(this.appDeploymentRequestArgumentCaptor.capture());
|
||||
.launch(this.appDeploymentRequestArgumentCaptor.capture());
|
||||
|
||||
List<AppDeploymentRequest> allValues = this.appDeploymentRequestArgumentCaptor
|
||||
.getAllValues();
|
||||
.getAllValues();
|
||||
|
||||
validateAppDeploymentRequests(allValues, 3);
|
||||
|
||||
|
||||
@@ -30,4 +30,13 @@ NOTE: Since this example uses the Spring Cloud Deployer Local to launch the part
|
||||
== Dependencies:
|
||||
|
||||
A datasource (not in memory) must be configured based on normal Spring Boot conventions
|
||||
(application.properties/application.yml/environment variables/etc).
|
||||
(application.properties, application.yml, environment variables).
|
||||
|
||||
== Asynchronous remote partition task launch
|
||||
Currently partitions are launched sequentially. To launch them asynchronously set the following environment variables:
|
||||
|
||||
* `spring.cloud.task.closecontextEnabled=true`
|
||||
* `io.spring.asynchronous=true`
|
||||
|
||||
NOTE: We need to close the context since the use of ThreadPoolTaskExecutor leaves a thread active thus the app will not terminate.
|
||||
To close the application appropriately, we will need to set `spring.cloud.task.closecontextEnabled`` to true.
|
||||
|
||||
@@ -20,7 +20,7 @@
|
||||
<spring-cloud-commons.version>4.0.0-SNAPSHOT</spring-cloud-commons.version>
|
||||
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
|
||||
<java.version>17</java.version>
|
||||
<spring-cloud-deployer>2.7.4</spring-cloud-deployer>
|
||||
<spring-cloud-deployer>2.8.0-SNAPSHOT</spring-cloud-deployer>
|
||||
</properties>
|
||||
|
||||
<dependencyManagement>
|
||||
|
||||
@@ -40,6 +40,7 @@ import org.springframework.batch.item.ExecutionContext;
|
||||
import org.springframework.batch.repeat.RepeatStatus;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.beans.factory.annotation.Value;
|
||||
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
|
||||
import org.springframework.cloud.deployer.resource.support.DelegatingResourceLoader;
|
||||
import org.springframework.cloud.deployer.spi.task.TaskLauncher;
|
||||
import org.springframework.cloud.task.batch.partition.DeployerPartitionHandler;
|
||||
@@ -53,6 +54,7 @@ import org.springframework.context.annotation.Configuration;
|
||||
import org.springframework.context.annotation.Profile;
|
||||
import org.springframework.core.env.Environment;
|
||||
import org.springframework.core.io.Resource;
|
||||
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
|
||||
|
||||
/**
|
||||
* @author Michael Minella
|
||||
@@ -79,12 +81,14 @@ public class JobConfiguration {
|
||||
private Environment environment;
|
||||
|
||||
@Bean
|
||||
public PartitionHandler partitionHandler(TaskLauncher taskLauncher, JobExplorer jobExplorer, TaskRepository taskRepository) throws Exception {
|
||||
public PartitionHandler partitionHandler(TaskLauncher taskLauncher, JobExplorer jobExplorer,
|
||||
TaskRepository taskRepository, @Autowired(required = false) ThreadPoolTaskExecutor executor) throws Exception {
|
||||
Resource resource = this.resourceLoader
|
||||
.getResource("maven://io.spring.cloud:partitioned-batch-job:2.2.0.BUILD-SNAPSHOT");
|
||||
.getResource("maven://io.spring.cloud:partitioned-batch-job:3.0.0-SNAPSHOT");
|
||||
|
||||
DeployerPartitionHandler partitionHandler =
|
||||
new DeployerPartitionHandler(taskLauncher, jobExplorer, resource, "workerStep", taskRepository);
|
||||
new DeployerPartitionHandler(taskLauncher, jobExplorer, resource,
|
||||
"workerStep", taskRepository, executor);
|
||||
|
||||
List<String> commandLineArgs = new ArrayList<>(3);
|
||||
commandLineArgs.add("--spring.profiles.active=worker");
|
||||
@@ -100,6 +104,19 @@ public class JobConfiguration {
|
||||
return partitionHandler;
|
||||
}
|
||||
|
||||
@ConditionalOnProperty( value="io.spring.asynchronous",
|
||||
havingValue = "true",
|
||||
matchIfMissing = false)
|
||||
@Bean
|
||||
public ThreadPoolTaskExecutor threadPoolTaskExecutor() {
|
||||
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
|
||||
executor.setCorePoolSize(2);
|
||||
executor.setThreadNamePrefix("default_task_executor_thread");
|
||||
executor.setWaitForTasksToCompleteOnShutdown(true);
|
||||
executor.initialize();
|
||||
return executor;
|
||||
}
|
||||
|
||||
@Bean
|
||||
public Partitioner partitioner() {
|
||||
return new Partitioner() {
|
||||
|
||||
Reference in New Issue
Block a user