diff --git a/docs/src/main/asciidoc/batch.adoc b/docs/src/main/asciidoc/batch.adoc index f2b45f27..f9677a92 100644 --- a/docs/src/main/asciidoc/batch.adoc +++ b/docs/src/main/asciidoc/batch.adoc @@ -62,7 +62,7 @@ most cloud infrastructures. The `DeployerPartitionHandler` and Cloud Deployer. To configure the `DeployerStepExecutionHandler`, you must provide a `Resource` -representing the Spring Boot über-jar to be executed, a `TaskLauncher`, and a +representing the Spring Boot über-jar to be executed, a `TaskLauncherHandler`, and a `JobExplorer`. You can configure any environment properties as well as the max number of workers to be executing at once, the interval to poll for the results (defaults to 10 seconds), and a timeout (defaults to -1 or no timeout). The following example shows how @@ -135,6 +135,40 @@ NOTE: You can find a sample remote partition application in the samples module o Spring Cloud Task project, https://github.com/spring-cloud/spring-cloud-task/tree/master/spring-cloud-task-samples/partitioned-batch-job[here]. +=== Asynchronously launch remote batch partitions + +By default batch partitions are launched sequentially. However, in some cases this may affect performance as each launch will block until the resource (For example: provisioning a pod in Kubernetes) is provisioned. +In these cases you can provide a `ThreadPoolTaskExecutor` to the `DeployerPartitionHandler`. This will launch the remote batch partitions based on the configuration of the `ThreadPoolTaskExecutor`. +For example: + +[source,java] +---- + @Bean + public ThreadPoolTaskExecutor threadPoolTaskExecutor() { + ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor(); + executor.setCorePoolSize(4); + executor.setThreadNamePrefix("default_task_executor_thread"); + executor.setWaitForTasksToCompleteOnShutdown(true); + executor.initialize(); + return executor; + } + + @Bean + public PartitionHandler partitionHandler(TaskLauncher taskLauncher, JobExplorer jobExplorer, + TaskRepository taskRepository, ThreadPoolTaskExecutor executor) throws Exception { + Resource resource = this.resourceLoader + .getResource("maven://io.spring.cloud:partitioned-batch-job:2.2.0.BUILD-SNAPSHOT"); + + DeployerPartitionHandler partitionHandler = + new DeployerPartitionHandler(taskLauncher, jobExplorer, resource, + "workerStep", taskRepository, executor); + ... + } +---- + +NOTE: We need to close the context since the use of `ThreadPoolTaskExecutor` leaves a thread active thus the app will not terminate. To close the application appropriately, we will need to set `spring.cloud.task.closecontextEnabled` property to `true`. + + === Notes on Developing a Batch-partitioned application for the Kubernetes Platform * When deploying partitioned apps on the Kubernetes platform, you must use the following diff --git a/docs/src/main/asciidoc/features.adoc b/docs/src/main/asciidoc/features.adoc index 0b0b7ce5..0d0d4936 100644 --- a/docs/src/main/asciidoc/features.adoc +++ b/docs/src/main/asciidoc/features.adoc @@ -190,9 +190,7 @@ following property: === External Task Id Spring Cloud Task lets you store an external task ID for each -`TaskExecution`. An example of this would be a task ID provided by -Cloud Foundry when a task is launched on the platform. -In order to configure your Task to use a generated `TaskExecutionId`, add the +`TaskExecution`. In order to configure your Task to use a generated `TaskExecutionId`, add the following property: `spring.cloud.task.external-execution-id=` diff --git a/pom.xml b/pom.xml index 8cd0a1a8..d608feb0 100755 --- a/pom.xml +++ b/pom.xml @@ -102,8 +102,8 @@ 4.0.0-SNAPSHOT - 2.7.4 - 2.7.4 + 2.8.0-SNAPSHOT + 2.8.0-SNAPSHOT ${spring-cloud-stream.version} diff --git a/spring-cloud-task-batch/src/main/java/org/springframework/cloud/task/batch/partition/DeployerPartitionHandler.java b/spring-cloud-task-batch/src/main/java/org/springframework/cloud/task/batch/partition/DeployerPartitionHandler.java index 64c3de40..c151aced 100644 --- a/spring-cloud-task-batch/src/main/java/org/springframework/cloud/task/batch/partition/DeployerPartitionHandler.java +++ b/spring-cloud-task-batch/src/main/java/org/springframework/cloud/task/batch/partition/DeployerPartitionHandler.java @@ -20,7 +20,6 @@ import java.util.ArrayList; import java.util.Collection; import java.util.Collections; import java.util.HashSet; -import java.util.List; import java.util.Map; import java.util.Set; import java.util.concurrent.Callable; @@ -35,13 +34,10 @@ import org.springframework.batch.core.StepExecution; import org.springframework.batch.core.explore.JobExplorer; import org.springframework.batch.core.partition.PartitionHandler; import org.springframework.batch.core.partition.StepExecutionSplitter; -import org.springframework.batch.item.ExecutionContext; import org.springframework.batch.poller.DirectPoller; import org.springframework.batch.poller.Poller; import org.springframework.beans.factory.InitializingBean; import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.cloud.deployer.spi.core.AppDefinition; -import org.springframework.cloud.deployer.spi.core.AppDeploymentRequest; import org.springframework.cloud.deployer.spi.task.TaskLauncher; import org.springframework.cloud.task.listener.annotation.BeforeTask; import org.springframework.cloud.task.repository.TaskExecution; @@ -49,9 +45,11 @@ import org.springframework.cloud.task.repository.TaskRepository; import org.springframework.context.EnvironmentAware; import org.springframework.core.env.Environment; import org.springframework.core.io.Resource; +import org.springframework.core.task.SyncTaskExecutor; +import org.springframework.core.task.TaskExecutor; +import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; import org.springframework.util.Assert; import org.springframework.util.CollectionUtils; -import org.springframework.util.StringUtils; /** *

