diff --git a/spring-cloud-task-batch/pom.xml b/spring-cloud-task-batch/pom.xml index 6a361f8b..cb4b6c4e 100644 --- a/spring-cloud-task-batch/pom.xml +++ b/spring-cloud-task-batch/pom.xml @@ -6,8 +6,8 @@ 4.0.0 - spring-cloud-task-parent org.springframework.cloud + spring-cloud-task-parent 1.0.0.BUILD-SNAPSHOT @@ -21,11 +21,37 @@ org.springframework.cloud spring-cloud-task-core + + org.springframework.cloud + spring-cloud-deployer-spi + 1.0.0.BUILD-SNAPSHOT + true + + + org.springframework.cloud + spring-cloud-deployer-local + 1.0.0.BUILD-SNAPSHOT + test + + + org.springframework.batch + spring-batch-integration + true + + + org.springframework + spring-test + junit junit test + + org.mockito + mockito-core + test + com.h2database h2 diff --git a/spring-cloud-task-batch/src/main/java/org/springframework/cloud/task/batch/configuration/TaskBatchExecutionListenerBeanPostProcessor.java b/spring-cloud-task-batch/src/main/java/org/springframework/cloud/task/batch/configuration/TaskBatchExecutionListenerBeanPostProcessor.java index b4f474cd..06ee08a6 100644 --- a/spring-cloud-task-batch/src/main/java/org/springframework/cloud/task/batch/configuration/TaskBatchExecutionListenerBeanPostProcessor.java +++ b/spring-cloud-task-batch/src/main/java/org/springframework/cloud/task/batch/configuration/TaskBatchExecutionListenerBeanPostProcessor.java @@ -15,12 +15,16 @@ */ package org.springframework.cloud.task.batch.configuration; +import java.util.ArrayList; +import java.util.List; + import org.springframework.batch.core.job.AbstractJob; import org.springframework.beans.BeansException; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.config.BeanPostProcessor; import org.springframework.cloud.task.batch.listener.TaskBatchExecutionListener; import org.springframework.context.ApplicationContext; +import org.springframework.util.Assert; /** * Injects a configured {@link TaskBatchExecutionListener} into any batch jobs (beans @@ -35,10 +39,18 @@ public class TaskBatchExecutionListenerBeanPostProcessor implements BeanPostProc @Autowired private ApplicationContext applicationContext; + private List jobNames = new ArrayList<>(); + @Override public Object postProcessBeforeInitialization(Object bean, String beanName) throws BeansException { + if(jobNames.size() > 0) { + if(!jobNames.contains(beanName)) { + return bean; + } + } + int length = this.applicationContext .getBeanNamesForType(TaskBatchExecutionListener.class).length; @@ -61,4 +73,10 @@ public class TaskBatchExecutionListenerBeanPostProcessor implements BeanPostProc throws BeansException { return bean; } + + public void setJobNames(List jobNames) { + Assert.notNull(jobNames, "A list is required"); + + this.jobNames = jobNames; + } } diff --git a/spring-cloud-task-batch/src/main/java/org/springframework/cloud/task/batch/partition/DeployerPartitionHandler.java b/spring-cloud-task-batch/src/main/java/org/springframework/cloud/task/batch/partition/DeployerPartitionHandler.java new file mode 100644 index 00000000..9caf64c5 --- /dev/null +++ b/spring-cloud-task-batch/src/main/java/org/springframework/cloud/task/batch/partition/DeployerPartitionHandler.java @@ -0,0 +1,332 @@ +/* + * Copyright 2016 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.springframework.cloud.task.batch.partition; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.Callable; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; + +import org.springframework.batch.core.BatchStatus; +import org.springframework.batch.core.StepExecution; +import org.springframework.batch.core.explore.JobExplorer; +import org.springframework.batch.core.partition.PartitionHandler; +import org.springframework.batch.core.partition.StepExecutionSplitter; +import org.springframework.batch.poller.DirectPoller; +import org.springframework.batch.poller.Poller; +import org.springframework.cloud.deployer.spi.core.AppDefinition; +import org.springframework.cloud.deployer.spi.core.AppDeploymentRequest; +import org.springframework.cloud.deployer.spi.task.TaskLauncher; +import org.springframework.cloud.task.listener.annotation.BeforeTask; +import org.springframework.cloud.task.repository.TaskExecution; +import org.springframework.context.EnvironmentAware; +import org.springframework.core.env.AbstractEnvironment; +import org.springframework.core.env.Environment; +import org.springframework.core.env.MapPropertySource; +import org.springframework.core.env.PropertySource; +import org.springframework.core.io.Resource; +import org.springframework.util.Assert; +import org.springframework.util.CollectionUtils; + +/** + *

A {@link PartitionHandler} implementation that delegates to a {@link TaskLauncher} for + * each of the workers. The id of the worker's StepExecution is passed as an environment + * variable to the worker. The worker, bootstrapped by the + * {@link DeployerStepExecutionHandler}, looks up the StepExecution in the JobRepository + * and executes it. This PartitionHandler polls the JobRepository for the results.

+ * + *

If the job fails, the partitions will be re-executed per normal batch rules (steps that + * are complete should do nothing, failed steps should restart based on their + * configurations).

+ * + *

This PartitionHandler and all of the worker processes must share the same JobRepository + * data store (aka point the same database).

+ * + * @author Michael Minella + */ +public class DeployerPartitionHandler implements PartitionHandler, EnvironmentAware { + + public static final String SPRING_CLOUD_TASK_JOB_EXECUTION_ID = + "spring.cloud.task.job-execution-id"; + + public static final String SPRING_CLOUD_TASK_STEP_EXECUTION_ID = + "spring.cloud.task.step-execution-id"; + + public static final String SPRING_CLOUD_TASK_STEP_NAME = + "spring.cloud.task.step-name"; + + private int maxWorkers = -1; + + private int gridSize = 1; + + private int currentWorkers = 0; + + private TaskLauncher taskLauncher; + + private JobExplorer jobExplorer; + + private TaskExecution taskExecution; + + private Resource resource; + + private Map environmentProperties = new HashMap<>(); + + private String stepName; + + private Log logger = LogFactory.getLog(DeployerPartitionHandler.class); + + private long pollInterval = 10000; + + private long timeout = -1; + + private Environment environment; + + public DeployerPartitionHandler(TaskLauncher taskLauncher, + JobExplorer jobExplorer, + Resource resource, + String stepName) { + Assert.notNull(taskLauncher, "A taskLauncher is required"); + Assert.notNull(jobExplorer, "A jobExplorer is required"); + Assert.notNull(resource, "A resource is required"); + Assert.hasText(stepName, "A step name is required"); + + this.taskLauncher = taskLauncher; + this.jobExplorer = jobExplorer; + this.resource = resource; + this.stepName = stepName; + } + + /** + * The maximum number of workers to be executing at once. + * + * @param maxWorkers number of workers. Defaults to -1 (unlimited) + */ + public void setMaxWorkers(int maxWorkers) { + Assert.isTrue(maxWorkers != 0, "maxWorkers cannot be 0"); + this.maxWorkers = maxWorkers; + } + + /** + * Approximate size of the pool of worker JVMs available. May be used by the + * {@link StepExecutionSplitter} to determine how many partitions to create (at the + * discretion of the {@link org.springframework.batch.core.partition.support.Partitioner}). + * + * @param gridSize size of grid. Defaults to 1 + */ + public void setGridSize(int gridSize) { + this.gridSize = gridSize; + } + + /** + * System properties to be made available for all workers. + * + * @param environmentProperties Map of properties + */ + public void setEnvironmentProperties(Map environmentProperties) { + this.environmentProperties = environmentProperties; + } + + /** + * The interval to check the job repository for completed steps. + * + * @param pollInterval interval. Defaults to 10 seconds + */ + public void setPollInterval(long pollInterval) { + this.pollInterval = pollInterval; + } + + /** + * Timeout for the master step. This is a timeout for all workers to complete. + * + * @param timeout timeout. Defaults to none (-1). + */ + public void setTimeout(long timeout) { + this.timeout = timeout; + } + + @BeforeTask + public void beforeTask(TaskExecution taskExecution) { + this.taskExecution = taskExecution; + } + + @Override + public Collection handle(StepExecutionSplitter stepSplitter, + StepExecution stepExecution) throws Exception { + + final Set tempCandidates = + stepSplitter.split(stepExecution, this.gridSize); + + // Following two lines due to https://jira.spring.io/browse/BATCH-2490 + final Set candidates = new HashSet<>(tempCandidates.size()); + candidates.addAll(tempCandidates); + + int partitions = candidates.size(); + + logger.debug(String.format("%s partitions were returned", partitions)); + + final Set executed = new HashSet<>(candidates.size()); + + if(CollectionUtils.isEmpty(candidates)) { + return null; + } + + launchWorkers(candidates, executed); + + candidates.removeAll(executed); + + return pollReplies(stepExecution, executed, candidates, partitions); + } + + private void launchWorkers(Set candidates, Set executed) { + for (StepExecution execution : candidates) { + if(this.currentWorkers < this.maxWorkers || this.maxWorkers < 0) { + launchWorker(execution); + this.currentWorkers++; + + executed.add(execution); + } + } + } + + private void launchWorker(StepExecution workerStepExecution) { + //TODO: Refactor these to be passed as command line args once SCD-20 is complete + // https://github.com/spring-cloud/spring-cloud-deployer/issues/20 + Map parameters = getParameters(this.taskExecution.getParameters()); + parameters.put(SPRING_CLOUD_TASK_JOB_EXECUTION_ID, + String.valueOf(workerStepExecution.getJobExecution().getId())); + parameters.put(SPRING_CLOUD_TASK_STEP_EXECUTION_ID, + String.valueOf(workerStepExecution.getId())); + parameters.put(SPRING_CLOUD_TASK_STEP_NAME, this.stepName); + + AppDefinition definition = + new AppDefinition(String.format("%s:%s:%s", + taskExecution.getTaskName(), + workerStepExecution.getJobExecution().getJobInstance().getJobName(), + workerStepExecution.getStepName()), + parameters); + + Map environmentProperties = new HashMap<>(this.environmentProperties.size()); + environmentProperties.putAll(getCurrentEnvironmentProperties()); + environmentProperties.putAll(this.environmentProperties); + + AppDeploymentRequest request = + new AppDeploymentRequest(definition, this.resource, environmentProperties); + + taskLauncher.launch(request); + } + + private Collection pollReplies(final StepExecution masterStepExecution, + final Set executed, + final Set candidates, + final int size) throws Exception { + + final Collection result = new ArrayList<>(executed.size()); + + Callable> callback = new Callable>() { + @Override + public Collection call() throws Exception { + Set newExecuted = new HashSet<>(); + + for (StepExecution curStepExecution : executed) { + if (!result.contains(curStepExecution)) { + StepExecution partitionStepExecution = + jobExplorer.getStepExecution(masterStepExecution.getJobExecutionId(), curStepExecution.getId()); + + if (isComplete(partitionStepExecution.getStatus())) { + result.add(partitionStepExecution); + currentWorkers--; + + if (!candidates.isEmpty()) { + + launchWorkers(candidates, newExecuted); + candidates.removeAll(newExecuted); + } + } + } + } + + executed.addAll(newExecuted); + + if(result.size() == size) { + return result; + } + else { + return null; + } + } + }; + + Poller> poller = new DirectPoller<>(this.pollInterval); + Future> resultsFuture = poller.poll(callback); + + if(timeout >= 0) { + return resultsFuture.get(timeout, TimeUnit.MILLISECONDS); + } + else { + return resultsFuture.get(); + } + } + + private boolean isComplete(BatchStatus status) { + return status.equals(BatchStatus.COMPLETED) || status.isGreaterThan(BatchStatus.STARTED); + } + + private Map getParameters(List parameters) { + Map parameterMap = new HashMap<>(parameters.size()); + + for (String parameter : parameters) { + String[] pieces = parameter.split("="); + parameterMap.put(pieces[0], pieces[1]); + } + + return parameterMap; + } + + @Override + public void setEnvironment(Environment environment) { + this.environment = environment; + } + + private Map getCurrentEnvironmentProperties() { + Map currentEnvironment = new HashMap<>(); + + Set keys = new HashSet<>(); + + for(Iterator it = ((AbstractEnvironment) this.environment).getPropertySources().iterator(); it.hasNext(); ) { + PropertySource propertySource = (PropertySource) it.next(); + if (propertySource instanceof MapPropertySource) { + keys.addAll(Arrays.asList(((MapPropertySource) propertySource).getPropertyNames())); + } + } + + for (String key : keys) { + currentEnvironment.put(key, this.environment.getProperty(key)); + } + + return currentEnvironment; + } +} diff --git a/spring-cloud-task-batch/src/main/java/org/springframework/cloud/task/batch/partition/DeployerStepExecutionHandler.java b/spring-cloud-task-batch/src/main/java/org/springframework/cloud/task/batch/partition/DeployerStepExecutionHandler.java new file mode 100644 index 00000000..20f652c5 --- /dev/null +++ b/spring-cloud-task-batch/src/main/java/org/springframework/cloud/task/batch/partition/DeployerStepExecutionHandler.java @@ -0,0 +1,119 @@ +/* + * Copyright 2016 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.springframework.cloud.task.batch.partition; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; + +import org.springframework.batch.core.BatchStatus; +import org.springframework.batch.core.JobInterruptedException; +import org.springframework.batch.core.Step; +import org.springframework.batch.core.StepExecution; +import org.springframework.batch.core.explore.JobExplorer; +import org.springframework.batch.core.repository.JobRepository; +import org.springframework.batch.core.step.NoSuchStepException; +import org.springframework.batch.core.step.StepLocator; +import org.springframework.batch.integration.partition.BeanFactoryStepLocator; +import org.springframework.beans.factory.BeanFactory; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.boot.CommandLineRunner; +import org.springframework.core.env.Environment; +import org.springframework.util.Assert; + +/** + *

A {@link CommandLineRunner} used to execute a {@link Step}. No result is provided + * directly to the associated {@link DeployerPartitionHandler} as it will obtain the step + * results directly from the shared job repository.

+ * + *

The {@link StepExecution} is rehydrated based on the environment variables provided. + * Specifically, the following variables are required:

+ *
    + *
  • {@link DeployerPartitionHandler#SPRING_CLOUD_TASK_JOB_EXECUTION_ID}: The id of + * the JobExecution.
  • + *
  • {@link DeployerPartitionHandler#SPRING_CLOUD_TASK_STEP_EXECUTION_ID}: The id of + * the StepExecution.
  • + *
  • {@link DeployerPartitionHandler#SPRING_CLOUD_TASK_STEP_NAME}: The id of the + * bean definition for the Step to execute. The id must be found within the provided + * {@link BeanFactory}
  • + *
+ * + * @author Michael Minella + */ +public class DeployerStepExecutionHandler implements CommandLineRunner { + + private JobExplorer jobExplorer; + + private JobRepository jobRepository; + + private Log logger = LogFactory.getLog(DeployerStepExecutionHandler.class); + + @Autowired + private Environment environment; + + private StepLocator stepLocator; + + public DeployerStepExecutionHandler(BeanFactory beanFactory, JobExplorer jobExplorer, JobRepository jobRepository) { + Assert.notNull(beanFactory, "A beanFactory is required"); + Assert.notNull(jobExplorer, "A jobExplorer is required"); + Assert.notNull(jobRepository, "A jobRepository is required"); + + this.stepLocator = new BeanFactoryStepLocator(); + ((BeanFactoryStepLocator) this.stepLocator).setBeanFactory(beanFactory); + + this.jobExplorer = jobExplorer; + this.jobRepository = jobRepository; + } + + @Override + public void run(String... args) throws Exception { + + validateRequest(); + + Long jobExecutionId = Long.parseLong(environment.getProperty(DeployerPartitionHandler.SPRING_CLOUD_TASK_JOB_EXECUTION_ID)); + Long stepExecutionId = Long.parseLong(environment.getProperty(DeployerPartitionHandler.SPRING_CLOUD_TASK_STEP_EXECUTION_ID)); + StepExecution stepExecution = jobExplorer.getStepExecution(jobExecutionId, stepExecutionId); + + if (stepExecution == null) { + throw new NoSuchStepException(String.format("No StepExecution could be located for step execution id %s within job execution %s", stepExecutionId, jobExecutionId)); + } + + String stepName = environment.getProperty(DeployerPartitionHandler.SPRING_CLOUD_TASK_STEP_NAME); + Step step = stepLocator.getStep(stepName); + + try { + logger.debug(String.format("Executing step %s with step execution id %s and job execution id %s", stepExecution.getStepName(), stepExecutionId, jobExecutionId)); + + step.execute(stepExecution); + } + catch (JobInterruptedException e) { + stepExecution.setStatus(BatchStatus.STOPPED); + jobRepository.update(stepExecution); + } + catch (Throwable e) { + stepExecution.addFailureException(e); + stepExecution.setStatus(BatchStatus.FAILED); + jobRepository.update(stepExecution); + } + } + + private void validateRequest() { + Assert.isTrue(environment.containsProperty(DeployerPartitionHandler.SPRING_CLOUD_TASK_JOB_EXECUTION_ID), "A job execution id is required"); + Assert.isTrue(environment.containsProperty(DeployerPartitionHandler.SPRING_CLOUD_TASK_STEP_EXECUTION_ID), "A step execution id is required"); + Assert.isTrue(environment.containsProperty(DeployerPartitionHandler.SPRING_CLOUD_TASK_STEP_NAME), "A step name is required"); + + Assert.isTrue(this.stepLocator.getStepNames().contains(environment.getProperty(DeployerPartitionHandler.SPRING_CLOUD_TASK_STEP_NAME)), "The step requested cannot be found in the provided BeanFactory"); + } +} diff --git a/spring-cloud-task-batch/src/test/java/org/springframework/cloud/task/batch/partition/DeployerPartitionHandlerTests.java b/spring-cloud-task-batch/src/test/java/org/springframework/cloud/task/batch/partition/DeployerPartitionHandlerTests.java new file mode 100644 index 00000000..81f38dd1 --- /dev/null +++ b/spring-cloud-task-batch/src/test/java/org/springframework/cloud/task/batch/partition/DeployerPartitionHandlerTests.java @@ -0,0 +1,609 @@ +/* + * Copyright 2016 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.springframework.cloud.task.batch.partition; + +import java.util.Collection; +import java.util.Collections; +import java.util.Comparator; +import java.util.Date; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.TimeoutException; + +import org.junit.Before; +import org.junit.Test; +import org.mockito.ArgumentCaptor; +import org.mockito.Captor; +import org.mockito.Mock; +import org.mockito.MockitoAnnotations; + +import org.springframework.batch.core.BatchStatus; +import org.springframework.batch.core.JobExecution; +import org.springframework.batch.core.JobInstance; +import org.springframework.batch.core.StepExecution; +import org.springframework.batch.core.explore.JobExplorer; +import org.springframework.batch.core.partition.StepExecutionSplitter; +import org.springframework.batch.core.repository.JobRepository; +import org.springframework.cloud.deployer.spi.core.AppDefinition; +import org.springframework.cloud.deployer.spi.core.AppDeploymentRequest; +import org.springframework.cloud.deployer.spi.task.TaskLauncher; +import org.springframework.cloud.task.repository.TaskExecution; +import org.springframework.core.env.Environment; +import org.springframework.core.io.Resource; +import org.springframework.mock.env.MockEnvironment; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; +import static org.mockito.Mockito.any; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +/** + * @author Michael Minella + */ +public class DeployerPartitionHandlerTests { + + @Mock + private TaskLauncher taskLauncher; + + @Mock + private JobExplorer jobExplorer; + + @Mock + private Resource resource; + + @Mock + private StepExecutionSplitter splitter; + + @Mock + private JobRepository jobRepository; + + private Environment environment; + + @Captor ArgumentCaptor appDeploymentRequestArgumentCaptor; + + @Before + public void setUp() { + MockitoAnnotations.initMocks(this); + this.environment = new MockEnvironment(); + } + + @Test + public void testConstructorValidation() { + validateConstructorValidation(null, null, null, null, "A taskLauncher is required"); + validateConstructorValidation(this.taskLauncher, null, null, null, "A jobExplorer is required"); + validateConstructorValidation(this.taskLauncher, this.jobExplorer, null, null, "A resource is required"); + validateConstructorValidation(this.taskLauncher, this.jobExplorer, this.resource, null, "A step name is required"); + + new DeployerPartitionHandler(this.taskLauncher, this.jobExplorer, this.resource, "step-name"); + } + + @Test + public void testNoPartitions() throws Exception { + DeployerPartitionHandler handler = new DeployerPartitionHandler(this.taskLauncher, this.jobExplorer, this.resource, "step1"); + handler.setEnvironment(this.environment); + + StepExecution stepExecution = new StepExecution("step1", new JobExecution(1L)); + + when(this.splitter.split(stepExecution, 1)).thenReturn(new HashSet()); + + Collection results = handler.handle(this.splitter, stepExecution); + + verify(this.taskLauncher, never()).launch((AppDeploymentRequest) any()); + assertNull(results); + } + + @Test + public void testSinglePartition() throws Exception { + + StepExecution masterStepExecution = createMasterStepExecution(); + JobExecution jobExecution = masterStepExecution.getJobExecution(); + + StepExecution workerStepExecutionStart = getStepExecutionStart(jobExecution, 4L); + StepExecution workerStepExecutionFinish = getStepExecutionFinish(workerStepExecutionStart, BatchStatus.COMPLETED); + + DeployerPartitionHandler handler = new DeployerPartitionHandler(this.taskLauncher, this.jobExplorer, this.resource, "step1"); + handler.setEnvironment(this.environment); + + TaskExecution taskExecution = new TaskExecution(); + taskExecution.setTaskName("partitionedJobTask"); + + Set stepExecutions = new HashSet<>(); + stepExecutions.add(workerStepExecutionStart); + when(this.splitter.split(masterStepExecution, 1)).thenReturn(stepExecutions); + + when(this.jobExplorer.getStepExecution(1L, 4L)).thenReturn(workerStepExecutionFinish); + + handler.beforeTask(taskExecution); + Collection results = handler.handle(this.splitter, masterStepExecution); + + verify(this.taskLauncher).launch(this.appDeploymentRequestArgumentCaptor.capture()); + + AppDeploymentRequest request = this.appDeploymentRequestArgumentCaptor.getValue(); + + assertEquals(this.resource, request.getResource()); + assertEquals(0, request.getEnvironmentProperties().size()); + + AppDefinition appDefinition = request.getDefinition(); + + assertEquals("partitionedJobTask:partitionedJob:step1:partition1", appDefinition.getName()); + assertEquals("1", appDefinition.getProperties().get(DeployerPartitionHandler.SPRING_CLOUD_TASK_JOB_EXECUTION_ID)); + assertEquals("4", appDefinition.getProperties().get(DeployerPartitionHandler.SPRING_CLOUD_TASK_STEP_EXECUTION_ID)); + assertEquals("step1", appDefinition.getProperties().get(DeployerPartitionHandler.SPRING_CLOUD_TASK_STEP_NAME)); + + assertEquals(1, results.size()); + StepExecution resultStepExecution = results.iterator().next(); + assertEquals(BatchStatus.COMPLETED, resultStepExecution.getStatus()); + assertEquals("step1:partition1", resultStepExecution.getStepName()); + } + + @Test + public void testThreePartitions() throws Exception { + + StepExecution masterStepExecution = createMasterStepExecution(); + JobExecution jobExecution = masterStepExecution.getJobExecution(); + + StepExecution workerStepExecutionStart1 = getStepExecutionStart(jobExecution, 4L); + StepExecution workerStepExecutionFinish1 = getStepExecutionFinish(workerStepExecutionStart1, BatchStatus.COMPLETED); + + StepExecution workerStepExecutionStart2 = getStepExecutionStart(jobExecution, 5L); + StepExecution workerStepExecutionFinish2 = getStepExecutionFinish(workerStepExecutionStart2, BatchStatus.COMPLETED); + + StepExecution workerStepExecutionStart3 = getStepExecutionStart(jobExecution, 6L); + StepExecution workerStepExecutionFinish3 = getStepExecutionFinish(workerStepExecutionStart3, BatchStatus.COMPLETED); + + DeployerPartitionHandler handler = new DeployerPartitionHandler(this.taskLauncher, this.jobExplorer, this.resource, "step1"); + handler.setEnvironment(this.environment); + + TaskExecution taskExecution = new TaskExecution(); + taskExecution.setTaskName("partitionedJobTask"); + + Set stepExecutions = new HashSet<>(); + + stepExecutions.add(workerStepExecutionStart1); + stepExecutions.add(workerStepExecutionStart2); + stepExecutions.add(workerStepExecutionStart3); + + when(this.splitter.split(masterStepExecution, 1)).thenReturn(stepExecutions); + + when(this.jobExplorer.getStepExecution(1L, 4L)).thenReturn(workerStepExecutionFinish1); + when(this.jobExplorer.getStepExecution(1L, 5L)).thenReturn(workerStepExecutionFinish2); + when(this.jobExplorer.getStepExecution(1L, 6L)).thenReturn(workerStepExecutionFinish3); + + handler.beforeTask(taskExecution); + Collection results = handler.handle(this.splitter, masterStepExecution); + + verify(this.taskLauncher, times(3)).launch(this.appDeploymentRequestArgumentCaptor.capture()); + + List allValues = this.appDeploymentRequestArgumentCaptor.getAllValues(); + + validateAppDeploymentRequests(allValues, 3); + + validateStepExecutionResults(results); + } + + @Test + public void testThreePartitionsTwoWorkers() throws Exception { + + StepExecution masterStepExecution = createMasterStepExecution(); + JobExecution jobExecution = masterStepExecution.getJobExecution(); + + StepExecution workerStepExecutionStart1 = getStepExecutionStart(jobExecution, 4L); + StepExecution workerStepExecutionFinish1 = getStepExecutionFinish(workerStepExecutionStart1, BatchStatus.COMPLETED); + + StepExecution workerStepExecutionStart2 = getStepExecutionStart(jobExecution, 5L); + StepExecution workerStepExecutionFinish2 = getStepExecutionFinish(workerStepExecutionStart2, BatchStatus.COMPLETED); + + StepExecution workerStepExecutionStart3 = getStepExecutionStart(jobExecution, 6L); + StepExecution workerStepExecutionFinish3 = getStepExecutionFinish(workerStepExecutionStart3, BatchStatus.COMPLETED); + + DeployerPartitionHandler handler = new DeployerPartitionHandler(this.taskLauncher, this.jobExplorer, this.resource, "step1"); + handler.setEnvironment(this.environment); + handler.setMaxWorkers(2); + + TaskExecution taskExecution = new TaskExecution(); + taskExecution.setTaskName("partitionedJobTask"); + + Set stepExecutions = new HashSet<>(); + + stepExecutions.add(workerStepExecutionStart1); + stepExecutions.add(workerStepExecutionStart2); + stepExecutions.add(workerStepExecutionStart3); + + when(this.splitter.split(masterStepExecution, 1)).thenReturn(stepExecutions); + + when(this.jobExplorer.getStepExecution(1L, 4L)).thenReturn(workerStepExecutionFinish1); + when(this.jobExplorer.getStepExecution(1L, 5L)).thenReturn(workerStepExecutionFinish2); + when(this.jobExplorer.getStepExecution(1L, 6L)).thenReturn(workerStepExecutionFinish3); + + handler.beforeTask(taskExecution); + Collection results = handler.handle(this.splitter, masterStepExecution); + + verify(this.taskLauncher, times(3)).launch(this.appDeploymentRequestArgumentCaptor.capture()); + + List allValues = this.appDeploymentRequestArgumentCaptor.getAllValues(); + + validateAppDeploymentRequests(allValues, 3); + + validateStepExecutionResults(results); + } + + @Test + public void testFailedWorker() throws Exception { + + StepExecution masterStepExecution = createMasterStepExecution(); + JobExecution jobExecution = masterStepExecution.getJobExecution(); + + StepExecution workerStepExecutionStart1 = getStepExecutionStart(jobExecution, 4L); + StepExecution workerStepExecutionFinish1 = getStepExecutionFinish(workerStepExecutionStart1, BatchStatus.COMPLETED); + + StepExecution workerStepExecutionStart2 = getStepExecutionStart(jobExecution, 5L); + StepExecution workerStepExecutionFinish2 = getStepExecutionFinish(workerStepExecutionStart2, BatchStatus.FAILED); + + StepExecution workerStepExecutionStart3 = getStepExecutionStart(jobExecution, 6L); + StepExecution workerStepExecutionFinish3 = getStepExecutionFinish(workerStepExecutionStart3, BatchStatus.COMPLETED); + + DeployerPartitionHandler handler = new DeployerPartitionHandler(this.taskLauncher, this.jobExplorer, this.resource, "step1"); + handler.setEnvironment(this.environment); + handler.setMaxWorkers(2); + + TaskExecution taskExecution = new TaskExecution(); + taskExecution.setTaskName("partitionedJobTask"); + + Set stepExecutions = new HashSet<>(); + + stepExecutions.add(workerStepExecutionStart1); + stepExecutions.add(workerStepExecutionStart2); + stepExecutions.add(workerStepExecutionStart3); + + when(this.splitter.split(masterStepExecution, 1)).thenReturn(stepExecutions); + + when(this.jobExplorer.getStepExecution(1L, 4L)).thenReturn(workerStepExecutionFinish1); + when(this.jobExplorer.getStepExecution(1L, 5L)).thenReturn(workerStepExecutionFinish2); + when(this.jobExplorer.getStepExecution(1L, 6L)).thenReturn(workerStepExecutionFinish3); + + handler.beforeTask(taskExecution); + Collection results = handler.handle(this.splitter, masterStepExecution); + + verify(this.taskLauncher, times(3)).launch(this.appDeploymentRequestArgumentCaptor.capture()); + + List allValues = this.appDeploymentRequestArgumentCaptor.getAllValues(); + + validateAppDeploymentRequests(allValues, 3); + + Iterator resultsIterator = results.iterator(); + Set names = new HashSet<>(results.size()); + + while (resultsIterator.hasNext()) { + StepExecution curResult = resultsIterator.next(); + + if(curResult.getStepName().equals("step1:partition2")) { + assertEquals(BatchStatus.FAILED, curResult.getStatus()); + } + else { + assertEquals(BatchStatus.COMPLETED, curResult.getStatus()); + } + + assertTrue(!names.contains(curResult.getStepName())); + names.add(curResult.getStepName()); + } + } + + @Test + public void testPassingEnvironmentProperties() throws Exception { + + StepExecution masterStepExecution = createMasterStepExecution(); + JobExecution jobExecution = masterStepExecution.getJobExecution(); + + StepExecution workerStepExecutionStart = getStepExecutionStart(jobExecution, 4L); + StepExecution workerStepExecutionFinish = getStepExecutionFinish(workerStepExecutionStart, BatchStatus.COMPLETED); + + DeployerPartitionHandler handler = new DeployerPartitionHandler(this.taskLauncher, this.jobExplorer, this.resource, "step1"); + handler.setEnvironment(this.environment); + + Map environmentParameters = new HashMap<>(2); + environmentParameters.put("foo", "bar"); + environmentParameters.put("baz", "qux"); + + handler.setEnvironmentProperties(environmentParameters); + + TaskExecution taskExecution = new TaskExecution(); + taskExecution.setTaskName("partitionedJobTask"); + + Set stepExecutions = new HashSet<>(); + stepExecutions.add(workerStepExecutionStart); + when(this.splitter.split(masterStepExecution, 1)).thenReturn(stepExecutions); + + when(this.jobExplorer.getStepExecution(1L, 4L)).thenReturn(workerStepExecutionFinish); + + handler.beforeTask(taskExecution); + Collection results = handler.handle(this.splitter, masterStepExecution); + + verify(this.taskLauncher).launch(this.appDeploymentRequestArgumentCaptor.capture()); + + AppDeploymentRequest request = this.appDeploymentRequestArgumentCaptor.getValue(); + + assertEquals(this.resource, request.getResource()); + assertEquals(2, request.getEnvironmentProperties().size()); + assertEquals("bar", request.getEnvironmentProperties().get("foo")); + assertEquals("qux", request.getEnvironmentProperties().get("baz")); + + AppDefinition appDefinition = request.getDefinition(); + + assertEquals("partitionedJobTask:partitionedJob:step1:partition1", appDefinition.getName()); + assertEquals("1", appDefinition.getProperties().get(DeployerPartitionHandler.SPRING_CLOUD_TASK_JOB_EXECUTION_ID)); + assertEquals("4", appDefinition.getProperties().get(DeployerPartitionHandler.SPRING_CLOUD_TASK_STEP_EXECUTION_ID)); + assertEquals("step1", appDefinition.getProperties().get(DeployerPartitionHandler.SPRING_CLOUD_TASK_STEP_NAME)); + + assertEquals(1, results.size()); + StepExecution resultStepExecution = results.iterator().next(); + assertEquals(BatchStatus.COMPLETED, resultStepExecution.getStatus()); + assertEquals("step1:partition1", resultStepExecution.getStepName()); + } + + @Test + public void testOverridingEnvironmentProperties() throws Exception { + + ((MockEnvironment) this.environment).setProperty("foo", "zoo"); + ((MockEnvironment) this.environment).setProperty("task", "batch"); + + StepExecution masterStepExecution = createMasterStepExecution(); + JobExecution jobExecution = masterStepExecution.getJobExecution(); + + StepExecution workerStepExecutionStart = getStepExecutionStart(jobExecution, 4L); + StepExecution workerStepExecutionFinish = getStepExecutionFinish(workerStepExecutionStart, BatchStatus.COMPLETED); + + DeployerPartitionHandler handler = new DeployerPartitionHandler(this.taskLauncher, this.jobExplorer, this.resource, "step1"); + handler.setEnvironment(this.environment); + + Map environmentParameters = new HashMap<>(2); + environmentParameters.put("foo", "bar"); + environmentParameters.put("baz", "qux"); + + handler.setEnvironmentProperties(environmentParameters); + + TaskExecution taskExecution = new TaskExecution(); + taskExecution.setTaskName("partitionedJobTask"); + + Set stepExecutions = new HashSet<>(); + stepExecutions.add(workerStepExecutionStart); + when(this.splitter.split(masterStepExecution, 1)).thenReturn(stepExecutions); + + when(this.jobExplorer.getStepExecution(1L, 4L)).thenReturn(workerStepExecutionFinish); + + handler.beforeTask(taskExecution); + Collection results = handler.handle(this.splitter, masterStepExecution); + + verify(this.taskLauncher).launch(this.appDeploymentRequestArgumentCaptor.capture()); + + AppDeploymentRequest request = this.appDeploymentRequestArgumentCaptor.getValue(); + + assertEquals(this.resource, request.getResource()); + assertEquals(3, request.getEnvironmentProperties().size()); + assertEquals("bar", request.getEnvironmentProperties().get("foo")); + assertEquals("qux", request.getEnvironmentProperties().get("baz")); + assertEquals("batch", request.getEnvironmentProperties().get("task")); + + AppDefinition appDefinition = request.getDefinition(); + + assertEquals("partitionedJobTask:partitionedJob:step1:partition1", appDefinition.getName()); + assertEquals("1", appDefinition.getProperties().get(DeployerPartitionHandler.SPRING_CLOUD_TASK_JOB_EXECUTION_ID)); + assertEquals("4", appDefinition.getProperties().get(DeployerPartitionHandler.SPRING_CLOUD_TASK_STEP_EXECUTION_ID)); + assertEquals("step1", appDefinition.getProperties().get(DeployerPartitionHandler.SPRING_CLOUD_TASK_STEP_NAME)); + + assertEquals(1, results.size()); + StepExecution resultStepExecution = results.iterator().next(); + assertEquals(BatchStatus.COMPLETED, resultStepExecution.getStatus()); + assertEquals("step1:partition1", resultStepExecution.getStepName()); + } + + @Test + public void testPollInterval() throws Exception { + + StepExecution masterStepExecution = createMasterStepExecution(); + JobExecution jobExecution = masterStepExecution.getJobExecution(); + + StepExecution workerStepExecutionStart1 = getStepExecutionStart(jobExecution, 4L); + StepExecution workerStepExecutionFinish1 = getStepExecutionFinish(workerStepExecutionStart1, BatchStatus.COMPLETED); + + StepExecution workerStepExecutionStart2 = getStepExecutionStart(jobExecution, 5L); + StepExecution workerStepExecutionFinish2 = getStepExecutionFinish(workerStepExecutionStart2, BatchStatus.COMPLETED); + + DeployerPartitionHandler handler = new DeployerPartitionHandler(this.taskLauncher, this.jobExplorer, this.resource, "step1"); + handler.setEnvironment(this.environment); + + handler.setPollInterval(20000L); + handler.setMaxWorkers(1); + + TaskExecution taskExecution = new TaskExecution(); + taskExecution.setTaskName("partitionedJobTask"); + + Set stepExecutions = new HashSet<>(); + stepExecutions.add(workerStepExecutionStart1); + stepExecutions.add(workerStepExecutionStart2); + when(this.splitter.split(masterStepExecution, 1)).thenReturn(stepExecutions); + + when(this.jobExplorer.getStepExecution(1L, 4L)).thenReturn(workerStepExecutionFinish1); + when(this.jobExplorer.getStepExecution(1L, 5L)).thenReturn(workerStepExecutionFinish2); + + handler.beforeTask(taskExecution); + + Date startTime = new Date(); + Collection results = handler.handle(this.splitter, masterStepExecution); + Date endTime = new Date(); + + verify(this.taskLauncher, times(2)).launch(this.appDeploymentRequestArgumentCaptor.capture()); + + List allRequests = this.appDeploymentRequestArgumentCaptor.getAllValues(); + + validateAppDeploymentRequests(allRequests, 2); + + validateStepExecutionResults(results); + + assertTrue(endTime.getTime() - startTime.getTime() > 20000); + } + + @Test(expected = TimeoutException.class) + public void testTimeout() throws Exception { + + StepExecution masterStepExecution = createMasterStepExecution(); + JobExecution jobExecution = masterStepExecution.getJobExecution(); + + StepExecution workerStepExecutionStart1 = getStepExecutionStart(jobExecution, 4L); + StepExecution workerStepExecutionFinish1 = getStepExecutionFinish(workerStepExecutionStart1, BatchStatus.COMPLETED); + + StepExecution workerStepExecutionStart2 = getStepExecutionStart(jobExecution, 5L); + StepExecution workerStepExecutionFinish2 = getStepExecutionFinish(workerStepExecutionStart2, BatchStatus.COMPLETED); + + DeployerPartitionHandler handler = new DeployerPartitionHandler(this.taskLauncher, this.jobExplorer, this.resource, "step1"); + handler.setEnvironment(this.environment); + + handler.setPollInterval(20000L); + handler.setMaxWorkers(1); + handler.setTimeout(1000L); + + TaskExecution taskExecution = new TaskExecution(); + taskExecution.setTaskName("partitionedJobTask"); + + Set stepExecutions = new HashSet<>(); + stepExecutions.add(workerStepExecutionStart1); + stepExecutions.add(workerStepExecutionStart2); + when(this.splitter.split(masterStepExecution, 1)).thenReturn(stepExecutions); + + when(this.jobExplorer.getStepExecution(1L, 4L)).thenReturn(workerStepExecutionFinish1); + when(this.jobExplorer.getStepExecution(1L, 5L)).thenReturn(workerStepExecutionFinish2); + + handler.beforeTask(taskExecution); + + handler.handle(this.splitter, masterStepExecution); + } + + @Test + public void testGridSize() throws Exception { + + StepExecution masterStepExecution = createMasterStepExecution(); + JobExecution jobExecution = masterStepExecution.getJobExecution(); + + StepExecution workerStepExecutionStart1 = getStepExecutionStart(jobExecution, 4L); + StepExecution workerStepExecutionFinish1 = getStepExecutionFinish(workerStepExecutionStart1, BatchStatus.COMPLETED); + + StepExecution workerStepExecutionStart2 = getStepExecutionStart(jobExecution, 5L); + StepExecution workerStepExecutionFinish2 = getStepExecutionFinish(workerStepExecutionStart2, BatchStatus.COMPLETED); + + DeployerPartitionHandler handler = new DeployerPartitionHandler(this.taskLauncher, this.jobExplorer, this.resource, "step1"); + handler.setEnvironment(this.environment); + + handler.setGridSize(2); + + TaskExecution taskExecution = new TaskExecution(); + taskExecution.setTaskName("partitionedJobTask"); + + Set stepExecutions = new HashSet<>(); + stepExecutions.add(workerStepExecutionStart1); + stepExecutions.add(workerStepExecutionStart2); + when(this.splitter.split(masterStepExecution, 2)).thenReturn(stepExecutions); + + when(this.jobExplorer.getStepExecution(1L, 4L)).thenReturn(workerStepExecutionFinish1); + when(this.jobExplorer.getStepExecution(1L, 5L)).thenReturn(workerStepExecutionFinish2); + + handler.beforeTask(taskExecution); + + Collection results = handler.handle(this.splitter, masterStepExecution); + + verify(this.taskLauncher, times(2)).launch(this.appDeploymentRequestArgumentCaptor.capture()); + + List allRequests = this.appDeploymentRequestArgumentCaptor.getAllValues(); + + validateAppDeploymentRequests(allRequests, 2); + + validateStepExecutionResults(results); + } + + private StepExecution getStepExecutionFinish(StepExecution stepExecutionStart, BatchStatus status) { + StepExecution workerStepExecutionFinish = new StepExecution(stepExecutionStart.getStepName(), stepExecutionStart.getJobExecution()); + workerStepExecutionFinish.setId(stepExecutionStart.getId()); + workerStepExecutionFinish.setStatus(status); + return workerStepExecutionFinish; + } + + private StepExecution getStepExecutionStart(JobExecution jobExecution, long id) { + StepExecution workerStepExecutionStart = new StepExecution("step1:partition" + (id - 3), jobExecution); + workerStepExecutionStart.setId(id); + return workerStepExecutionStart; + } + + private StepExecution createMasterStepExecution() { + + JobExecution jobExecution = new JobExecution(1L); + jobExecution.setJobInstance(new JobInstance(2L, "partitionedJob")); + + StepExecution masterStepExecution = new StepExecution("masterStep", jobExecution); + masterStepExecution.setId(3L); + + return masterStepExecution; + } + + private void validateStepExecutionResults(Collection results) { + Iterator resultsIterator = results.iterator(); + Set names = new HashSet<>(results.size()); + + while (resultsIterator.hasNext()) { + StepExecution curResult = resultsIterator.next(); + + assertEquals(BatchStatus.COMPLETED, curResult.getStatus()); + + assertTrue(!names.contains(curResult.getStepName())); + names.add(curResult.getStepName()); + } + } + + private void validateAppDeploymentRequests(List allRequests, int numberOfPartitions) { + Collections.sort(allRequests, new Comparator() { + @Override + public int compare(AppDeploymentRequest o1, AppDeploymentRequest o2) { + return o1.getDefinition().getName().compareTo(o2.getDefinition().getName()); + } + }); + + for(int i = 4; i < (numberOfPartitions + 4); i++) { + AppDeploymentRequest request = allRequests.get(i - 4); + assertEquals(this.resource, request.getResource()); + assertEquals(0, request.getEnvironmentProperties().size()); + + AppDefinition appDefinition = request.getDefinition(); + assertEquals("partitionedJobTask:partitionedJob:step1:partition" + (i - 3), appDefinition.getName()); + assertEquals("1", appDefinition.getProperties().get(DeployerPartitionHandler.SPRING_CLOUD_TASK_JOB_EXECUTION_ID)); + assertEquals(String.valueOf(i), appDefinition.getProperties().get(DeployerPartitionHandler.SPRING_CLOUD_TASK_STEP_EXECUTION_ID)); + assertEquals("step1", appDefinition.getProperties().get(DeployerPartitionHandler.SPRING_CLOUD_TASK_STEP_NAME)); + } + } + + private void validateConstructorValidation(TaskLauncher taskLauncher, JobExplorer jobExplorer, Resource resource, String stepName, String expectedMessage) { + try { + new DeployerPartitionHandler(taskLauncher, jobExplorer, resource, stepName); + } + catch (IllegalArgumentException iae) { + assertEquals(expectedMessage, iae.getMessage()); + } + } +} diff --git a/spring-cloud-task-batch/src/test/java/org/springframework/cloud/task/batch/partition/DeployerStepExecutionHandlerTests.java b/spring-cloud-task-batch/src/test/java/org/springframework/cloud/task/batch/partition/DeployerStepExecutionHandlerTests.java new file mode 100644 index 00000000..41fd97a6 --- /dev/null +++ b/spring-cloud-task-batch/src/test/java/org/springframework/cloud/task/batch/partition/DeployerStepExecutionHandlerTests.java @@ -0,0 +1,218 @@ +/* + * Copyright 2016 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.springframework.cloud.task.batch.partition; + +import org.junit.Before; +import org.junit.Test; +import org.mockito.ArgumentCaptor; +import org.mockito.Captor; +import org.mockito.Mock; +import org.mockito.MockitoAnnotations; + +import org.springframework.batch.core.BatchStatus; +import org.springframework.batch.core.JobExecution; +import org.springframework.batch.core.JobInterruptedException; +import org.springframework.batch.core.Step; +import org.springframework.batch.core.StepExecution; +import org.springframework.batch.core.explore.JobExplorer; +import org.springframework.batch.core.repository.JobRepository; +import org.springframework.batch.core.step.NoSuchStepException; +import org.springframework.beans.factory.BeanFactory; +import org.springframework.beans.factory.ListableBeanFactory; +import org.springframework.core.env.Environment; +import org.springframework.test.util.ReflectionTestUtils; + +import static org.junit.Assert.assertEquals; +import static org.mockito.Mockito.doThrow; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.verifyZeroInteractions; +import static org.mockito.Mockito.when; + +/** + * @author Michael Minella + */ +public class DeployerStepExecutionHandlerTests { + + @Mock + private JobExplorer jobExplorer; + + @Mock + private JobRepository jobRepository; + + @Mock + private Environment environment; + + @Mock + private ListableBeanFactory beanFactory; + + @Mock + private Step step; + + @Captor + private ArgumentCaptor stepExecutionArgumentCaptor; + + private DeployerStepExecutionHandler handler; + + @Before + public void setUp() { + MockitoAnnotations.initMocks(this); + + this.handler = new DeployerStepExecutionHandler(this.beanFactory, this.jobExplorer, this.jobRepository); + + ReflectionTestUtils.setField(this.handler, "environment", this.environment); + } + + @Test + public void testConstructorValidation() { + validateConstructorValidation(null, null, null, "A beanFactory is required"); + validateConstructorValidation(this.beanFactory, null, null, "A jobExplorer is required"); + validateConstructorValidation(this.beanFactory, this.jobExplorer, null, "A jobRepository is required"); + + new DeployerStepExecutionHandler(this.beanFactory, this.jobExplorer, this.jobRepository); + } + + @Test + public void testValidationOfRequestValuesExist() throws Exception { + validateEnvironmentConfiguration("A job execution id is required", new String[0]); + validateEnvironmentConfiguration("A step execution id is required", new String[] {DeployerPartitionHandler.SPRING_CLOUD_TASK_JOB_EXECUTION_ID}); + validateEnvironmentConfiguration("A step name is required", new String[] {DeployerPartitionHandler.SPRING_CLOUD_TASK_JOB_EXECUTION_ID, DeployerPartitionHandler.SPRING_CLOUD_TASK_STEP_EXECUTION_ID}); + } + + @Test + public void testValidationOfRequestStepFound() throws Exception { + when(this.environment.containsProperty(DeployerPartitionHandler.SPRING_CLOUD_TASK_JOB_EXECUTION_ID)).thenReturn(true); + when(this.environment.containsProperty(DeployerPartitionHandler.SPRING_CLOUD_TASK_STEP_EXECUTION_ID)).thenReturn(true); + when(this.environment.containsProperty(DeployerPartitionHandler.SPRING_CLOUD_TASK_STEP_NAME)).thenReturn(true); + when(this.environment.getProperty(DeployerPartitionHandler.SPRING_CLOUD_TASK_STEP_NAME)).thenReturn("foo"); + when(this.beanFactory.getBeanNamesForType(Step.class)).thenReturn(new String[] {"bar", "baz"}); + + try { + this.handler.run(); + } + catch (IllegalArgumentException iae) { + assertEquals("The step requested cannot be found in the provided BeanFactory", iae.getMessage()); + } + } + + @Test + public void testMissingStepExecution() throws Exception { + when(this.environment.containsProperty(DeployerPartitionHandler.SPRING_CLOUD_TASK_JOB_EXECUTION_ID)).thenReturn(true); + when(this.environment.containsProperty(DeployerPartitionHandler.SPRING_CLOUD_TASK_STEP_EXECUTION_ID)).thenReturn(true); + when(this.environment.containsProperty(DeployerPartitionHandler.SPRING_CLOUD_TASK_STEP_NAME)).thenReturn(true); + when(this.environment.getProperty(DeployerPartitionHandler.SPRING_CLOUD_TASK_STEP_NAME)).thenReturn("foo"); + when(this.beanFactory.getBeanNamesForType(Step.class)).thenReturn(new String[] {"foo", "bar", "baz"}); + when(this.environment.getProperty(DeployerPartitionHandler.SPRING_CLOUD_TASK_STEP_EXECUTION_ID)).thenReturn("2"); + when(this.environment.getProperty(DeployerPartitionHandler.SPRING_CLOUD_TASK_JOB_EXECUTION_ID)).thenReturn("1"); + + try { + this.handler.run(); + } + catch (NoSuchStepException nsse) { + assertEquals("No StepExecution could be located for step execution id 2 within job execution 1", nsse.getMessage()); + } + } + + @Test + public void testRunSuccessful() throws Exception { + StepExecution workerStep = new StepExecution("workerStep", new JobExecution(1L), 2L); + + when(this.environment.containsProperty(DeployerPartitionHandler.SPRING_CLOUD_TASK_JOB_EXECUTION_ID)).thenReturn(true); + when(this.environment.containsProperty(DeployerPartitionHandler.SPRING_CLOUD_TASK_STEP_EXECUTION_ID)).thenReturn(true); + when(this.environment.containsProperty(DeployerPartitionHandler.SPRING_CLOUD_TASK_STEP_NAME)).thenReturn(true); + when(this.environment.getProperty(DeployerPartitionHandler.SPRING_CLOUD_TASK_STEP_NAME)).thenReturn("workerStep"); + when(this.beanFactory.getBeanNamesForType(Step.class)).thenReturn(new String[] {"workerStep", "foo", "bar"}); + when(this.environment.getProperty(DeployerPartitionHandler.SPRING_CLOUD_TASK_STEP_EXECUTION_ID)).thenReturn("2"); + when(this.environment.getProperty(DeployerPartitionHandler.SPRING_CLOUD_TASK_JOB_EXECUTION_ID)).thenReturn("1"); + when(this.jobExplorer.getStepExecution(1L, 2L)).thenReturn(workerStep); + when(this.environment.getProperty(DeployerPartitionHandler.SPRING_CLOUD_TASK_STEP_NAME)).thenReturn("workerStep"); + when(this.beanFactory.getBean("workerStep", Step.class)).thenReturn(this.step); + + handler.run(); + + verify(this.step).execute(workerStep); + verifyZeroInteractions(this.jobRepository); + } + + @Test + public void testJobInterruptedException() throws Exception { + StepExecution workerStep = new StepExecution("workerStep", new JobExecution(1L), 2L); + + when(this.environment.containsProperty(DeployerPartitionHandler.SPRING_CLOUD_TASK_JOB_EXECUTION_ID)).thenReturn(true); + when(this.environment.containsProperty(DeployerPartitionHandler.SPRING_CLOUD_TASK_STEP_EXECUTION_ID)).thenReturn(true); + when(this.environment.containsProperty(DeployerPartitionHandler.SPRING_CLOUD_TASK_STEP_NAME)).thenReturn(true); + when(this.environment.getProperty(DeployerPartitionHandler.SPRING_CLOUD_TASK_STEP_NAME)).thenReturn("workerStep"); + when(this.beanFactory.getBeanNamesForType(Step.class)).thenReturn(new String[] {"workerStep", "foo", "bar"}); + when(this.environment.getProperty(DeployerPartitionHandler.SPRING_CLOUD_TASK_STEP_EXECUTION_ID)).thenReturn("2"); + when(this.environment.getProperty(DeployerPartitionHandler.SPRING_CLOUD_TASK_JOB_EXECUTION_ID)).thenReturn("1"); + when(this.jobExplorer.getStepExecution(1L, 2L)).thenReturn(workerStep); + when(this.environment.getProperty(DeployerPartitionHandler.SPRING_CLOUD_TASK_STEP_NAME)).thenReturn("workerStep"); + when(this.beanFactory.getBean("workerStep", Step.class)).thenReturn(this.step); + doThrow(new JobInterruptedException("expected")).when(this.step).execute(workerStep); + + handler.run(); + + verify(this.jobRepository).update(this.stepExecutionArgumentCaptor.capture()); + + assertEquals(BatchStatus.STOPPED, this.stepExecutionArgumentCaptor.getValue().getStatus()); + } + + @Test + public void testRuntimeException() throws Exception { + StepExecution workerStep = new StepExecution("workerStep", new JobExecution(1L), 2L); + + when(this.environment.containsProperty(DeployerPartitionHandler.SPRING_CLOUD_TASK_JOB_EXECUTION_ID)).thenReturn(true); + when(this.environment.containsProperty(DeployerPartitionHandler.SPRING_CLOUD_TASK_STEP_EXECUTION_ID)).thenReturn(true); + when(this.environment.containsProperty(DeployerPartitionHandler.SPRING_CLOUD_TASK_STEP_NAME)).thenReturn(true); + when(this.environment.getProperty(DeployerPartitionHandler.SPRING_CLOUD_TASK_STEP_NAME)).thenReturn("workerStep"); + when(this.beanFactory.getBeanNamesForType(Step.class)).thenReturn(new String[] {"workerStep", "foo", "bar"}); + when(this.environment.getProperty(DeployerPartitionHandler.SPRING_CLOUD_TASK_STEP_EXECUTION_ID)).thenReturn("2"); + when(this.environment.getProperty(DeployerPartitionHandler.SPRING_CLOUD_TASK_JOB_EXECUTION_ID)).thenReturn("1"); + when(this.jobExplorer.getStepExecution(1L, 2L)).thenReturn(workerStep); + when(this.environment.getProperty(DeployerPartitionHandler.SPRING_CLOUD_TASK_STEP_NAME)).thenReturn("workerStep"); + when(this.beanFactory.getBean("workerStep", Step.class)).thenReturn(this.step); + doThrow(new RuntimeException("expected")).when(this.step).execute(workerStep); + + handler.run(); + + verify(this.jobRepository).update(this.stepExecutionArgumentCaptor.capture()); + + assertEquals(BatchStatus.FAILED, this.stepExecutionArgumentCaptor.getValue().getStatus()); + } + + private void validateEnvironmentConfiguration(String errorMessage, String[] properties) throws Exception { + + for (String property : properties) { + when(this.environment.containsProperty(property)).thenReturn(true); + } + + try { + this.handler.run(); + } + catch (IllegalArgumentException iae) { + assertEquals(errorMessage, iae.getMessage()); + } + } + + + private void validateConstructorValidation(BeanFactory beanFactory, JobExplorer jobExplorer, JobRepository jobRepository, String message) { + try { + new DeployerStepExecutionHandler(beanFactory, jobExplorer, jobRepository); + } + catch (IllegalArgumentException iae) { + assertEquals(message, iae.getMessage()); + } + } +} diff --git a/spring-cloud-task-docs/src/main/asciidoc/batch.adoc b/spring-cloud-task-docs/src/main/asciidoc/batch.adoc new file mode 100644 index 00000000..728f810f --- /dev/null +++ b/spring-cloud-task-docs/src/main/asciidoc/batch.adoc @@ -0,0 +1,102 @@ + +[[batch]] += Batch + +[[partintro]] +-- +This section goes into more detail about Spring Cloud Task's integrations with Spring +Batch. Tracking the association between a job execution and the task it was executed +within as well as remote partitioning via Spring Cloud Deployer are all covered within +this section. +-- + +[[batch-association]] +== Associating A Job Execution To The Task In Which It Was Executed + +Spring Boot provides facilities for the execution of batch jobs easily within an über-jar. +Spring Boot's support of this functionality allows for a developer to execute multiple +batch jobs within that execution. Spring Cloud Task provides the ability to associate the +execution of a job (a job execution) with a task's execution so that one can be traced +back to the other. + +This functionality is accomplished by using the `TaskBatchExecutionListener`. By default, +this listener is auto configured in any context that has both a Spring Batch Job configured +(via having a bean of type Job defined in the context) and the spring-cloud-task-batch jar +is available within the classpath. The listener will be injected into all jobs. + +[[batch-association-override]] +=== Overriding the TaskBatchExecutionListener + +To prevent the listener from being injected into any batch jobs within the current context, +the autoconfiguration can be disabled via standard Spring Boot mechanisms. + +To only have the listener injected into particular jobs within the context, the +`batchTaskExecutionListenerBeanPostProcessor` may be overridden and a list of job bean ids +can be provided: + +``` +public TaskBatchExecutionListenerBeanPostProcessor batchTaskExecutionListenerBeanPostProcessor() { + TaskBatchExecutionListenerBeanPostProcessor postProcessor = + new TaskBatchExecutionListenerBeanPostProcessor(); + + postProcessor.setJobNames(Arrays.asList(new String[] {"job1", "job2"})); + + return postProcessor; +} +``` + +[[batch-partitioning]] +== Remote Partitioning + +Spring Cloud Deployer provides facilities for launching Spring Boot based applications on +most cloud infrastructures. The `DeployerPartitionHandler` and +`DeployerStepExecutionHandler` delegate the launching of worker step executions to Spring +Cloud Deployer. + +To configure the `DeployerStepExecutionHandler`, a `Resource` representing the Spring Boot +über-jar to be executed, a `TaskLauncher`, and a `JobExplorer` are all required. You can +configure any environment properties as well as the max number of workers to be executing +at once, the interval to poll for the results (defaults to 10 seconds), and a timeout +(defaults to -1 or no timeout). An example of configuring this `PartitionHandler` would +look like the following: + +``` +@Bean +public PartitionHandler partitionHandler(TaskLauncher taskLauncher, + JobExplorer jobExplorer) throws Exception { + MavenResource resource = + MavenResource.parse(String.format("%s:%s:%s", + "io.spring.cloud", + "partitioned-batch-job", + "1.0.0.BUILD-SNAPSHOT")); + + DeployerPartitionHandler partitionHandler = + new DeployerPartitionHandler(taskLauncher, jobExplorer, resource, "workerStep"); + + Map environmentProperties = new HashMap<>(); + environmentProperties.put("spring.profiles.active", "worker"); + + partitionHandler.setEnvironmentProperties(environmentProperties); + partitionHandler.setMaxWorkers(2); + + return partitionHandler; +} +``` + +The `Resource` to be executed is expected to be a Spring Boot über-jar with a +`DeployerStepExecutionHandler` configured as a `CommandLineRunner` in the current context. +Both the master and slave are expected to have visibility into the same data store being +used as the job repository and task repository. Once the underlying infrastructure has +bootstrapped the Spring Boot jar and Spring Boot has launched the +`DeployerStepExecutionHandler`, the step handler will execute the Step requested. An +example of configuring the `DefaultStepExecutionHandler`: + +``` +@Bean +public DeployerStepExecutionHandler stepExecutionHandler(JobExplorer jobExplorer) { + DeployerStepExecutionHandler handler = + new DeployerStepExecutionHandler(this.context, jobExplorer, this.jobRepository); + + return handler; +} +``` diff --git a/spring-cloud-task-docs/src/main/asciidoc/index.adoc b/spring-cloud-task-docs/src/main/asciidoc/index.adoc index 4bad9751..63bade6d 100644 --- a/spring-cloud-task-docs/src/main/asciidoc/index.adoc +++ b/spring-cloud-task-docs/src/main/asciidoc/index.adoc @@ -42,6 +42,7 @@ Michael Minella, Glenn Renfro include::preface.adoc[] include::getting-started.adoc[] include::features.adoc[] +include::batch.adoc[] include::appendix.adoc[] // ====================================================================================== diff --git a/spring-cloud-task-samples/batch-job/pom.xml b/spring-cloud-task-samples/batch-job/pom.xml index d0a04a30..c639a5eb 100644 --- a/spring-cloud-task-samples/batch-job/pom.xml +++ b/spring-cloud-task-samples/batch-job/pom.xml @@ -19,6 +19,7 @@ UTF-8 + 1.7 diff --git a/spring-cloud-task-samples/batch-job/src/main/java/demo/BatchJobApplication.java b/spring-cloud-task-samples/batch-job/src/main/java/io/spring/BatchJobApplication.java similarity index 96% rename from spring-cloud-task-samples/batch-job/src/main/java/demo/BatchJobApplication.java rename to spring-cloud-task-samples/batch-job/src/main/java/io/spring/BatchJobApplication.java index 670fcef9..b05f5579 100644 --- a/spring-cloud-task-samples/batch-job/src/main/java/demo/BatchJobApplication.java +++ b/spring-cloud-task-samples/batch-job/src/main/java/io/spring/BatchJobApplication.java @@ -1,4 +1,4 @@ -package demo; +package io.spring; import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing; import org.springframework.boot.SpringApplication; diff --git a/spring-cloud-task-samples/batch-job/src/main/java/demo/configuration/JobConfiguration.java b/spring-cloud-task-samples/batch-job/src/main/java/io/spring/configuration/JobConfiguration.java similarity index 98% rename from spring-cloud-task-samples/batch-job/src/main/java/demo/configuration/JobConfiguration.java rename to spring-cloud-task-samples/batch-job/src/main/java/io/spring/configuration/JobConfiguration.java index 84052d8f..783e914e 100644 --- a/spring-cloud-task-samples/batch-job/src/main/java/demo/configuration/JobConfiguration.java +++ b/spring-cloud-task-samples/batch-job/src/main/java/io/spring/configuration/JobConfiguration.java @@ -13,7 +13,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package demo.configuration; +package io.spring.configuration; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; diff --git a/spring-cloud-task-samples/batch-job/src/test/java/demo/BatchJobApplicationTests.java b/spring-cloud-task-samples/batch-job/src/test/java/io/spring/BatchJobApplicationTests.java similarity index 99% rename from spring-cloud-task-samples/batch-job/src/test/java/demo/BatchJobApplicationTests.java rename to spring-cloud-task-samples/batch-job/src/test/java/io/spring/BatchJobApplicationTests.java index 8dd87e70..f6fec0d9 100644 --- a/spring-cloud-task-samples/batch-job/src/test/java/demo/BatchJobApplicationTests.java +++ b/spring-cloud-task-samples/batch-job/src/test/java/io/spring/BatchJobApplicationTests.java @@ -14,19 +14,20 @@ * limitations under the License. */ -package demo; - -import static junit.framework.Assert.assertEquals; -import static junit.framework.Assert.assertTrue; +package io.spring; import java.util.regex.Matcher; import java.util.regex.Pattern; import org.junit.Rule; import org.junit.Test; + import org.springframework.boot.SpringApplication; import org.springframework.boot.test.OutputCapture; +import static junit.framework.Assert.assertEquals; +import static junit.framework.Assert.assertTrue; + /** * Verifies that the Task Application outputs the correct task log entries. * @@ -69,6 +70,7 @@ public class BatchJobApplicationTests { String taskTitle = "Demo Batch Job Task"; Pattern pattern = Pattern.compile(taskTitle); Matcher matcher = pattern.matcher(output); + int count = 0; while (matcher.find()) { count++; diff --git a/spring-cloud-task-samples/partitioned-batch-job/.mvn/wrapper/maven-wrapper.jar b/spring-cloud-task-samples/partitioned-batch-job/.mvn/wrapper/maven-wrapper.jar new file mode 100644 index 00000000..5fd4d502 Binary files /dev/null and b/spring-cloud-task-samples/partitioned-batch-job/.mvn/wrapper/maven-wrapper.jar differ diff --git a/spring-cloud-task-samples/partitioned-batch-job/.mvn/wrapper/maven-wrapper.properties b/spring-cloud-task-samples/partitioned-batch-job/.mvn/wrapper/maven-wrapper.properties new file mode 100644 index 00000000..eb919476 --- /dev/null +++ b/spring-cloud-task-samples/partitioned-batch-job/.mvn/wrapper/maven-wrapper.properties @@ -0,0 +1 @@ +distributionUrl=https://repo1.maven.org/maven2/org/apache/maven/apache-maven/3.3.3/apache-maven-3.3.3-bin.zip \ No newline at end of file diff --git a/spring-cloud-task-samples/partitioned-batch-job/README.adoc b/spring-cloud-task-samples/partitioned-batch-job/README.adoc new file mode 100644 index 00000000..5fa52bba --- /dev/null +++ b/spring-cloud-task-samples/partitioned-batch-job/README.adoc @@ -0,0 +1,27 @@ += Partitioned Job + +An example of the usage of the `DeployerPartitionHandler` and +`DeployerStepExecutionHandler` to partition a Spring Batch job. + +== Requirements: + +* Java 7 or Above + +== Build: + +[source,shell,indent=2] +---- +$ ./mvnw clean install +---- + +== Execute: + +[source,shell,indent=2] +---- +$ java -jar -Dspring.profiles.active=master target/partitioned-batch-job-1.0.0.BUILD-SNAPSHOT.jar +---- + +== Dependencies: + +A datasource (not in memory) must be configured based on normal Spring Boot conventions +(application.properties/application.yml/environment variables/etc). \ No newline at end of file diff --git a/spring-cloud-task-samples/partitioned-batch-job/mvnw b/spring-cloud-task-samples/partitioned-batch-job/mvnw new file mode 100755 index 00000000..a1ba1bf5 --- /dev/null +++ b/spring-cloud-task-samples/partitioned-batch-job/mvnw @@ -0,0 +1,233 @@ +#!/bin/sh +# ---------------------------------------------------------------------------- +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# ---------------------------------------------------------------------------- + +# ---------------------------------------------------------------------------- +# Maven2 Start Up Batch script +# +# Required ENV vars: +# ------------------ +# JAVA_HOME - location of a JDK home dir +# +# Optional ENV vars +# ----------------- +# M2_HOME - location of maven2's installed home dir +# MAVEN_OPTS - parameters passed to the Java VM when running Maven +# e.g. to debug Maven itself, use +# set MAVEN_OPTS=-Xdebug -Xrunjdwp:transport=dt_socket,server=y,suspend=y,address=8000 +# MAVEN_SKIP_RC - flag to disable loading of mavenrc files +# ---------------------------------------------------------------------------- + +if [ -z "$MAVEN_SKIP_RC" ] ; then + + if [ -f /etc/mavenrc ] ; then + . /etc/mavenrc + fi + + if [ -f "$HOME/.mavenrc" ] ; then + . "$HOME/.mavenrc" + fi + +fi + +# OS specific support. $var _must_ be set to either true or false. +cygwin=false; +darwin=false; +mingw=false +case "`uname`" in + CYGWIN*) cygwin=true ;; + MINGW*) mingw=true;; + Darwin*) darwin=true + # + # Look for the Apple JDKs first to preserve the existing behaviour, and then look + # for the new JDKs provided by Oracle. + # + if [ -z "$JAVA_HOME" ] && [ -L /System/Library/Frameworks/JavaVM.framework/Versions/CurrentJDK ] ; then + # + # Apple JDKs + # + export JAVA_HOME=/System/Library/Frameworks/JavaVM.framework/Versions/CurrentJDK/Home + fi + + if [ -z "$JAVA_HOME" ] && [ -L /System/Library/Java/JavaVirtualMachines/CurrentJDK ] ; then + # + # Apple JDKs + # + export JAVA_HOME=/System/Library/Java/JavaVirtualMachines/CurrentJDK/Contents/Home + fi + + if [ -z "$JAVA_HOME" ] && [ -L "/Library/Java/JavaVirtualMachines/CurrentJDK" ] ; then + # + # Oracle JDKs + # + export JAVA_HOME=/Library/Java/JavaVirtualMachines/CurrentJDK/Contents/Home + fi + + if [ -z "$JAVA_HOME" ] && [ -x "/usr/libexec/java_home" ]; then + # + # Apple JDKs + # + export JAVA_HOME=`/usr/libexec/java_home` + fi + ;; +esac + +if [ -z "$JAVA_HOME" ] ; then + if [ -r /etc/gentoo-release ] ; then + JAVA_HOME=`java-config --jre-home` + fi +fi + +if [ -z "$M2_HOME" ] ; then + ## resolve links - $0 may be a link to maven's home + PRG="$0" + + # need this for relative symlinks + while [ -h "$PRG" ] ; do + ls=`ls -ld "$PRG"` + link=`expr "$ls" : '.*-> \(.*\)$'` + if expr "$link" : '/.*' > /dev/null; then + PRG="$link" + else + PRG="`dirname "$PRG"`/$link" + fi + done + + saveddir=`pwd` + + M2_HOME=`dirname "$PRG"`/.. + + # make it fully qualified + M2_HOME=`cd "$M2_HOME" && pwd` + + cd "$saveddir" + # echo Using m2 at $M2_HOME +fi + +# For Cygwin, ensure paths are in UNIX format before anything is touched +if $cygwin ; then + [ -n "$M2_HOME" ] && + M2_HOME=`cygpath --unix "$M2_HOME"` + [ -n "$JAVA_HOME" ] && + JAVA_HOME=`cygpath --unix "$JAVA_HOME"` + [ -n "$CLASSPATH" ] && + CLASSPATH=`cygpath --path --unix "$CLASSPATH"` +fi + +# For Migwn, ensure paths are in UNIX format before anything is touched +if $mingw ; then + [ -n "$M2_HOME" ] && + M2_HOME="`(cd "$M2_HOME"; pwd)`" + [ -n "$JAVA_HOME" ] && + JAVA_HOME="`(cd "$JAVA_HOME"; pwd)`" + # TODO classpath? +fi + +if [ -z "$JAVA_HOME" ]; then + javaExecutable="`which javac`" + if [ -n "$javaExecutable" ] && ! [ "`expr \"$javaExecutable\" : '\([^ ]*\)'`" = "no" ]; then + # readlink(1) is not available as standard on Solaris 10. + readLink=`which readlink` + if [ ! `expr "$readLink" : '\([^ ]*\)'` = "no" ]; then + if $darwin ; then + javaHome="`dirname \"$javaExecutable\"`" + javaExecutable="`cd \"$javaHome\" && pwd -P`/javac" + else + javaExecutable="`readlink -f \"$javaExecutable\"`" + fi + javaHome="`dirname \"$javaExecutable\"`" + javaHome=`expr "$javaHome" : '\(.*\)/bin'` + JAVA_HOME="$javaHome" + export JAVA_HOME + fi + fi +fi + +if [ -z "$JAVACMD" ] ; then + if [ -n "$JAVA_HOME" ] ; then + if [ -x "$JAVA_HOME/jre/sh/java" ] ; then + # IBM's JDK on AIX uses strange locations for the executables + JAVACMD="$JAVA_HOME/jre/sh/java" + else + JAVACMD="$JAVA_HOME/bin/java" + fi + else + JAVACMD="`which java`" + fi +fi + +if [ ! -x "$JAVACMD" ] ; then + echo "Error: JAVA_HOME is not defined correctly." >&2 + echo " We cannot execute $JAVACMD" >&2 + exit 1 +fi + +if [ -z "$JAVA_HOME" ] ; then + echo "Warning: JAVA_HOME environment variable is not set." +fi + +CLASSWORLDS_LAUNCHER=org.codehaus.plexus.classworlds.launcher.Launcher + +# For Cygwin, switch paths to Windows format before running java +if $cygwin; then + [ -n "$M2_HOME" ] && + M2_HOME=`cygpath --path --windows "$M2_HOME"` + [ -n "$JAVA_HOME" ] && + JAVA_HOME=`cygpath --path --windows "$JAVA_HOME"` + [ -n "$CLASSPATH" ] && + CLASSPATH=`cygpath --path --windows "$CLASSPATH"` +fi + +# traverses directory structure from process work directory to filesystem root +# first directory with .mvn subdirectory is considered project base directory +find_maven_basedir() { + local basedir=$(pwd) + local wdir=$(pwd) + while [ "$wdir" != '/' ] ; do + if [ -d "$wdir"/.mvn ] ; then + basedir=$wdir + break + fi + wdir=$(cd "$wdir/.."; pwd) + done + echo "${basedir}" +} + +# concatenates all lines of a file +concat_lines() { + if [ -f "$1" ]; then + echo "$(tr -s '\n' ' ' < "$1")" + fi +} + +export MAVEN_PROJECTBASEDIR=${MAVEN_BASEDIR:-$(find_maven_basedir)} +MAVEN_OPTS="$(concat_lines "$MAVEN_PROJECTBASEDIR/.mvn/jvm.config") $MAVEN_OPTS" + +# Provide a "standardized" way to retrieve the CLI args that will +# work with both Windows and non-Windows executions. +MAVEN_CMD_LINE_ARGS="$MAVEN_CONFIG $@" +export MAVEN_CMD_LINE_ARGS + +WRAPPER_LAUNCHER=org.apache.maven.wrapper.MavenWrapperMain + +exec "$JAVACMD" \ + $MAVEN_OPTS \ + -classpath "$MAVEN_PROJECTBASEDIR/.mvn/wrapper/maven-wrapper.jar" \ + "-Dmaven.home=${M2_HOME}" "-Dmaven.multiModuleProjectDirectory=${MAVEN_PROJECTBASEDIR}" \ + ${WRAPPER_LAUNCHER} "$@" diff --git a/spring-cloud-task-samples/partitioned-batch-job/mvnw.cmd b/spring-cloud-task-samples/partitioned-batch-job/mvnw.cmd new file mode 100644 index 00000000..2b934e89 --- /dev/null +++ b/spring-cloud-task-samples/partitioned-batch-job/mvnw.cmd @@ -0,0 +1,145 @@ +@REM ---------------------------------------------------------------------------- +@REM Licensed to the Apache Software Foundation (ASF) under one +@REM or more contributor license agreements. See the NOTICE file +@REM distributed with this work for additional information +@REM regarding copyright ownership. The ASF licenses this file +@REM to you under the Apache License, Version 2.0 (the +@REM "License"); you may not use this file except in compliance +@REM with the License. You may obtain a copy of the License at +@REM +@REM http://www.apache.org/licenses/LICENSE-2.0 +@REM +@REM Unless required by applicable law or agreed to in writing, +@REM software distributed under the License is distributed on an +@REM "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +@REM KIND, either express or implied. See the License for the +@REM specific language governing permissions and limitations +@REM under the License. +@REM ---------------------------------------------------------------------------- + +@REM ---------------------------------------------------------------------------- +@REM Maven2 Start Up Batch script +@REM +@REM Required ENV vars: +@REM JAVA_HOME - location of a JDK home dir +@REM +@REM Optional ENV vars +@REM M2_HOME - location of maven2's installed home dir +@REM MAVEN_BATCH_ECHO - set to 'on' to enable the echoing of the batch commands +@REM MAVEN_BATCH_PAUSE - set to 'on' to wait for a key stroke before ending +@REM MAVEN_OPTS - parameters passed to the Java VM when running Maven +@REM e.g. to debug Maven itself, use +@REM set MAVEN_OPTS=-Xdebug -Xrunjdwp:transport=dt_socket,server=y,suspend=y,address=8000 +@REM MAVEN_SKIP_RC - flag to disable loading of mavenrc files +@REM ---------------------------------------------------------------------------- + +@REM Begin all REM lines with '@' in case MAVEN_BATCH_ECHO is 'on' +@echo off +@REM enable echoing my setting MAVEN_BATCH_ECHO to 'on' +@if "%MAVEN_BATCH_ECHO%" == "on" echo %MAVEN_BATCH_ECHO% + +@REM set %HOME% to equivalent of $HOME +if "%HOME%" == "" (set "HOME=%HOMEDRIVE%%HOMEPATH%") + +@REM Execute a user defined script before this one +if not "%MAVEN_SKIP_RC%" == "" goto skipRcPre +@REM check for pre script, once with legacy .bat ending and once with .cmd ending +if exist "%HOME%\mavenrc_pre.bat" call "%HOME%\mavenrc_pre.bat" +if exist "%HOME%\mavenrc_pre.cmd" call "%HOME%\mavenrc_pre.cmd" +:skipRcPre + +@setlocal + +set ERROR_CODE=0 + +@REM To isolate internal variables from possible post scripts, we use another setlocal +@setlocal + +@REM ==== START VALIDATION ==== +if not "%JAVA_HOME%" == "" goto OkJHome + +echo. +echo Error: JAVA_HOME not found in your environment. >&2 +echo Please set the JAVA_HOME variable in your environment to match the >&2 +echo location of your Java installation. >&2 +echo. +goto error + +:OkJHome +if exist "%JAVA_HOME%\bin\java.exe" goto init + +echo. +echo Error: JAVA_HOME is set to an invalid directory. >&2 +echo JAVA_HOME = "%JAVA_HOME%" >&2 +echo Please set the JAVA_HOME variable in your environment to match the >&2 +echo location of your Java installation. >&2 +echo. +goto error + +@REM ==== END VALIDATION ==== + +:init + +set MAVEN_CMD_LINE_ARGS=%* + +@REM Find the project base dir, i.e. the directory that contains the folder ".mvn". +@REM Fallback to current working directory if not found. + +set MAVEN_PROJECTBASEDIR=%MAVEN_BASEDIR% +IF NOT "%MAVEN_PROJECTBASEDIR%"=="" goto endDetectBaseDir + +set EXEC_DIR=%CD% +set WDIR=%EXEC_DIR% +:findBaseDir +IF EXIST "%WDIR%"\.mvn goto baseDirFound +cd .. +IF "%WDIR%"=="%CD%" goto baseDirNotFound +set WDIR=%CD% +goto findBaseDir + +:baseDirFound +set MAVEN_PROJECTBASEDIR=%WDIR% +cd "%EXEC_DIR%" +goto endDetectBaseDir + +:baseDirNotFound +set MAVEN_PROJECTBASEDIR=%EXEC_DIR% +cd "%EXEC_DIR%" + +:endDetectBaseDir + +IF NOT EXIST "%MAVEN_PROJECTBASEDIR%\.mvn\jvm.config" goto endReadAdditionalConfig + +@setlocal EnableExtensions EnableDelayedExpansion +for /F "usebackq delims=" %%a in ("%MAVEN_PROJECTBASEDIR%\.mvn\jvm.config") do set JVM_CONFIG_MAVEN_PROPS=!JVM_CONFIG_MAVEN_PROPS! %%a +@endlocal & set JVM_CONFIG_MAVEN_PROPS=%JVM_CONFIG_MAVEN_PROPS% + +:endReadAdditionalConfig + +SET MAVEN_JAVA_EXE="%JAVA_HOME%\bin\java.exe" + +set WRAPPER_JAR="".\.mvn\wrapper\maven-wrapper.jar"" +set WRAPPER_LAUNCHER=org.apache.maven.wrapper.MavenWrapperMain + +%MAVEN_JAVA_EXE% %JVM_CONFIG_MAVEN_PROPS% %MAVEN_OPTS% %MAVEN_DEBUG_OPTS% -classpath %WRAPPER_JAR% "-Dmaven.multiModuleProjectDirectory=%MAVEN_PROJECTBASEDIR%" %WRAPPER_LAUNCHER% %MAVEN_CMD_LINE_ARGS% +if ERRORLEVEL 1 goto error +goto end + +:error +set ERROR_CODE=1 + +:end +@endlocal & set ERROR_CODE=%ERROR_CODE% + +if not "%MAVEN_SKIP_RC%" == "" goto skipRcPost +@REM check for post script, once with legacy .bat ending and once with .cmd ending +if exist "%HOME%\mavenrc_post.bat" call "%HOME%\mavenrc_post.bat" +if exist "%HOME%\mavenrc_post.cmd" call "%HOME%\mavenrc_post.cmd" +:skipRcPost + +@REM pause the script if MAVEN_BATCH_PAUSE is set to 'on' +if "%MAVEN_BATCH_PAUSE%" == "on" pause + +if "%MAVEN_TERMINATE_CMD%" == "on" exit %ERROR_CODE% + +exit /B %ERROR_CODE% \ No newline at end of file diff --git a/spring-cloud-task-samples/partitioned-batch-job/pom.xml b/spring-cloud-task-samples/partitioned-batch-job/pom.xml new file mode 100644 index 00000000..57ad8e8f --- /dev/null +++ b/spring-cloud-task-samples/partitioned-batch-job/pom.xml @@ -0,0 +1,74 @@ + + + 4.0.0 + + io.spring.cloud + partitioned-batch-job + jar + Partitioned Batch Job + 1.0.0.BUILD-SNAPSHOT + Sample of using the DeployerPartitionHandler + + + org.springframework.boot + spring-boot-starter-parent + 1.3.3.RELEASE + + + + UTF-8 + 1.7 + + + + + org.springframework.boot + spring-boot-starter-batch + + + + org.springframework.cloud + spring-cloud-task-batch + 1.0.0.BUILD-SNAPSHOT + + + + org.springframework.batch + spring-batch-integration + + + + org.springframework.cloud + spring-cloud-deployer-local + 1.0.0.BUILD-SNAPSHOT + + + + mysql + mysql-connector-java + + + + org.springframework.boot + spring-boot-starter-jdbc + + + + org.springframework.boot + spring-boot-starter-test + test + + + + + + + org.springframework.boot + spring-boot-maven-plugin + + + + + + diff --git a/spring-cloud-task-samples/partitioned-batch-job/src/main/java/io/spring/JobConfiguration.java b/spring-cloud-task-samples/partitioned-batch-job/src/main/java/io/spring/JobConfiguration.java new file mode 100644 index 00000000..195bff4f --- /dev/null +++ b/spring-cloud-task-samples/partitioned-batch-job/src/main/java/io/spring/JobConfiguration.java @@ -0,0 +1,173 @@ +/* + * Copyright 2016 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.spring; + +import java.util.HashMap; +import java.util.Map; +import javax.sql.DataSource; + +import org.springframework.batch.core.Job; +import org.springframework.batch.core.Step; +import org.springframework.batch.core.StepContribution; +import org.springframework.batch.core.configuration.annotation.JobBuilderFactory; +import org.springframework.batch.core.configuration.annotation.StepBuilderFactory; +import org.springframework.batch.core.configuration.annotation.StepScope; +import org.springframework.batch.core.explore.JobExplorer; +import org.springframework.batch.core.explore.support.JobExplorerFactoryBean; +import org.springframework.batch.core.explore.support.SimpleJobExplorer; +import org.springframework.batch.core.partition.PartitionHandler; +import org.springframework.batch.core.partition.support.Partitioner; +import org.springframework.batch.core.repository.JobRepository; +import org.springframework.batch.core.scope.context.ChunkContext; +import org.springframework.batch.core.step.tasklet.Tasklet; +import org.springframework.batch.item.ExecutionContext; +import org.springframework.batch.repeat.RepeatStatus; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.cloud.deployer.resource.maven.MavenResource; +import org.springframework.cloud.deployer.spi.local.LocalDeployerProperties; +import org.springframework.cloud.deployer.spi.local.LocalTaskLauncher; +import org.springframework.cloud.deployer.spi.task.TaskLauncher; +import org.springframework.cloud.task.batch.partition.DeployerPartitionHandler; +import org.springframework.cloud.task.batch.partition.DeployerStepExecutionHandler; +import org.springframework.context.ConfigurableApplicationContext; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.context.annotation.Profile; + +/** + * @author Michael Minella + */ +@Configuration +public class JobConfiguration { + + @Autowired + public JobBuilderFactory jobBuilderFactory; + + @Autowired + public StepBuilderFactory stepBuilderFactory; + + @Autowired + public DataSource dataSource; + + @Autowired + public JobRepository jobRepository; + + @Autowired + private ConfigurableApplicationContext context; + + private static final int GRID_SIZE = 4; + + @Bean + public JobExplorerFactoryBean jobExplorer() { + JobExplorerFactoryBean jobExplorerFactoryBean = new JobExplorerFactoryBean(); + + jobExplorerFactoryBean.setDataSource(this.dataSource); + + return jobExplorerFactoryBean; + } + + @Bean + public TaskLauncher taskLauncher() { + LocalDeployerProperties localDeployerProperties = new LocalDeployerProperties(); + + localDeployerProperties.setDeleteFilesOnExit(false); + + return new LocalTaskLauncher(localDeployerProperties); + } + + @Bean + public PartitionHandler partitionHandler(TaskLauncher taskLauncher, JobExplorer jobExplorer) throws Exception { + MavenResource resource = MavenResource.parse("io.spring.cloud:partitioned-batch-job:1.0.0.BUILD-SNAPSHOT"); + + DeployerPartitionHandler partitionHandler = new DeployerPartitionHandler(taskLauncher, jobExplorer, resource, "workerStep"); + + Map environmentProperties = new HashMap<>(); + environmentProperties.put("spring.profiles.active", "worker"); + + partitionHandler.setEnvironmentProperties(environmentProperties); + partitionHandler.setMaxWorkers(2); + + return partitionHandler; + } + + @Bean + public Partitioner partitioner() { + return new Partitioner() { + @Override + public Map partition(int gridSize) { + + Map partitions = new HashMap<>(gridSize); + + for(int i = 0; i < GRID_SIZE; i++) { + ExecutionContext context1 = new ExecutionContext(); + context1.put("partitionNumber", i); + + partitions.put("partition" + i, context1); + } + + return partitions; + } + }; + } + + @Bean + @Profile("worker") + public DeployerStepExecutionHandler stepExecutionHandler(JobExplorer jobExplorer) { + DeployerStepExecutionHandler handler = new DeployerStepExecutionHandler(this.context, jobExplorer, this.jobRepository); + + return handler; + } + + @Bean + @StepScope + public Tasklet workerTasklet( + final @Value("#{stepExecutionContext['partitionNumber']}")Integer partitionNumber) { + + return new Tasklet() { + @Override + public RepeatStatus execute(StepContribution contribution, ChunkContext chunkContext) throws Exception { + System.out.println("This tasklet ran partition: " + partitionNumber); + + return RepeatStatus.FINISHED; + } + }; + } + + @Bean + public Step step1(PartitionHandler partitionHandler) throws Exception { + return stepBuilderFactory.get("step1") + .partitioner(workerStep().getName(), partitioner()) + .step(workerStep()) + .partitionHandler(partitionHandler) + .build(); + } + + @Bean + public Step workerStep() { + return stepBuilderFactory.get("workerStep") + .tasklet(workerTasklet(null)) + .build(); + } + + @Bean + @Profile("master") + public Job job(PartitionHandler partitionHandler) throws Exception { + return jobBuilderFactory.get("job") + .start(step1(partitionHandler)) + .build(); + } +} diff --git a/spring-cloud-task-samples/partitioned-batch-job/src/main/java/io/spring/PartitionedBatchJobApplication.java b/spring-cloud-task-samples/partitioned-batch-job/src/main/java/io/spring/PartitionedBatchJobApplication.java new file mode 100644 index 00000000..7a042cf1 --- /dev/null +++ b/spring-cloud-task-samples/partitioned-batch-job/src/main/java/io/spring/PartitionedBatchJobApplication.java @@ -0,0 +1,16 @@ +package io.spring; + +import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing; +import org.springframework.boot.SpringApplication; +import org.springframework.boot.autoconfigure.SpringBootApplication; +import org.springframework.cloud.task.configuration.EnableTask; + +@SpringBootApplication +@EnableBatchProcessing +@EnableTask +public class PartitionedBatchJobApplication { + + public static void main(String[] args) { + SpringApplication.run(PartitionedBatchJobApplication.class, args); + } +} diff --git a/spring-cloud-task-samples/partitioned-batch-job/src/main/resources/application.properties b/spring-cloud-task-samples/partitioned-batch-job/src/main/resources/application.properties new file mode 100644 index 00000000..e69de29b diff --git a/spring-cloud-task-samples/pom.xml b/spring-cloud-task-samples/pom.xml index 3344a436..f808699b 100644 --- a/spring-cloud-task-samples/pom.xml +++ b/spring-cloud-task-samples/pom.xml @@ -26,6 +26,7 @@ batch-job tasksink taskprocessor + partitioned-batch-job