@@ -145,19 +143,23 @@ public class DeployerPartitionHandler private boolean defaultArgsAsEnvironmentVars = false; + private TaskExecutor taskExecutor; + + @Autowired private TaskRepository taskRepository; /** * Constructor initializing the DeployerPartitionHandler instance. - * @param taskLauncher the launcher used to execute partitioned tasks. - * @param jobExplorer used to acquire the status of the job. - * @param resource the url to the app to be launched. - * @param stepName the name of the step. + * @param taskLauncher The {@link org.springframework.cloud.deployer.spi.task.TaskLauncher} used to execute partitioned tasks. + * @param jobExplorer The {@link JobExplorer} to acquire the status of the job. + * @param resource The {@link Resource} to the app to be launched. + * @param stepName The name of the step. + * @param taskExecutor If task launches should occur asynchronously then provide a {@link ThreadPoolTaskExecutor}. Default is null. */ - - public DeployerPartitionHandler(TaskLauncher taskLauncher, JobExplorer jobExplorer, - Resource resource, String stepName, TaskRepository taskRepository) { + public DeployerPartitionHandler(org.springframework.cloud.deployer.spi.task.TaskLauncher taskLauncher, JobExplorer jobExplorer, + Resource resource, String stepName, TaskRepository taskRepository, + TaskExecutor taskExecutor) { Assert.notNull(taskLauncher, "A taskLauncher is required"); Assert.notNull(jobExplorer, "A jobExplorer is required"); Assert.notNull(resource, "A resource is required"); @@ -169,6 +171,19 @@ public class DeployerPartitionHandler this.resource = resource; this.stepName = stepName; this.taskRepository = taskRepository; + this.taskExecutor = taskExecutor; + } + + /** + * Constructor initializing the DeployerPartitionHandler instance. + * @param taskLauncher The {@link org.springframework.cloud.deployer.spi.task.TaskLauncher} used to execute partitioned tasks. + * @param jobExplorer The {@link JobExplorer} to acquire the status of the job. + * @param resource The {@link Resource} to the app to be launched. + * @param stepName The name of the step. + */ + public DeployerPartitionHandler(org.springframework.cloud.deployer.spi.task.TaskLauncher taskLauncher, JobExplorer jobExplorer, + Resource resource, String stepName, TaskRepository taskRepository) { + this(taskLauncher, jobExplorer, resource, stepName, taskRepository, new SyncTaskExecutor()); } /** @@ -235,8 +250,8 @@ public class DeployerPartitionHandler } /** - * Map of deployment properties to be used by the {@link TaskLauncher}. - * @param deploymentProperties properties to be used by the {@link TaskLauncher} + * Map of deployment properties to be used by the {@link org.springframework.cloud.deployer.spi.task.TaskLauncher}. + * @param deploymentProperties properties to be used by the {@link org.springframework.cloud.deployer.spi.task.TaskLauncher} */ public void setDeploymentProperties(Map deploymentProperties) { this.deploymentProperties = deploymentProperties; @@ -293,110 +308,32 @@ public class DeployerPartitionHandler private void launchWorkers(Set candidates, Set executed) { + TaskLauncherHandler taskLauncherHandler = new TaskLauncherHandler(this.commandLineArgsProvider, + this.taskRepository, this.defaultArgsAsEnvironmentVars, + this.stepName, this.taskExecution, this.environmentVariablesProvider, + this.resource, this.deploymentProperties, + this.taskLauncher, + this.applicationName); for (StepExecution execution : candidates) { if (this.currentWorkers < this.maxWorkers || this.maxWorkers < 0) { - launchWorker(execution); + if (this.taskExecutor != null) { + TaskLauncherHandler taskLauncherThread = new TaskLauncherHandler(this.commandLineArgsProvider, + this.taskRepository, this.defaultArgsAsEnvironmentVars, + this.stepName, this.taskExecution, this.environmentVariablesProvider, + this.resource, this.deploymentProperties, + this.taskLauncher, + this.applicationName, execution); + this.taskExecutor.execute(taskLauncherThread); + } + else { + taskLauncherHandler.launchWorker(execution); + } this.currentWorkers++; - executed.add(execution); } } } - private void launchWorker(StepExecution workerStepExecution) { - List arguments = new ArrayList<>(); - - ExecutionContext copyContext = new ExecutionContext( - workerStepExecution.getExecutionContext()); - - arguments.addAll(this.commandLineArgsProvider.getCommandLineArgs(copyContext)); - - TaskExecution partitionTaskExecution = null; - - if (this.taskRepository != null) { - partitionTaskExecution = this.taskRepository.createTaskExecution(); - } - else { - logger.warn( - "TaskRepository was not set so external execution id will not be recorded."); - } - - if (!this.defaultArgsAsEnvironmentVars) { - arguments.add(formatArgument(SPRING_CLOUD_TASK_JOB_EXECUTION_ID, - String.valueOf(workerStepExecution.getJobExecution().getId()))); - arguments.add(formatArgument(SPRING_CLOUD_TASK_STEP_EXECUTION_ID, - String.valueOf(workerStepExecution.getId()))); - arguments.add(formatArgument(SPRING_CLOUD_TASK_STEP_NAME, this.stepName)); - arguments - .add(formatArgument(SPRING_CLOUD_TASK_NAME, - String.format("%s_%s_%s", this.taskExecution.getTaskName(), - workerStepExecution.getJobExecution().getJobInstance() - .getJobName(), - workerStepExecution.getStepName()))); - arguments.add(formatArgument(SPRING_CLOUD_TASK_PARENT_EXECUTION_ID, - String.valueOf(this.taskExecution.getExecutionId()))); - - if (partitionTaskExecution != null) { - arguments.add(formatArgument(SPRING_CLOUD_TASK_EXECUTION_ID, - String.valueOf(partitionTaskExecution.getExecutionId()))); - } - } - - copyContext = new ExecutionContext(workerStepExecution.getExecutionContext()); - - Map environmentVariables = this.environmentVariablesProvider - .getEnvironmentVariables(copyContext); - - if (this.defaultArgsAsEnvironmentVars) { - environmentVariables.put(SPRING_CLOUD_TASK_JOB_EXECUTION_ID, - String.valueOf(workerStepExecution.getJobExecution().getId())); - environmentVariables.put(SPRING_CLOUD_TASK_STEP_EXECUTION_ID, - String.valueOf(workerStepExecution.getId())); - environmentVariables.put(SPRING_CLOUD_TASK_STEP_NAME, this.stepName); - environmentVariables - .put(SPRING_CLOUD_TASK_NAME, - String.format("%s_%s_%s", this.taskExecution.getTaskName(), - workerStepExecution.getJobExecution().getJobInstance() - .getJobName(), - workerStepExecution.getStepName())); - environmentVariables.put(SPRING_CLOUD_TASK_PARENT_EXECUTION_ID, - String.valueOf(this.taskExecution.getExecutionId())); - environmentVariables.put(SPRING_CLOUD_TASK_EXECUTION_ID, - String.valueOf(partitionTaskExecution.getExecutionId())); - } - - AppDefinition definition = new AppDefinition(resolveApplicationName(), - environmentVariables); - - AppDeploymentRequest request = new AppDeploymentRequest(definition, this.resource, - this.deploymentProperties, arguments); - - if (logger.isDebugEnabled()) { - logger.debug( - "Requesting the launch of the following application: " + request); - } - - String externalExecutionId = this.taskLauncher.launch(request); - - if (this.taskRepository != null) { - this.taskRepository.updateExternalExecutionId( - partitionTaskExecution.getExecutionId(), externalExecutionId); - } - } - - private String resolveApplicationName() { - if (StringUtils.hasText(this.applicationName)) { - return this.applicationName; - } - else { - return this.taskExecution.getTaskName(); - } - } - - private String formatArgument(String key, String value) { - return String.format("--%s=%s", key, value); - } - private Collection pollReplies(final StepExecution masterStepExecution, final Set executed, final Set candidates, final int size) throws Exception { @@ -468,5 +405,4 @@ public class DeployerPartitionHandler } } - } diff --git a/spring-cloud-task-batch/src/main/java/org/springframework/cloud/task/batch/partition/TaskLauncherHandler.java b/spring-cloud-task-batch/src/main/java/org/springframework/cloud/task/batch/partition/TaskLauncherHandler.java new file mode 100644 index 00000000..a2e1afeb --- /dev/null +++ b/spring-cloud-task-batch/src/main/java/org/springframework/cloud/task/batch/partition/TaskLauncherHandler.java @@ -0,0 +1,235 @@ +/* + * Copyright 2022-2022 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.cloud.task.batch.partition; + +import java.util.ArrayList; +import java.util.List; +import java.util.Map; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; + +import org.springframework.batch.core.StepExecution; +import org.springframework.batch.item.ExecutionContext; +import org.springframework.cloud.deployer.spi.core.AppDefinition; +import org.springframework.cloud.deployer.spi.core.AppDeploymentRequest; +import org.springframework.cloud.task.repository.TaskExecution; +import org.springframework.cloud.task.repository.TaskRepository; +import org.springframework.core.io.Resource; +import org.springframework.util.StringUtils; + +/** + * Supports the launching of partitions. + * + * @author Glenn Renfro + */ +public class TaskLauncherHandler implements Runnable { + + private CommandLineArgsProvider commandLineArgsProvider; + + private TaskRepository taskRepository; + + private boolean defaultArgsAsEnvironmentVars; + + private String stepName; + + private TaskExecution taskExecution; + + private EnvironmentVariablesProvider environmentVariablesProvider; + + private Resource resource; + + private Map deploymentProperties; + + private org.springframework.cloud.deployer.spi.task.TaskLauncher taskLauncher; + + private String applicationName; + + private StepExecution workerStepExecution; + + private Log logger = LogFactory.getLog(TaskLauncherHandler.class); + + /** + * @param commandLineArgsProvider The {@link CommandLineArgsProvider} that provides command line + * arguments passed to each partition's execution. + * @param taskRepository The {@link TaskRepository} task repository for launching the partition. + * @param defaultArgsAsEnvironmentVars - If set to true, the default args that are used + * internally by Spring Cloud Task and Spring Batch are passed as environment variables instead of command line arguments. + * @param stepName The name of the step. + * @param taskExecution The {@link TaskExecution} to be associated with the partition. + * @param environmentVariablesProvider {@link EnvironmentVariablesProvider} that provides the environmennt variables. + * @param resource The {@link Resource} to be launched. + * @param deploymentProperties The {@link Map} containing the deployment properties for the partition. + * @param taskLauncher {@link org.springframework.cloud.deployer.spi.task.TaskLauncher} that is used to launch the partition. + * @param applicationName The name to be associated with task. + * @param workerStepExecution The {@link StepExecution} for the paritition. + */ + public TaskLauncherHandler(CommandLineArgsProvider commandLineArgsProvider, + TaskRepository taskRepository, boolean defaultArgsAsEnvironmentVars, + String stepName, TaskExecution taskExecution, EnvironmentVariablesProvider + environmentVariablesProvider, Resource resource, Map deploymentProperties, + org.springframework.cloud.deployer.spi.task.TaskLauncher taskLauncher, + String applicationName, StepExecution workerStepExecution) { + this.commandLineArgsProvider = commandLineArgsProvider; + this.taskRepository = taskRepository; + this.defaultArgsAsEnvironmentVars = defaultArgsAsEnvironmentVars; + this.stepName = stepName; + this.taskExecution = taskExecution; + this.environmentVariablesProvider = environmentVariablesProvider; + this.resource = resource; + this.deploymentProperties = deploymentProperties; + this.taskLauncher = taskLauncher; + this.applicationName = applicationName; + this.workerStepExecution = workerStepExecution; + } + + /** + * @param commandLineArgsProvider The {@link CommandLineArgsProvider} that provides command line + * arguments passed to each partition's execution. + * @param taskRepository The {@link TaskRepository} task repository for launching the partition. + * @param defaultArgsAsEnvironmentVars - If set to true, the default args that are used + * internally by Spring Cloud Task and Spring Batch are passed as environment variables instead of command line arguments. + * @param stepName The name of the step. + * @param taskExecution The {@link TaskExecution} to be associated with the partition. + * @param environmentVariablesProvider {@link EnvironmentVariablesProvider} that provides the environmennt variables. + * @param resource The {@link Resource} to be launched. + * @param deploymentProperties The {@link Map} containing the deployment properties for the partition. + * @param taskLauncher {@link org.springframework.cloud.deployer.spi.task.TaskLauncher} that is used to launch the partition. + * @param applicationName The name to be associated with task. + */ + public TaskLauncherHandler(CommandLineArgsProvider commandLineArgsProvider, + TaskRepository taskRepository, boolean defaultArgsAsEnvironmentVars, + String stepName, TaskExecution taskExecution, EnvironmentVariablesProvider + environmentVariablesProvider, Resource resource, Map deploymentProperties, + org.springframework.cloud.deployer.spi.task.TaskLauncher taskLauncher, + String applicationName) { + this.commandLineArgsProvider = commandLineArgsProvider; + this.taskRepository = taskRepository; + this.defaultArgsAsEnvironmentVars = defaultArgsAsEnvironmentVars; + this.stepName = stepName; + this.taskExecution = taskExecution; + this.environmentVariablesProvider = environmentVariablesProvider; + this.resource = resource; + this.deploymentProperties = deploymentProperties; + this.taskLauncher = taskLauncher; + this.applicationName = applicationName; + } + + @Override + public void run() { + launchWorker(this.workerStepExecution); + } + + + /** + * Launches the partition for the StepExecution. + * @param workerStepExecution The {@link StepExecution} + */ + public void launchWorker(StepExecution workerStepExecution) { + List arguments = new ArrayList<>(); + + ExecutionContext copyContext = new ExecutionContext( + workerStepExecution.getExecutionContext()); + + arguments.addAll(this.commandLineArgsProvider.getCommandLineArgs(copyContext)); + + TaskExecution partitionTaskExecution = null; + + if (this.taskRepository != null) { + partitionTaskExecution = this.taskRepository.createTaskExecution(); + } + else { + logger.warn( + "TaskRepository was not set so external execution id will not be recorded."); + } + + if (!this.defaultArgsAsEnvironmentVars) { + arguments.add(formatArgument(DeployerPartitionHandler.SPRING_CLOUD_TASK_JOB_EXECUTION_ID, + String.valueOf(workerStepExecution.getJobExecution().getId()))); + arguments.add(formatArgument(DeployerPartitionHandler.SPRING_CLOUD_TASK_STEP_EXECUTION_ID, + String.valueOf(workerStepExecution.getId()))); + arguments.add(formatArgument(DeployerPartitionHandler.SPRING_CLOUD_TASK_STEP_NAME, this.stepName)); + arguments + .add(formatArgument(DeployerPartitionHandler.SPRING_CLOUD_TASK_NAME, + String.format("%s_%s_%s", this.taskExecution.getTaskName(), + workerStepExecution.getJobExecution().getJobInstance() + .getJobName(), + workerStepExecution.getStepName()))); + arguments.add(formatArgument(DeployerPartitionHandler.SPRING_CLOUD_TASK_PARENT_EXECUTION_ID, + String.valueOf(this.taskExecution.getExecutionId()))); + + if (partitionTaskExecution != null) { + arguments.add(formatArgument(DeployerPartitionHandler.SPRING_CLOUD_TASK_EXECUTION_ID, + String.valueOf(partitionTaskExecution.getExecutionId()))); + } + } + + copyContext = new ExecutionContext(workerStepExecution.getExecutionContext()); + + Map environmentVariables = this.environmentVariablesProvider + .getEnvironmentVariables(copyContext); + + if (this.defaultArgsAsEnvironmentVars) { + environmentVariables.put(DeployerPartitionHandler.SPRING_CLOUD_TASK_JOB_EXECUTION_ID, + String.valueOf(workerStepExecution.getJobExecution().getId())); + environmentVariables.put(DeployerPartitionHandler.SPRING_CLOUD_TASK_STEP_EXECUTION_ID, + String.valueOf(workerStepExecution.getId())); + environmentVariables.put(DeployerPartitionHandler.SPRING_CLOUD_TASK_STEP_NAME, this.stepName); + environmentVariables + .put(DeployerPartitionHandler.SPRING_CLOUD_TASK_NAME, + String.format("%s_%s_%s", this.taskExecution.getTaskName(), + workerStepExecution.getJobExecution().getJobInstance() + .getJobName(), + workerStepExecution.getStepName())); + environmentVariables.put(DeployerPartitionHandler.SPRING_CLOUD_TASK_PARENT_EXECUTION_ID, + String.valueOf(this.taskExecution.getExecutionId())); + environmentVariables.put(DeployerPartitionHandler.SPRING_CLOUD_TASK_EXECUTION_ID, + String.valueOf(partitionTaskExecution.getExecutionId())); + } + + AppDefinition definition = new AppDefinition(resolveApplicationName(), + environmentVariables); + + AppDeploymentRequest request = new AppDeploymentRequest(definition, this.resource, + this.deploymentProperties, arguments); + + if (logger.isDebugEnabled()) { + logger.debug( + "Requesting the launch of the following application: " + request); + } + String externalExecutionId = this.taskLauncher.launch(request); + + if (this.taskRepository != null) { + this.taskRepository.updateExternalExecutionId( + partitionTaskExecution.getExecutionId(), externalExecutionId); + } + } + + private String formatArgument(String key, String value) { + return String.format("--%s=%s", key, value); + } + + private String resolveApplicationName() { + if (StringUtils.hasText(this.applicationName)) { + return this.applicationName; + } + else { + return this.taskExecution.getTaskName(); + } + } + +} diff --git a/spring-cloud-task-batch/src/test/java/org/springframework/cloud/task/batch/partition/DeployerPartitionHandlerTests.java b/spring-cloud-task-batch/src/test/java/org/springframework/cloud/task/batch/partition/DeployerPartitionHandlerTests.java index 08ca64cc..5e18b9d3 100644 --- a/spring-cloud-task-batch/src/test/java/org/springframework/cloud/task/batch/partition/DeployerPartitionHandlerTests.java +++ b/spring-cloud-task-batch/src/test/java/org/springframework/cloud/task/batch/partition/DeployerPartitionHandlerTests.java @@ -51,6 +51,7 @@ import org.springframework.cloud.task.repository.TaskRepository; import org.springframework.core.env.Environment; import org.springframework.core.io.Resource; import org.springframework.mock.env.MockEnvironment; +import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatExceptionOfType; @@ -315,25 +316,39 @@ public class DeployerPartitionHandlerTests { } @Test - public void testThreePartitions() throws Exception { + public void testThreePartitionsSequential() throws Exception { + testThreePartitions(null); + } + + @Test + public void testThreePartitionsAsynchronous() throws Exception { + ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor(); + executor.setCorePoolSize(4); + executor.setThreadNamePrefix("default_task_executor_thread"); + executor.setWaitForTasksToCompleteOnShutdown(true); + executor.initialize(); + testThreePartitions(executor); + } + + private void testThreePartitions(ThreadPoolTaskExecutor threadPoolTaskExecutor) throws Exception { StepExecution masterStepExecution = createMasterStepExecution(); JobExecution jobExecution = masterStepExecution.getJobExecution(); StepExecution workerStepExecutionStart1 = getStepExecutionStart(jobExecution, 4L); StepExecution workerStepExecutionFinish1 = getStepExecutionFinish( - workerStepExecutionStart1, BatchStatus.COMPLETED); + workerStepExecutionStart1, BatchStatus.COMPLETED); StepExecution workerStepExecutionStart2 = getStepExecutionStart(jobExecution, 5L); StepExecution workerStepExecutionFinish2 = getStepExecutionFinish( - workerStepExecutionStart2, BatchStatus.COMPLETED); + workerStepExecutionStart2, BatchStatus.COMPLETED); StepExecution workerStepExecutionStart3 = getStepExecutionStart(jobExecution, 6L); StepExecution workerStepExecutionFinish3 = getStepExecutionFinish( - workerStepExecutionStart3, BatchStatus.COMPLETED); + workerStepExecutionStart3, BatchStatus.COMPLETED); DeployerPartitionHandler handler = new DeployerPartitionHandler(this.taskLauncher, - this.jobExplorer, this.resource, "step1", this.taskRepository); + this.jobExplorer, this.resource, "step1", this.taskRepository, threadPoolTaskExecutor); handler.setEnvironment(this.environment); TaskExecution taskExecution = new TaskExecution(); @@ -348,23 +363,23 @@ public class DeployerPartitionHandlerTests { when(this.splitter.split(masterStepExecution, 1)).thenReturn(stepExecutions); when(this.jobExplorer.getStepExecution(1L, 4L)) - .thenReturn(workerStepExecutionFinish1); + .thenReturn(workerStepExecutionFinish1); when(this.jobExplorer.getStepExecution(1L, 5L)) - .thenReturn(workerStepExecutionFinish2); + .thenReturn(workerStepExecutionFinish2); when(this.jobExplorer.getStepExecution(1L, 6L)) - .thenReturn(workerStepExecutionFinish3); + .thenReturn(workerStepExecutionFinish3); handler.afterPropertiesSet(); handler.beforeTask(taskExecution); Collection results = handler.handle(this.splitter, - masterStepExecution); - + masterStepExecution); + Thread.sleep(5000); verify(this.taskLauncher, times(3)) - .launch(this.appDeploymentRequestArgumentCaptor.capture()); + .launch(this.appDeploymentRequestArgumentCaptor.capture()); List allValues = this.appDeploymentRequestArgumentCaptor - .getAllValues(); + .getAllValues(); validateAppDeploymentRequests(allValues, 3); diff --git a/spring-cloud-task-samples/partitioned-batch-job/README.adoc b/spring-cloud-task-samples/partitioned-batch-job/README.adoc index 525bc162..1bd47fdf 100644 --- a/spring-cloud-task-samples/partitioned-batch-job/README.adoc +++ b/spring-cloud-task-samples/partitioned-batch-job/README.adoc @@ -30,4 +30,13 @@ NOTE: Since this example uses the Spring Cloud Deployer Local to launch the part == Dependencies: A datasource (not in memory) must be configured based on normal Spring Boot conventions -(application.properties/application.yml/environment variables/etc). \ No newline at end of file +(application.properties, application.yml, environment variables). + +== Asynchronous remote partition task launch +Currently partitions are launched sequentially. To launch them asynchronously set the following environment variables: + +* `spring.cloud.task.closecontextEnabled=true` +* `io.spring.asynchronous=true` + +NOTE: We need to close the context since the use of ThreadPoolTaskExecutor leaves a thread active thus the app will not terminate. +To close the application appropriately, we will need to set `spring.cloud.task.closecontextEnabled`` to true. diff --git a/spring-cloud-task-samples/partitioned-batch-job/pom.xml b/spring-cloud-task-samples/partitioned-batch-job/pom.xml index 78d16ec5..49e20bf1 100644 --- a/spring-cloud-task-samples/partitioned-batch-job/pom.xml +++ b/spring-cloud-task-samples/partitioned-batch-job/pom.xml @@ -20,7 +20,7 @@ 4.0.0-SNAPSHOT UTF-8 17 - 2.7.4 + 2.8.0-SNAPSHOT diff --git a/spring-cloud-task-samples/partitioned-batch-job/src/main/java/io/spring/JobConfiguration.java b/spring-cloud-task-samples/partitioned-batch-job/src/main/java/io/spring/JobConfiguration.java index 6fa57225..496b8a67 100644 --- a/spring-cloud-task-samples/partitioned-batch-job/src/main/java/io/spring/JobConfiguration.java +++ b/spring-cloud-task-samples/partitioned-batch-job/src/main/java/io/spring/JobConfiguration.java @@ -40,6 +40,7 @@ import org.springframework.batch.item.ExecutionContext; import org.springframework.batch.repeat.RepeatStatus; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Value; +import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; import org.springframework.cloud.deployer.resource.support.DelegatingResourceLoader; import org.springframework.cloud.deployer.spi.task.TaskLauncher; import org.springframework.cloud.task.batch.partition.DeployerPartitionHandler; @@ -53,6 +54,7 @@ import org.springframework.context.annotation.Configuration; import org.springframework.context.annotation.Profile; import org.springframework.core.env.Environment; import org.springframework.core.io.Resource; +import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; /** * @author Michael Minella @@ -79,12 +81,14 @@ public class JobConfiguration { private Environment environment; @Bean - public PartitionHandler partitionHandler(TaskLauncher taskLauncher, JobExplorer jobExplorer, TaskRepository taskRepository) throws Exception { + public PartitionHandler partitionHandler(TaskLauncher taskLauncher, JobExplorer jobExplorer, + TaskRepository taskRepository, @Autowired(required = false) ThreadPoolTaskExecutor executor) throws Exception { Resource resource = this.resourceLoader - .getResource("maven://io.spring.cloud:partitioned-batch-job:2.2.0.BUILD-SNAPSHOT"); + .getResource("maven://io.spring.cloud:partitioned-batch-job:3.0.0-SNAPSHOT"); DeployerPartitionHandler partitionHandler = - new DeployerPartitionHandler(taskLauncher, jobExplorer, resource, "workerStep", taskRepository); + new DeployerPartitionHandler(taskLauncher, jobExplorer, resource, + "workerStep", taskRepository, executor); List commandLineArgs = new ArrayList<>(3); commandLineArgs.add("--spring.profiles.active=worker"); @@ -100,6 +104,19 @@ public class JobConfiguration { return partitionHandler; } + @ConditionalOnProperty( value="io.spring.asynchronous", + havingValue = "true", + matchIfMissing = false) + @Bean + public ThreadPoolTaskExecutor threadPoolTaskExecutor() { + ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor(); + executor.setCorePoolSize(2); + executor.setThreadNamePrefix("default_task_executor_thread"); + executor.setWaitForTasksToCompleteOnShutdown(true); + executor.initialize(); + return executor; + } + @Bean public Partitioner partitioner() { return new Partitioner() {