Add Job Event Messaging to tasks.

* Added test for sample and cleanup

* Added skipEventsListener

* Added new message format.

* Added integration tests.

* Updated application.properties to add json content type

resolves spring-cloud/spring-cloud-task#119
This commit is contained in:
Michael Minella
2016-04-12 18:05:16 -05:00
parent ced5a3aa87
commit 43d869f726
51 changed files with 3620 additions and 11 deletions

View File

@@ -78,7 +78,7 @@ public class TaskExecution {
this.exitCode = exitCode;
this.taskName = taskName;
this.exitMessage = exitMessage;
this.parameters = parameters;
this.parameters = new ArrayList<>(parameters);
this.startTime = (Date)startTime.clone();
this.endTime = (endTime != null) ? (Date)endTime.clone() : null;
}
@@ -132,7 +132,7 @@ public class TaskExecution {
}
public void setParameters(List<String> parameters) {
this.parameters = parameters;
this.parameters = new ArrayList<> (parameters);
}
@Override

View File

@@ -36,5 +36,10 @@
<version>1.0.0.BUILD-SNAPSHOT</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-task-batch</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
</project>

View File

@@ -0,0 +1,89 @@
/*
* Copyright 2016 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package configuration;
import java.util.*;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.StepContribution;
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.scope.context.ChunkContext;
import org.springframework.batch.core.step.tasklet.Tasklet;
import org.springframework.batch.item.ItemProcessor;
import org.springframework.batch.item.ItemWriter;
import org.springframework.batch.item.support.ListItemReader;
import org.springframework.batch.repeat.RepeatStatus;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.cloud.task.configuration.EnableTask;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
* @author Glenn Renfro
*/
@Configuration
@EnableBatchProcessing
@EnableTask
public class JobConfiguration {
@Autowired
private JobBuilderFactory jobBuilderFactory;
@Autowired
private StepBuilderFactory stepBuilderFactory;
@Bean
public Job job() {
return jobBuilderFactory.get("job")
.start(step1()).next(step2())
.build();
}
@Bean
public Step step1() {
return stepBuilderFactory.get("step1").tasklet(new Tasklet() {
@Override
public RepeatStatus execute(StepContribution contribution, ChunkContext chunkContext) throws Exception {
System.out.println("Executed");
return RepeatStatus.FINISHED;
}
}).build();
}
@Bean
public Step step2() {
return stepBuilderFactory.get("step2").chunk(3)
.reader(new ListItemReader<>(Arrays.asList("1", "2", "3", "4", "5", "6")))
.processor(new ItemProcessor<Object, Object>() {
@Override
public String process(Object item) throws Exception {
return String.valueOf(Integer.parseInt((String) item) * -1);
}
})
.writer(new ItemWriter<Object>() {
@Override
public void write(List<? extends Object> items) throws Exception {
for (Object item : items) {
System.out.println(">> " + item);
}
}
}).build();
}
}

View File

@@ -0,0 +1,81 @@
/*
* Copyright 2016 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package configuration;
import java.util.*;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.StepContribution;
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.scope.context.ChunkContext;
import org.springframework.batch.core.step.tasklet.Tasklet;
import org.springframework.batch.item.ItemProcessor;
import org.springframework.batch.item.ItemWriter;
import org.springframework.batch.repeat.RepeatStatus;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.cloud.task.configuration.EnableTask;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
* @author Glenn Renfro
*/
@Configuration
@EnableBatchProcessing
@EnableTask
public class JobSkipConfiguration {
@Autowired
private JobBuilderFactory jobBuilderFactory;
@Autowired
private StepBuilderFactory stepBuilderFactory;
@Bean
public Job job() {
return jobBuilderFactory.get("job")
.start(step1()).next(step2())
.build();
}
@Bean
public Step step1() {
return stepBuilderFactory.get("step1").tasklet(new Tasklet() {
@Override
public RepeatStatus execute(StepContribution contribution, ChunkContext chunkContext) throws Exception {
System.out.println("Executed");
return RepeatStatus.FINISHED;
}
}).build();
}
@Bean
public Step step2() {
return stepBuilderFactory.get("step2").chunk(3).faultTolerant().skip(IllegalStateException.class).skipLimit(100)
.reader(new SkipItemReader())
.processor(new ItemProcessor<Object, Object>() {
@Override
public String process(Object item) throws Exception {
return String.valueOf(Integer.parseInt((String) item) * -1);
}
})
.writer( new SkipItemWriter() ).build();
}
}

View File

@@ -0,0 +1,45 @@
/*
* Copyright 2016 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package configuration;
import org.springframework.batch.item.ItemReader;
import org.springframework.batch.item.NonTransientResourceException;
import org.springframework.batch.item.ParseException;
import org.springframework.batch.item.UnexpectedInputException;
/**
* @author Glenn Renfro
*/
public class SkipItemReader implements ItemReader{
int failCount = 0;
boolean finished = false;
@Override
public Object read() throws Exception, UnexpectedInputException, ParseException, NonTransientResourceException {
String result = "1";
if(failCount < 2) {
failCount++;
throw new IllegalStateException("Reader FOOBAR");
}
if (finished){
result = null;
}
finished = true;
return result;
}
}

View File

@@ -0,0 +1,46 @@
/*
* Copyright 2016 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package configuration;
import java.util.*;
import org.springframework.batch.item.ItemReader;
import org.springframework.batch.item.ItemWriter;
import org.springframework.batch.item.NonTransientResourceException;
import org.springframework.batch.item.ParseException;
import org.springframework.batch.item.UnexpectedInputException;
/**
* @author Glenn Renfro
*/
public class SkipItemWriter implements ItemWriter {
int failCount = 0;
boolean finished = false;
@Override
public void write(List items) throws Exception {
String result = "1";
if(failCount < 2) {
failCount++;
throw new IllegalStateException("Writer FOOBAR");
}
for (Object item : items) {
System.out.println(">> " + item);
}
}
}

View File

@@ -0,0 +1,288 @@
/*
* Copyright 2016 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.cloud.task.listener;
import java.util.concurrent.*;
import configuration.JobConfiguration;
import configuration.JobSkipConfiguration;
import org.junit.After;
import org.junit.ClassRule;
import org.junit.Test;
import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
import org.springframework.boot.autoconfigure.PropertyPlaceholderAutoConfiguration;
import org.springframework.boot.autoconfigure.batch.BatchAutoConfiguration;
import org.springframework.boot.builder.SpringApplicationBuilder;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.cloud.stream.binder.redis.config.RedisServiceAutoConfiguration;
import org.springframework.cloud.stream.messaging.Sink;
import org.springframework.cloud.stream.test.junit.redis.RedisTestSupport;
import org.springframework.cloud.task.batch.configuration.TaskBatchAutoConfiguration;
import org.springframework.cloud.task.batch.listener.BatchEventAutoConfiguration;
import org.springframework.cloud.task.batch.listener.support.JobExecutionEvent;
import org.springframework.cloud.task.batch.listener.support.StepExecutionEvent;
import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.context.annotation.PropertySource;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
/**
* @author Glenn Renfro
*/
public class BatchExecutionEventTests {
@ClassRule
public static RedisTestSupport redisTestSupport = new RedisTestSupport();
// Count for two job execution events per job
static CountDownLatch jobExecutionLatch = new CountDownLatch(2);
// Count for four step execution events per job
static CountDownLatch stepExecutionLatch = new CountDownLatch(4);
static int stepOneCount = 0;
static int stepTwoCount = 0;
// Count for twelve item process events per job
static CountDownLatch itemProcessLatch = new CountDownLatch(6);
// Count for eight chunk events per job
static CountDownLatch chunkEventsLatch = new CountDownLatch(8);
// Count for zero read events per job
static CountDownLatch itemReadEventsLatch = new CountDownLatch(0);
// Count for six write events per job
static CountDownLatch itemWriteEventsLatch = new CountDownLatch(2);
// Count for 3 skip events per job
static CountDownLatch skipEventsLatch = new CountDownLatch(3);
static int readSkipCount = 0;
static int writeSkipCount = 0;
private static final String TASK_NAME = "jobEventTest";
private ConfigurableApplicationContext applicationContext;
@After
public void tearDown() {
if (applicationContext != null && applicationContext.isActive() ) {
applicationContext.close();
}
}
@Test
public void testContext() {
applicationContext = new SpringApplicationBuilder()
.sources(this.getConfigurations(BatchExecutionEventTests.ListenerBinding.class, JobConfiguration.class))
.build().run(new String[]{ "--spring.cloud.task.closecontext.enable=false" });
assertNotNull(applicationContext.getBean("jobExecutionEventsListener"));
assertNotNull(applicationContext.getBean("stepExecutionEventsListener"));
assertNotNull(applicationContext.getBean("chunkEventsListener"));
assertNotNull(applicationContext.getBean("itemReadEventsListener"));
assertNotNull(applicationContext.getBean("itemWriteEventsListener"));
assertNotNull(applicationContext.getBean("itemProcessEventsListener"));
assertNotNull(applicationContext.getBean("skipEventsListener"));
assertNotNull(applicationContext.getBean(BatchEventAutoConfiguration.BatchEventsChannels.class));
}
@Test
public void testJobEventListener() throws Exception {
testListener("--spring.cloud.stream.bindings.job-execution-events.destination=foobar",
jobExecutionLatch, BatchExecutionEventTests.ListenerBinding.class);
}
@Test
public void testStepEventListener() throws Exception {
testListener("--spring.cloud.stream.bindings.step-execution-events.destination=step-execution-foobar",
stepExecutionLatch, BatchExecutionEventTests.StepListenerBinding.class);
assertEquals("the number of step1 events did not match", 2, stepOneCount);
assertEquals("the number of step2 events did not match", 2, stepTwoCount);
}
@Test
public void testItemProcessEventListener() throws Exception {
testListener("--spring.cloud.stream.bindings.item-process-events.destination=item-process-foobar",
itemProcessLatch, BatchExecutionEventTests.ItemProcessListenerBinding.class);
}
@Test
public void testChunkListener() throws Exception {
testListener("--spring.cloud.stream.bindings.chunk-events.destination=chunk-events-foobar",
chunkEventsLatch, BatchExecutionEventTests.ChunkEventsListenerBinding.class);
}
@Test
public void testItemReadListener() throws Exception {
testListener("--spring.cloud.stream.bindings.item-read-events.destination=item-read-events-foobar",
itemReadEventsLatch, BatchExecutionEventTests.ItemReadEventsListenerBinding.class);
}
@Test
public void testWriteListener() throws Exception {
testListener("--spring.cloud.stream.bindings.item-write-events.destination=item-write-events-foobar",
itemWriteEventsLatch, BatchExecutionEventTests.ItemWriteEventsListenerBinding.class);
}
@Test
public void testSkipEventListener() throws Exception {
testListenerSkip("--spring.cloud.stream.bindings.skip-events.destination=skip-event-foobar",
skipEventsLatch, BatchExecutionEventTests.SkipEventsListenerBinding.class);
assertEquals("read skip count did not match expected result", 2, readSkipCount);
assertEquals("write skip count did not match expected result", 1, writeSkipCount);
}
@EnableBinding(Sink.class)
@PropertySource("classpath:/org/springframework/cloud/task/listener/job-execution-sink-channel.properties")
@EnableAutoConfiguration
public static class ListenerBinding {
@StreamListener(Sink.INPUT)
public void receive(JobExecutionEvent execution) {
assertEquals(String.format("Job name should be job"), "job", execution.getJobInstance().getJobName());
jobExecutionLatch.countDown();
}
}
@EnableBinding(Sink.class)
@PropertySource("classpath:/org/springframework/cloud/task/listener/step-execution-sink-channel.properties")
@EnableAutoConfiguration
public static class StepListenerBinding {
@StreamListener(Sink.INPUT)
public void receive(StepExecutionEvent execution) {
if(execution.getStepName().equals("step1")) {
stepOneCount++;
}
if(execution.getStepName().equals("step2")) {
stepTwoCount++;
}
stepExecutionLatch.countDown();
}
}
@EnableBinding(Sink.class)
@PropertySource("classpath:/org/springframework/cloud/task/listener/item-process-sink-channel.properties")
@EnableAutoConfiguration
public static class ItemProcessListenerBinding {
@StreamListener(Sink.INPUT)
public void receive(Object object) {
itemProcessLatch.countDown();
}
}
@EnableBinding(Sink.class)
@PropertySource("classpath:/org/springframework/cloud/task/listener/chunk-events-sink-channel.properties")
@EnableAutoConfiguration
public static class ChunkEventsListenerBinding {
@StreamListener(Sink.INPUT)
public void receive(Object chunkContext) {
chunkEventsLatch.countDown();
}
}
@EnableBinding(Sink.class)
@PropertySource("classpath:/org/springframework/cloud/task/listener/item-read-events-sink-channel.properties")
@EnableAutoConfiguration
public static class ItemReadEventsListenerBinding {
@StreamListener(Sink.INPUT)
public void receive(Object itemRead) {
itemReadEventsLatch.countDown();
}
}
@EnableBinding(Sink.class)
@PropertySource("classpath:/org/springframework/cloud/task/listener/skip-events-sink-channel.properties")
@EnableAutoConfiguration
public static class SkipEventsListenerBinding {
private static final String SKIPPING_READ_MESSAGE = "Skipped when reading.";
private static final String SKIPPING_WRITE_CONTENT = "-1";
@StreamListener(Sink.INPUT)
public void receive(Object exceptionMessage) {
if(exceptionMessage.toString().equals(SKIPPING_READ_MESSAGE)){
readSkipCount++;
}
if(exceptionMessage.toString().equals(SKIPPING_WRITE_CONTENT)){
writeSkipCount++;
}
skipEventsLatch.countDown();
}
}
@EnableBinding(Sink.class)
@PropertySource("classpath:/org/springframework/cloud/task/listener/item-write-events-sink-channel.properties")
@EnableAutoConfiguration
public static class ItemWriteEventsListenerBinding {
@StreamListener(Sink.INPUT)
public void receive(Object itemWrite) {
assertTrue("Message should start with '3 items'", itemWrite.toString().startsWith("3 items "));
assertTrue("Message should end with ' written.'", itemWrite.toString().endsWith(" written."));
itemWriteEventsLatch.countDown();
}
}
private Object[] getConfigurations(Class sinkClazz, Class jobConfigurationClazz) {
return new Object[]{
jobConfigurationClazz,
PropertyPlaceholderAutoConfiguration.class,
BatchAutoConfiguration.class,
TaskBatchAutoConfiguration.class,
TaskEventAutoConfiguration.class,
BatchEventAutoConfiguration.class,
RedisServiceAutoConfiguration.class,
sinkClazz };
}
private String[] getCommandLineParams(String sinkChannelParam) {
return new String[]{ "--spring.cloud.task.closecontext.enable=false",
"--spring.cloud.task.name=" + TASK_NAME,
"--spring.main.web-environment=false",
"--spring.cloud.stream.defaultBinder=redis",
"--spring.cloud.stream.bindings.task-events.destination=test",
sinkChannelParam };
}
private void testListener(String channelBinding, CountDownLatch latch, Class<?> clazz) throws Exception{
applicationContext = new SpringApplicationBuilder()
.sources(this.getConfigurations(clazz, JobConfiguration.class))
.build().run(getCommandLineParams(channelBinding));
assertTrue(latch.await(1, TimeUnit.SECONDS));
}
private void testListenerSkip(String channelBinding, CountDownLatch latch, Class<?> clazz) throws Exception{
applicationContext = new SpringApplicationBuilder()
.sources(this.getConfigurations(clazz, JobSkipConfiguration.class))
.build().run(getCommandLineParams(channelBinding));
assertTrue(latch.await(1, TimeUnit.SECONDS));
}
}

View File

@@ -0,0 +1,3 @@
logging.level.org.springframework.cloud.task=DEBUG
logging.level.org.springframework.cloud.stream=DEBUG
spring.application.name=batchEvents

View File

@@ -0,0 +1,17 @@
#
# Copyright 2016 the original author or authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
spring.cloud.stream.bindings.input.destination=chunk-events-foobar

View File

@@ -0,0 +1,17 @@
#
# Copyright 2016 the original author or authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
spring.cloud.stream.bindings.input.destination=item-process-foobar

View File

@@ -0,0 +1,17 @@
#
# Copyright 2016 the original author or authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
spring.cloud.stream.bindings.input.destination=item-read-events-foobar

View File

@@ -0,0 +1,17 @@
#
# Copyright 2016 the original author or authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
spring.cloud.stream.bindings.input.destination=item-write-events-foobar

View File

@@ -0,0 +1,17 @@
#
# Copyright 2016 the original author or authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
spring.cloud.stream.bindings.input.destination=foobar

View File

@@ -1 +1,17 @@
#
# Copyright 2016 the original author or authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
spring.cloud.stream.bindings.input.destination=test

View File

@@ -0,0 +1,17 @@
#
# Copyright 2016 the original author or authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
spring.cloud.stream.bindings.input.destination=skip-event-foobar

View File

@@ -0,0 +1,17 @@
#
# Copyright 2016 the original author or authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
spring.cloud.stream.bindings.input.destination=step-execution-foobar

View File

@@ -0,0 +1 @@
distributionUrl=https://repo1.maven.org/maven2/org/apache/maven/apache-maven/3.3.3/apache-maven-3.3.3-bin.zip

233
spring-cloud-task-samples/batch-events/mvnw vendored Executable file
View File

@@ -0,0 +1,233 @@
#!/bin/sh
# ----------------------------------------------------------------------------
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
# ----------------------------------------------------------------------------
# ----------------------------------------------------------------------------
# Maven2 Start Up Batch script
#
# Required ENV vars:
# ------------------
# JAVA_HOME - location of a JDK home dir
#
# Optional ENV vars
# -----------------
# M2_HOME - location of maven2's installed home dir
# MAVEN_OPTS - parameters passed to the Java VM when running Maven
# e.g. to debug Maven itself, use
# set MAVEN_OPTS=-Xdebug -Xrunjdwp:transport=dt_socket,server=y,suspend=y,address=8000
# MAVEN_SKIP_RC - flag to disable loading of mavenrc files
# ----------------------------------------------------------------------------
if [ -z "$MAVEN_SKIP_RC" ] ; then
if [ -f /etc/mavenrc ] ; then
. /etc/mavenrc
fi
if [ -f "$HOME/.mavenrc" ] ; then
. "$HOME/.mavenrc"
fi
fi
# OS specific support. $var _must_ be set to either true or false.
cygwin=false;
darwin=false;
mingw=false
case "`uname`" in
CYGWIN*) cygwin=true ;;
MINGW*) mingw=true;;
Darwin*) darwin=true
#
# Look for the Apple JDKs first to preserve the existing behaviour, and then look
# for the new JDKs provided by Oracle.
#
if [ -z "$JAVA_HOME" ] && [ -L /System/Library/Frameworks/JavaVM.framework/Versions/CurrentJDK ] ; then
#
# Apple JDKs
#
export JAVA_HOME=/System/Library/Frameworks/JavaVM.framework/Versions/CurrentJDK/Home
fi
if [ -z "$JAVA_HOME" ] && [ -L /System/Library/Java/JavaVirtualMachines/CurrentJDK ] ; then
#
# Apple JDKs
#
export JAVA_HOME=/System/Library/Java/JavaVirtualMachines/CurrentJDK/Contents/Home
fi
if [ -z "$JAVA_HOME" ] && [ -L "/Library/Java/JavaVirtualMachines/CurrentJDK" ] ; then
#
# Oracle JDKs
#
export JAVA_HOME=/Library/Java/JavaVirtualMachines/CurrentJDK/Contents/Home
fi
if [ -z "$JAVA_HOME" ] && [ -x "/usr/libexec/java_home" ]; then
#
# Apple JDKs
#
export JAVA_HOME=`/usr/libexec/java_home`
fi
;;
esac
if [ -z "$JAVA_HOME" ] ; then
if [ -r /etc/gentoo-release ] ; then
JAVA_HOME=`java-config --jre-home`
fi
fi
if [ -z "$M2_HOME" ] ; then
## resolve links - $0 may be a link to maven's home
PRG="$0"
# need this for relative symlinks
while [ -h "$PRG" ] ; do
ls=`ls -ld "$PRG"`
link=`expr "$ls" : '.*-> \(.*\)$'`
if expr "$link" : '/.*' > /dev/null; then
PRG="$link"
else
PRG="`dirname "$PRG"`/$link"
fi
done
saveddir=`pwd`
M2_HOME=`dirname "$PRG"`/..
# make it fully qualified
M2_HOME=`cd "$M2_HOME" && pwd`
cd "$saveddir"
# echo Using m2 at $M2_HOME
fi
# For Cygwin, ensure paths are in UNIX format before anything is touched
if $cygwin ; then
[ -n "$M2_HOME" ] &&
M2_HOME=`cygpath --unix "$M2_HOME"`
[ -n "$JAVA_HOME" ] &&
JAVA_HOME=`cygpath --unix "$JAVA_HOME"`
[ -n "$CLASSPATH" ] &&
CLASSPATH=`cygpath --path --unix "$CLASSPATH"`
fi
# For Migwn, ensure paths are in UNIX format before anything is touched
if $mingw ; then
[ -n "$M2_HOME" ] &&
M2_HOME="`(cd "$M2_HOME"; pwd)`"
[ -n "$JAVA_HOME" ] &&
JAVA_HOME="`(cd "$JAVA_HOME"; pwd)`"
# TODO classpath?
fi
if [ -z "$JAVA_HOME" ]; then
javaExecutable="`which javac`"
if [ -n "$javaExecutable" ] && ! [ "`expr \"$javaExecutable\" : '\([^ ]*\)'`" = "no" ]; then
# readlink(1) is not available as standard on Solaris 10.
readLink=`which readlink`
if [ ! `expr "$readLink" : '\([^ ]*\)'` = "no" ]; then
if $darwin ; then
javaHome="`dirname \"$javaExecutable\"`"
javaExecutable="`cd \"$javaHome\" && pwd -P`/javac"
else
javaExecutable="`readlink -f \"$javaExecutable\"`"
fi
javaHome="`dirname \"$javaExecutable\"`"
javaHome=`expr "$javaHome" : '\(.*\)/bin'`
JAVA_HOME="$javaHome"
export JAVA_HOME
fi
fi
fi
if [ -z "$JAVACMD" ] ; then
if [ -n "$JAVA_HOME" ] ; then
if [ -x "$JAVA_HOME/jre/sh/java" ] ; then
# IBM's JDK on AIX uses strange locations for the executables
JAVACMD="$JAVA_HOME/jre/sh/java"
else
JAVACMD="$JAVA_HOME/bin/java"
fi
else
JAVACMD="`which java`"
fi
fi
if [ ! -x "$JAVACMD" ] ; then
echo "Error: JAVA_HOME is not defined correctly." >&2
echo " We cannot execute $JAVACMD" >&2
exit 1
fi
if [ -z "$JAVA_HOME" ] ; then
echo "Warning: JAVA_HOME environment variable is not set."
fi
CLASSWORLDS_LAUNCHER=org.codehaus.plexus.classworlds.launcher.Launcher
# For Cygwin, switch paths to Windows format before running java
if $cygwin; then
[ -n "$M2_HOME" ] &&
M2_HOME=`cygpath --path --windows "$M2_HOME"`
[ -n "$JAVA_HOME" ] &&
JAVA_HOME=`cygpath --path --windows "$JAVA_HOME"`
[ -n "$CLASSPATH" ] &&
CLASSPATH=`cygpath --path --windows "$CLASSPATH"`
fi
# traverses directory structure from process work directory to filesystem root
# first directory with .mvn subdirectory is considered project base directory
find_maven_basedir() {
local basedir=$(pwd)
local wdir=$(pwd)
while [ "$wdir" != '/' ] ; do
if [ -d "$wdir"/.mvn ] ; then
basedir=$wdir
break
fi
wdir=$(cd "$wdir/.."; pwd)
done
echo "${basedir}"
}
# concatenates all lines of a file
concat_lines() {
if [ -f "$1" ]; then
echo "$(tr -s '\n' ' ' < "$1")"
fi
}
export MAVEN_PROJECTBASEDIR=${MAVEN_BASEDIR:-$(find_maven_basedir)}
MAVEN_OPTS="$(concat_lines "$MAVEN_PROJECTBASEDIR/.mvn/jvm.config") $MAVEN_OPTS"
# Provide a "standardized" way to retrieve the CLI args that will
# work with both Windows and non-Windows executions.
MAVEN_CMD_LINE_ARGS="$MAVEN_CONFIG $@"
export MAVEN_CMD_LINE_ARGS
WRAPPER_LAUNCHER=org.apache.maven.wrapper.MavenWrapperMain
exec "$JAVACMD" \
$MAVEN_OPTS \
-classpath "$MAVEN_PROJECTBASEDIR/.mvn/wrapper/maven-wrapper.jar" \
"-Dmaven.home=${M2_HOME}" "-Dmaven.multiModuleProjectDirectory=${MAVEN_PROJECTBASEDIR}" \
${WRAPPER_LAUNCHER} "$@"

View File

@@ -0,0 +1,145 @@
@REM ----------------------------------------------------------------------------
@REM Licensed to the Apache Software Foundation (ASF) under one
@REM or more contributor license agreements. See the NOTICE file
@REM distributed with this work for additional information
@REM regarding copyright ownership. The ASF licenses this file
@REM to you under the Apache License, Version 2.0 (the
@REM "License"); you may not use this file except in compliance
@REM with the License. You may obtain a copy of the License at
@REM
@REM http://www.apache.org/licenses/LICENSE-2.0
@REM
@REM Unless required by applicable law or agreed to in writing,
@REM software distributed under the License is distributed on an
@REM "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@REM KIND, either express or implied. See the License for the
@REM specific language governing permissions and limitations
@REM under the License.
@REM ----------------------------------------------------------------------------
@REM ----------------------------------------------------------------------------
@REM Maven2 Start Up Batch script
@REM
@REM Required ENV vars:
@REM JAVA_HOME - location of a JDK home dir
@REM
@REM Optional ENV vars
@REM M2_HOME - location of maven2's installed home dir
@REM MAVEN_BATCH_ECHO - set to 'on' to enable the echoing of the batch commands
@REM MAVEN_BATCH_PAUSE - set to 'on' to wait for a key stroke before ending
@REM MAVEN_OPTS - parameters passed to the Java VM when running Maven
@REM e.g. to debug Maven itself, use
@REM set MAVEN_OPTS=-Xdebug -Xrunjdwp:transport=dt_socket,server=y,suspend=y,address=8000
@REM MAVEN_SKIP_RC - flag to disable loading of mavenrc files
@REM ----------------------------------------------------------------------------
@REM Begin all REM lines with '@' in case MAVEN_BATCH_ECHO is 'on'
@echo off
@REM enable echoing my setting MAVEN_BATCH_ECHO to 'on'
@if "%MAVEN_BATCH_ECHO%" == "on" echo %MAVEN_BATCH_ECHO%
@REM set %HOME% to equivalent of $HOME
if "%HOME%" == "" (set "HOME=%HOMEDRIVE%%HOMEPATH%")
@REM Execute a user defined script before this one
if not "%MAVEN_SKIP_RC%" == "" goto skipRcPre
@REM check for pre script, once with legacy .bat ending and once with .cmd ending
if exist "%HOME%\mavenrc_pre.bat" call "%HOME%\mavenrc_pre.bat"
if exist "%HOME%\mavenrc_pre.cmd" call "%HOME%\mavenrc_pre.cmd"
:skipRcPre
@setlocal
set ERROR_CODE=0
@REM To isolate internal variables from possible post scripts, we use another setlocal
@setlocal
@REM ==== START VALIDATION ====
if not "%JAVA_HOME%" == "" goto OkJHome
echo.
echo Error: JAVA_HOME not found in your environment. >&2
echo Please set the JAVA_HOME variable in your environment to match the >&2
echo location of your Java installation. >&2
echo.
goto error
:OkJHome
if exist "%JAVA_HOME%\bin\java.exe" goto init
echo.
echo Error: JAVA_HOME is set to an invalid directory. >&2
echo JAVA_HOME = "%JAVA_HOME%" >&2
echo Please set the JAVA_HOME variable in your environment to match the >&2
echo location of your Java installation. >&2
echo.
goto error
@REM ==== END VALIDATION ====
:init
set MAVEN_CMD_LINE_ARGS=%*
@REM Find the project base dir, i.e. the directory that contains the folder ".mvn".
@REM Fallback to current working directory if not found.
set MAVEN_PROJECTBASEDIR=%MAVEN_BASEDIR%
IF NOT "%MAVEN_PROJECTBASEDIR%"=="" goto endDetectBaseDir
set EXEC_DIR=%CD%
set WDIR=%EXEC_DIR%
:findBaseDir
IF EXIST "%WDIR%"\.mvn goto baseDirFound
cd ..
IF "%WDIR%"=="%CD%" goto baseDirNotFound
set WDIR=%CD%
goto findBaseDir
:baseDirFound
set MAVEN_PROJECTBASEDIR=%WDIR%
cd "%EXEC_DIR%"
goto endDetectBaseDir
:baseDirNotFound
set MAVEN_PROJECTBASEDIR=%EXEC_DIR%
cd "%EXEC_DIR%"
:endDetectBaseDir
IF NOT EXIST "%MAVEN_PROJECTBASEDIR%\.mvn\jvm.config" goto endReadAdditionalConfig
@setlocal EnableExtensions EnableDelayedExpansion
for /F "usebackq delims=" %%a in ("%MAVEN_PROJECTBASEDIR%\.mvn\jvm.config") do set JVM_CONFIG_MAVEN_PROPS=!JVM_CONFIG_MAVEN_PROPS! %%a
@endlocal & set JVM_CONFIG_MAVEN_PROPS=%JVM_CONFIG_MAVEN_PROPS%
:endReadAdditionalConfig
SET MAVEN_JAVA_EXE="%JAVA_HOME%\bin\java.exe"
set WRAPPER_JAR="".\.mvn\wrapper\maven-wrapper.jar""
set WRAPPER_LAUNCHER=org.apache.maven.wrapper.MavenWrapperMain
%MAVEN_JAVA_EXE% %JVM_CONFIG_MAVEN_PROPS% %MAVEN_OPTS% %MAVEN_DEBUG_OPTS% -classpath %WRAPPER_JAR% "-Dmaven.multiModuleProjectDirectory=%MAVEN_PROJECTBASEDIR%" %WRAPPER_LAUNCHER% %MAVEN_CMD_LINE_ARGS%
if ERRORLEVEL 1 goto error
goto end
:error
set ERROR_CODE=1
:end
@endlocal & set ERROR_CODE=%ERROR_CODE%
if not "%MAVEN_SKIP_RC%" == "" goto skipRcPost
@REM check for post script, once with legacy .bat ending and once with .cmd ending
if exist "%HOME%\mavenrc_post.bat" call "%HOME%\mavenrc_post.bat"
if exist "%HOME%\mavenrc_post.cmd" call "%HOME%\mavenrc_post.cmd"
:skipRcPost
@REM pause the script if MAVEN_BATCH_PAUSE is set to 'on'
if "%MAVEN_BATCH_PAUSE%" == "on" pause
if "%MAVEN_TERMINATE_CMD%" == "on" exit %ERROR_CODE%
exit /B %ERROR_CODE%

View File

@@ -0,0 +1,80 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>io.spring.cloud</groupId>
<artifactId>batch-events</artifactId>
<version>1.0.0.BUILD-SNAPSHOT</version>
<packaging>jar</packaging>
<name>Batch Events Sample Application</name>
<description>Sample of sending batch events via Spring Cloud Streams</description>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>1.3.3.RELEASE</version>
<relativePath/> <!-- lookup parent from repository -->
</parent>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<java.version>1.7</java.version>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-task-core</artifactId>
<version>1.0.0.BUILD-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-task-stream</artifactId>
<version>1.0.0.BUILD-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-binder-test</artifactId>
<version>1.0.0.BUILD-SNAPSHOT</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-batch</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-binder-redis</artifactId>
<version>1.0.0.BUILD-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-test-support-internal</artifactId>
<version>1.0.0.BUILD-SNAPSHOT</version>
<scope>test</scope>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
</plugins>
</build>
</project>

View File

@@ -0,0 +1,100 @@
/*
* Copyright 2016 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.spring.cloud;
import java.util.Arrays;
import java.util.List;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.StepContribution;
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.scope.context.ChunkContext;
import org.springframework.batch.core.step.tasklet.Tasklet;
import org.springframework.batch.item.ItemProcessor;
import org.springframework.batch.item.ItemWriter;
import org.springframework.batch.item.support.ListItemReader;
import org.springframework.batch.repeat.RepeatStatus;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.task.configuration.EnableTask;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@SpringBootApplication
@EnableTask
@EnableBatchProcessing
public class BatchEventsApplication {
public static void main(String[] args) {
SpringApplication.run(BatchEventsApplication.class, args);
}
@Configuration
public static class JobConfiguration {
@Autowired
private JobBuilderFactory jobBuilderFactory;
@Autowired
private StepBuilderFactory stepBuilderFactory;
@Bean
public Step step1() {
return this.stepBuilderFactory.get("step1")
.tasklet(new Tasklet() {
@Override
public RepeatStatus execute(StepContribution contribution, ChunkContext chunkContext) throws Exception {
System.out.println("Tasklet has run");
return RepeatStatus.FINISHED;
}
}).build();
}
@Bean
public Step step2() {
return this.stepBuilderFactory.get("step2")
.<String, String>chunk(3)
.reader(new ListItemReader<>(Arrays.asList("1", "2", "3", "4", "5", "6")))
.processor(new ItemProcessor<String, String>() {
@Override
public String process(String item) throws Exception {
return String.valueOf(Integer.parseInt(item) * -1);
}
})
.writer(new ItemWriter<String>() {
@Override
public void write(List<? extends String> items) throws Exception {
for (String item : items) {
System.out.println(">> " + item);
}
}
}).build();
}
@Bean
public Job job() {
return this.jobBuilderFactory.get("job")
.start(step1())
.next(step2())
.build();
}
}
}

View File

@@ -0,0 +1,3 @@
logging.level.org.springframework.cloud.task=DEBUG
logging.level.org.springframework.cloud.stream=DEBUG
spring.application.name=batchEvents

View File

@@ -0,0 +1,71 @@
/*
* Copyright 2016 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.spring.cloud;
import java.util.concurrent.*;
import org.junit.Assert;
import org.junit.ClassRule;
import org.junit.Rule;
import org.junit.Test;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
import org.springframework.boot.test.OutputCapture;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.cloud.stream.messaging.Sink;
import org.springframework.cloud.stream.test.junit.redis.RedisTestSupport;
import org.springframework.cloud.task.batch.listener.support.JobExecutionEvent;
import org.springframework.context.annotation.PropertySource;
import static junit.framework.Assert.assertEquals;
import static junit.framework.Assert.assertTrue;
public class BatchEventsApplicationTests {
private static final String ITEM_INDICATOR = ">> -";
@ClassRule
public static RedisTestSupport redisTestSupport = new RedisTestSupport();
@Rule
public OutputCapture outputCapture = new OutputCapture();
// Count for two job execution events per task
static CountDownLatch jobExecutionLatch = new CountDownLatch(2);
@Test
public void testExecution() throws Exception {
SpringApplication.run(BatchEventsApplication.class);
Assert.assertTrue(jobExecutionLatch.await(1, TimeUnit.SECONDS));
}
@EnableBinding(Sink.class)
@PropertySource("classpath:io/spring/task/listener/job-listener-sink-channel.properties")
@EnableAutoConfiguration
public static class JobExecutionListenerBinding {
@StreamListener(Sink.INPUT)
public void receive(JobExecutionEvent execution) {
Assert.assertEquals(String.format("Job name should be job"), "job", execution.getJobInstance().getJobName());
jobExecutionLatch.countDown();
}
}
}

View File

@@ -0,0 +1,17 @@
#
# Copyright 2016 the original author or authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
spring.cloud.stream.bindings.input.destination=job-execution-events

View File

@@ -9,7 +9,7 @@
<version>1.0.0.BUILD-SNAPSHOT</version>
<description>Spring Cloud Task Batch Example</description>
<name>batch-job</name>
<name>Batch Job Sample Application</name>
<parent>
<groupId>org.springframework.boot</groupId>

View File

@@ -39,7 +39,7 @@ public class BatchJobApplicationTests {
public OutputCapture outputCapture = new OutputCapture();
@Test
public void testTimeStampApp() throws Exception {
public void testBatchJobApp() throws Exception {
final String JOB_RUN_MESSAGE = " was run";
final String CREATE_TASK_MESSAGE = "Creating: TaskExecution{executionId=";
final String UPDATE_TASK_MESSAGE = "Updating: TaskExecution with executionId=";

View File

@@ -28,6 +28,7 @@
<module>taskprocessor</module>
<module>partitioned-batch-job</module>
<module>task-events</module>
<module>batch-events</module>
</modules>
<build>

View File

@@ -15,6 +15,10 @@
</parent>
<dependencies>
<dependency>
<groupId>org.springframework.batch</groupId>
<artifactId>spring-batch-core</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream</artifactId>
@@ -42,9 +46,25 @@
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-test-support</artifactId>
<artifactId>spring-cloud-stream-test-support-internal</artifactId>
<version>1.0.0.BUILD-SNAPSHOT</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-binder-redis</artifactId>
<version>1.0.0.BUILD-SNAPSHOT</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-redis</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-task-batch</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
</project>

View File

@@ -0,0 +1,147 @@
/*
* Copyright 2016 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.cloud.task.batch.listener;
import org.springframework.batch.core.ChunkListener;
import org.springframework.batch.core.ItemProcessListener;
import org.springframework.batch.core.ItemReadListener;
import org.springframework.batch.core.ItemWriteListener;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.JobExecutionListener;
import org.springframework.batch.core.SkipListener;
import org.springframework.batch.core.StepExecutionListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.condition.ConditionalOnBean;
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.Output;
import org.springframework.cloud.task.batch.listener.support.TaskBatchEventListenerBeanPostProcessor;
import org.springframework.cloud.task.listener.TaskLifecycleListener;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Lazy;
import org.springframework.integration.gateway.GatewayProxyFactoryBean;
import org.springframework.messaging.MessageChannel;
/**
* Configures the listeners and channels that are required to emit job messages.
* @author Michael Minella
* @author Glenn Renfro
*/
@Configuration
@ConditionalOnBean(value = { Job.class, TaskLifecycleListener.class })
@ConditionalOnProperty(prefix = "spring.cloud.task.batch.events", name = "enabled", havingValue = "true", matchIfMissing = true)
public class BatchEventAutoConfiguration {
public final static String JOB_EXECUTION_EVENTS_LISTENER = "jobExecutionEventsListener";
public final static String CHUNK_EVENTS_LISTENER = "chunkEventsListener";
public final static String STEP_EXECUTION_EVENTS_LISTENER = "stepExecutionEventsListener";
public final static String ITEM_READ_EVENTS_LISTENER = "itemReadEventsListener";
public final static String ITEM_WRITE_EVENTS_LISTENER = "itemWriteEventsListener";
public final static String ITEM_PROCESS_EVENTS_LISTENER = "itemProcessEventsListener";
public final static String SKIP_EVENTS_LISTENER = "skipEventsListener";
@Bean
@ConditionalOnMissingBean
public TaskBatchEventListenerBeanPostProcessor batchTaskExecutionListenerBeanPostProcessor() {
return new TaskBatchEventListenerBeanPostProcessor();
}
@Configuration
@EnableBinding(BatchEventsChannels.class)
@ConditionalOnMissingBean(name = JOB_EXECUTION_EVENTS_LISTENER)
public static class JobExecutionListenerConfiguration {
@Autowired
private BatchEventsChannels listenerChannels;
@Bean
@Lazy
public JobExecutionListener jobExecutionEventsListener() {
return new EventEmittingJobExecutionListener(listenerChannels.jobExecutionEvents());
}
@Bean
public StepExecutionListener stepExecutionEventsListener() {
return new EventEmittingStepExecutionListener(listenerChannels.stepExecutionEvents());
}
@Bean
@Lazy
public GatewayProxyFactoryBean chunkEventsListener() {
GatewayProxyFactoryBean factoryBean =
new GatewayProxyFactoryBean(ChunkListener.class);
factoryBean.setDefaultRequestChannel(listenerChannels.chunkEvents());
return factoryBean;
}
@Bean
public ItemReadListener itemReadEventsListener() {
return new EventEmittingItemReadEventsListener(listenerChannels.itemReadEvents());
}
@Bean
public ItemWriteListener itemWriteEventsListener() {
return new EventEmittingItemWriteEventsListener(listenerChannels.itemWriteEvents());
}
@Bean
public ItemProcessListener itemProcessEventsListener() {
return new EventEmittingItemProcessListener(listenerChannels.itemProcessEvents());
}
@Bean
public SkipListener skipEventsListener() {
return new EventEmittingSkipListener(listenerChannels.skipEvents());
}
}
public interface BatchEventsChannels {
String JOB_EXECUTION_EVENTS = "job-execution-events";
String STEP_EXECUTION_EVENTS = "step-execution-events";
String CHUNK_EXECUTION_EVENTS = "chunk-events";
String ITEM_READ_EVENTS = "item-read-events";
String ITEM_PROCESS_EVENTS = "item-process-events";
String ITEM_WRITE_EVENTS = "item-write-events";
String SKIP_EVENTS = "skip-events";
@Output(JOB_EXECUTION_EVENTS)
MessageChannel jobExecutionEvents();
@Output(STEP_EXECUTION_EVENTS)
MessageChannel stepExecutionEvents();
@Output(CHUNK_EXECUTION_EVENTS)
MessageChannel chunkEvents();
@Output(ITEM_READ_EVENTS)
MessageChannel itemReadEvents();
@Output(ITEM_WRITE_EVENTS)
MessageChannel itemWriteEvents();
@Output(ITEM_PROCESS_EVENTS)
MessageChannel itemProcessEvents();
@Output(SKIP_EVENTS)
MessageChannel skipEvents();
}
}

View File

@@ -0,0 +1,61 @@
/*
* Copyright 2016 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.cloud.task.batch.listener;
import org.springframework.batch.core.ItemProcessListener;
import org.springframework.cloud.task.batch.listener.support.MessagePublisher;
import org.springframework.messaging.MessageChannel;
import org.springframework.util.Assert;
/**
* Setups up the ItemProcessListener to emit events to the spring cloud stream output channel.
*
* @author Michael Minella
* @author Glenn Renfro
*/
public class EventEmittingItemProcessListener implements ItemProcessListener {
private MessageChannel output;
private MessagePublisher<Object> messagePublisher;
public EventEmittingItemProcessListener(MessageChannel output) {
Assert.notNull(output, "An output channel is required");
this.output = output;
this.messagePublisher = new MessagePublisher(output);
}
@Override
public void beforeProcess(Object item) {
}
@Override
public void afterProcess(Object item, Object result) {
if (result == null) {
messagePublisher.publish("1 item was filtered");
}
else if (item.equals(result)) {
messagePublisher.publish("item equaled result after processing");
}
else {
messagePublisher.publish("item did not equal result after processing");
}
}
@Override
public void onProcessError(Object item, Exception e) {
messagePublisher.publishWithThrowableHeader("Exception while item was being processed", e.getMessage());
}
}

View File

@@ -0,0 +1,61 @@
/*
* Copyright 2016 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.cloud.task.batch.listener;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.batch.core.ItemReadListener;
import org.springframework.cloud.task.batch.listener.support.MessagePublisher;
import org.springframework.messaging.MessageChannel;
import org.springframework.util.Assert;
/**
* Setups up the ItemReadEventsListener to emit events to the spring cloud stream output channel.
*
* @author Glenn Renfro
*/
public class EventEmittingItemReadEventsListener implements ItemReadListener {
private static final Logger logger = LoggerFactory.getLogger(EventEmittingItemReadEventsListener.class);
private MessageChannel output;
private MessagePublisher<Object> messagePublisher;
public EventEmittingItemReadEventsListener(MessageChannel output) {
Assert.notNull(output, "An output channel is required");
this.output = output;
this.messagePublisher = new MessagePublisher(output);
}
@Override
public void beforeRead() {
}
@Override
public void afterRead(Object item) {
}
@Override
public void onReadError(Exception ex) {
if (logger.isDebugEnabled()) {
logger.debug("Executing onReadError: " + ex.getMessage(), ex);
}
this.messagePublisher.publish(ex.getMessage());
}
}

View File

@@ -0,0 +1,67 @@
/*
* Copyright 2016 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.cloud.task.batch.listener;
import java.util.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.batch.core.ItemWriteListener;
import org.springframework.cloud.task.batch.listener.support.MessagePublisher;
import org.springframework.messaging.MessageChannel;
import org.springframework.util.Assert;
/**
* Setups up the ItemWriteEventsListener to emit events to the spring cloud stream output channel.
*
* @author Glenn Renfro
*/
public class EventEmittingItemWriteEventsListener implements ItemWriteListener{
private static final Logger logger = LoggerFactory.getLogger(EventEmittingItemWriteEventsListener.class);
private MessageChannel output;
private MessagePublisher<Object> messagePublisher;
public EventEmittingItemWriteEventsListener(MessageChannel output) {
Assert.notNull(output, "An output channel is required");
this.output = output;
this.messagePublisher = new MessagePublisher(output);
}
@Override
public void beforeWrite(List items) {
messagePublisher.publish(items.size() + " items to be written.");
}
@Override
public void afterWrite(List items) {
if (logger.isDebugEnabled()) {
logger.debug("Executing afterWrite: " + items);
}
this.messagePublisher.publish(items.size() + " items have been written.");
}
@Override
public void onWriteError(Exception exception, List items) {
if (logger.isDebugEnabled()) {
logger.debug("Executing onWriteError: " + exception.getMessage(), exception);
}
String payload = "Exception while " + items.size() + " items are attempted to be written.";
this.messagePublisher.publishWithThrowableHeader(payload, exception.getMessage());
}
}

View File

@@ -0,0 +1,52 @@
/*
* Copyright 2016 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.cloud.task.batch.listener;
import org.springframework.batch.core.JobExecution;
import org.springframework.batch.core.JobExecutionListener;
import org.springframework.cloud.task.batch.listener.support.JobExecutionEvent;
import org.springframework.cloud.task.batch.listener.support.MessagePublisher;
import org.springframework.messaging.MessageChannel;
import org.springframework.util.Assert;
/**
* Setups up the StepExecutionListener to emit events to the spring cloud stream output channel.
*
* @author Michael Minella
* @author Glenn Renfro
*/
public class EventEmittingJobExecutionListener implements JobExecutionListener {
private MessageChannel output;
private MessagePublisher<JobExecutionEvent> messagePublisher;
public EventEmittingJobExecutionListener(MessageChannel output) {
Assert.notNull(output, "An output channel is required");
this.output = output;
this.messagePublisher = new MessagePublisher<>(output);
}
@Override
public void beforeJob(JobExecution jobExecution) {
this.messagePublisher.publish(new JobExecutionEvent(jobExecution));
}
@Override
public void afterJob(JobExecution jobExecution) {
this.messagePublisher.publish(new JobExecutionEvent(jobExecution));
}
}

View File

@@ -0,0 +1,72 @@
/*
* Copyright 2016 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.cloud.task.batch.listener;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.batch.core.SkipListener;
import org.springframework.cloud.task.batch.listener.support.BatchJobHeaders;
import org.springframework.cloud.task.batch.listener.support.MessagePublisher;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.util.Assert;
/**
* Setups up the SkipProcessListener to emit events to the spring cloud stream output channel.
*
* @author Glenn Renfro
*/
public class EventEmittingSkipListener implements SkipListener {
private static final Logger logger = LoggerFactory.getLogger(EventEmittingSkipListener.class);
private MessageChannel output;
private MessagePublisher<Object> messagePublisher;
public EventEmittingSkipListener(MessageChannel output) {
Assert.notNull(output, "An output channel is required");
this.output = output;
this.messagePublisher = new MessagePublisher(output);
}
@Override
public void onSkipInRead(Throwable t) {
if (logger.isDebugEnabled()) {
logger.debug("Executing onSkipInRead: " + t.getMessage(), t);
}
messagePublisher.publishWithThrowableHeader("Skipped when reading.", t.getMessage());
}
@Override
public void onSkipInWrite(Object item, Throwable t) {
if (logger.isDebugEnabled()) {
logger.debug("Executing onSkipInWrite: " + t.getMessage(), t);
}
messagePublisher.publishWithThrowableHeader(item, t.getMessage());
}
@Override
public void onSkipInProcess(Object item, Throwable t) {
if (logger.isDebugEnabled()) {
logger.debug("Executing onSkipInProcess: " + t.getMessage(), t);
}
messagePublisher.publishWithThrowableHeader(item, t.getMessage());
}
}

View File

@@ -0,0 +1,55 @@
/*
* Copyright 2016 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.cloud.task.batch.listener;
import org.springframework.batch.core.ExitStatus;
import org.springframework.batch.core.StepExecution;
import org.springframework.batch.core.StepExecutionListener;
import org.springframework.cloud.task.batch.listener.support.StepExecutionEvent;
import org.springframework.cloud.task.batch.listener.support.MessagePublisher;
import org.springframework.messaging.MessageChannel;
import org.springframework.util.Assert;
/**
* Setups up the StepExecutionListener to emit events to the spring cloud stream output channel.
*
* @author Michael Minella
* @author Glenn Renfro
*/
public class EventEmittingStepExecutionListener implements StepExecutionListener {
private MessageChannel output;
private MessagePublisher<StepExecutionEvent> messagePublisher;
public EventEmittingStepExecutionListener(MessageChannel output) {
Assert.notNull(output, "An output channel is required");
this.output = output;
this.messagePublisher = new MessagePublisher<>(output);
}
@Override
public void beforeStep(StepExecution stepExecution) {
this.messagePublisher.publish(new StepExecutionEvent(stepExecution));
}
@Override
public ExitStatus afterStep(StepExecution stepExecution) {
this.messagePublisher.publish(new StepExecutionEvent(stepExecution));
return stepExecution.getExitStatus();
}
}

View File

@@ -0,0 +1,34 @@
/*
* Copyright 2016 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.cloud.task.batch.listener.support;
/**
* Headers definitions used by the batch job plugin.
*
* @author Gunnar Hillert
* @since 1.0
*/
public final class BatchJobHeaders {
public static final String BATCH_LISTENER_EVENT_TYPE = "batch_listener_event_type";
public static final String BATCH_EXCEPTION = "batch_exception";
private BatchJobHeaders() {
}
}

View File

@@ -0,0 +1,54 @@
/*
* Copyright 2016 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.cloud.task.batch.listener.support;
/**
* ExitStatus DTO created so that {@link org.springframework.batch.core.ExitStatus} can be serialized into Json without
* having to add mixins to an ObjectMapper
* @author Glenn Renfro
*/
public class ExitStatus {
private String exitCode;
private String exitDescription;
public ExitStatus(){
}
public ExitStatus(String exitCode, String exitDescription) {
this.exitCode = exitCode;
this.exitDescription = exitDescription;
}
public String getExitCode() {
return exitCode;
}
public void setExitCode(String exitCode) {
this.exitCode = exitCode;
}
public String getExitDescription() {
return exitDescription;
}
public void setExitDescription(String exitDescription) {
this.exitDescription = exitDescription;
}
}

View File

@@ -0,0 +1,336 @@
/*
* Copyright 2016 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.cloud.task.batch.listener.support;
import java.io.*;
import java.util.*;
import java.util.concurrent.*;
import org.springframework.batch.core.*;
import org.springframework.batch.item.ExecutionContext;
/**
* This is a JobEvent DTO created so that a {@link org.springframework.batch.core.JobExecution} can be serialized into
* Json without having to add mixins to an ObjectMapper.
* @author Glenn Renfro
*/
public class JobExecutionEvent extends Entity {
private JobParametersEvent jobParameters;
private JobInstanceEvent jobInstance;
private volatile Collection<StepExecutionEvent> stepExecutions = new CopyOnWriteArraySet<StepExecutionEvent>();
private volatile BatchStatus status = BatchStatus.STARTING;
private volatile Date startTime = null;
private volatile Date createTime = new Date(System.currentTimeMillis());
private volatile Date endTime = null;
private volatile Date lastUpdated = null;
private volatile ExitStatus exitStatus = new ExitStatus("UNKNOWN", null);
private volatile ExecutionContext executionContext = new ExecutionContext();
private transient volatile List<Throwable> failureExceptions = new CopyOnWriteArrayList<Throwable>();
private String jobConfigurationName;
public JobExecutionEvent() {
super();
}
/**
* Constructor for the StepExecution to initialize the DTO.
*
* @param original the StepExecution to build this DTO around.
*/
public JobExecutionEvent(JobExecution original) {
this.jobParameters = new JobParametersEvent(original.getJobParameters().getParameters());
this.jobInstance = new JobInstanceEvent(original.getJobInstance().getId(), original.getJobInstance().getJobName());
for(StepExecution stepExecution : original.getStepExecutions()){
stepExecutions.add(new StepExecutionEvent(stepExecution));
}
this.status = original.getStatus();
this.startTime = original.getStartTime();
this.createTime = original.getCreateTime();
this.endTime = original.getEndTime();
this.lastUpdated = original.getLastUpdated();
this.exitStatus = new ExitStatus(original.getExitStatus().getExitCode(),
original.getExitStatus().getExitDescription());
this.executionContext = original.getExecutionContext();
this.failureExceptions = original.getFailureExceptions();
this.jobConfigurationName = original.getJobConfigurationName();
this.setId(original.getId());
this.setVersion(original.getVersion());
}
public JobParametersEvent getJobParameters() {
return this.jobParameters;
}
public Date getEndTime() {
return endTime;
}
public void setJobInstance(JobInstanceEvent jobInstance) {
this.jobInstance = jobInstance;
}
public void setEndTime(Date endTime) {
this.endTime = endTime;
}
public Date getStartTime() {
return startTime;
}
public void setStartTime(Date startTime) {
this.startTime = startTime;
}
public BatchStatus getStatus() {
return status;
}
/**
* Set the value of the status field.
*
* @param status the status to set
*/
public void setStatus(BatchStatus status) {
this.status = status;
}
/**
* Upgrade the status field if the provided value is greater than the
* existing one. Clients using this method to set the status can be sure
* that they don't overwrite a failed status with an successful one.
*
* @param status the new status value
*/
public void upgradeStatus(BatchStatus status) {
this.status = this.status.upgradeTo(status);
}
/**
* Convenience getter for for the id of the enclosing job. Useful for DAO
* implementations.
*
* @return the id of the enclosing job
*/
public Long getJobId() {
if (jobInstance != null) {
return jobInstance.getId();
}
return null;
}
/**
* @param exitStatus
*/
public void setExitStatus(ExitStatus exitStatus) {
this.exitStatus = exitStatus;
}
/**
* @return the exitCode
*/
public ExitStatus getExitStatus() {
return exitStatus;
}
/**
* @return the Job that is executing.
*/
public JobInstanceEvent getJobInstance() {
return jobInstance;
}
/**
* Accessor for the step executions.
*
* @return the step executions that were registered
*/
public Collection<StepExecutionEvent> getStepExecutions() {
return Collections.unmodifiableList(new ArrayList<StepExecutionEvent>(stepExecutions));
}
/**
* Test if this {@link JobExecution} indicates that it is running. It should
* be noted that this does not necessarily mean that it has been persisted
* as such yet.
* @return true if the end time is null
*/
public boolean isRunning() {
return endTime == null;
}
/**
* Test if this {@link JobExecution} indicates that it has been signalled to
* stop.
* @return true if the status is {@link BatchStatus#STOPPING}
*/
public boolean isStopping() {
return status == BatchStatus.STOPPING;
}
/**
* Signal the {@link JobExecution} to stop. Iterates through the associated
* {@link StepExecution}s, calling {@link StepExecution#setTerminateOnly()}.
*
*/
public void stop() {
for (StepExecutionEvent stepExecution : stepExecutions) {
stepExecution.setTerminateOnly();
}
status = BatchStatus.STOPPING;
}
/**
* Sets the {@link ExecutionContext} for this execution
*
* @param executionContext the context
*/
public void setExecutionContext(ExecutionContext executionContext) {
this.executionContext = executionContext;
}
/**
* Returns the {@link ExecutionContext} for this execution. The content is
* expected to be persisted after each step completion (successful or not).
*
* @return the context
*/
public ExecutionContext getExecutionContext() {
return executionContext;
}
/**
* @return the time when this execution was created.
*/
public Date getCreateTime() {
return createTime;
}
/**
* @param createTime creation time of this execution.
*/
public void setCreateTime(Date createTime) {
this.createTime = createTime;
}
public String getJobConfigurationName() {
return this.jobConfigurationName;
}
/**
* Package private method for re-constituting the step executions from
* existing instances.
* @param stepExecution
*/
void addStepExecution(StepExecutionEvent stepExecution) {
stepExecutions.add(stepExecution);
}
/**
* Get the date representing the last time this JobExecution was updated in
* the JobRepository.
*
* @return Date representing the last time this JobExecution was updated.
*/
public Date getLastUpdated() {
return lastUpdated;
}
/**
* Set the last time this JobExecution was updated.
*
* @param lastUpdated
*/
public void setLastUpdated(Date lastUpdated) {
this.lastUpdated = lastUpdated;
}
public List<Throwable> getFailureExceptions() {
return failureExceptions;
}
/**
* Add the provided throwable to the failure exception list.
*
* @param t
*/
public synchronized void addFailureException(Throwable t) {
this.failureExceptions.add(t);
}
/**
* Return all failure causing exceptions for this JobExecution, including
* step executions.
*
* @return List&lt;Throwable&gt; containing all exceptions causing failure for
* this JobExecution.
*/
public synchronized List<Throwable> getAllFailureExceptions() {
Set<Throwable> allExceptions = new HashSet<Throwable>(failureExceptions);
for (StepExecutionEvent stepExecution : stepExecutions) {
allExceptions.addAll(stepExecution.getFailureExceptions());
}
return new ArrayList<Throwable>(allExceptions);
}
/**
* Deserialize and ensure transient fields are re-instantiated when read
* back
*/
private void readObject(ObjectInputStream stream) throws IOException, ClassNotFoundException {
stream.defaultReadObject();
failureExceptions = new ArrayList<Throwable>();
}
/*
* (non-Javadoc)
*
* @see org.springframework.batch.core.domain.Entity#toString()
*/
@Override
public String toString() {
return super.toString()
+ String.format(", startTime=%s, endTime=%s, lastUpdated=%s, status=%s, exitStatus=%s, job=[%s], jobParameters=[%s]",
startTime, endTime, lastUpdated, status, exitStatus, jobInstance, jobParameters);
}
/**
* Add some step executions. For internal use only.
* @param stepExecutions step executions to add to the current list
*/
public void addStepExecutions(List<StepExecutionEvent> stepExecutions) {
if (stepExecutions!=null) {
this.stepExecutions.removeAll(stepExecutions);
this.stepExecutions.addAll(stepExecutions);
}
}
}

View File

@@ -0,0 +1,61 @@
/*
* Copyright 2016 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.cloud.task.batch.listener.support;
import org.springframework.batch.core.Entity;
import org.springframework.util.Assert;
/**
* This is a JobInstance DTO created so that a {@link org.springframework.batch.core.JobInstance} can be serialized into
* Json without having to add mixins to an ObjectMapper.
*
* @author Glenn Renfro
*/
public class JobInstanceEvent extends Entity implements javax.batch.runtime.JobInstance {
private String jobName;
public JobInstanceEvent() {
}
public JobInstanceEvent(Long id, String jobName) {
super(id);
Assert.hasLength(jobName);
this.jobName = jobName;
}
/**
* @return the job name. (Equivalent to getJob().getName())
*/
@Override
public String getJobName() {
return jobName;
}
@Override
public String toString() {
return super.toString() + ", Job=[" + jobName + "]";
}
@Override
public long getInstanceId() {
return super.getId();
}
}

View File

@@ -0,0 +1,179 @@
/*
* Copyright 2016 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.cloud.task.batch.listener.support;
import java.util.*;
/**
* This is a JobParameter DTO created so that a {@link org.springframework.batch.core.JobParameter} can be serialized
* into Json without having to add mixins to an ObjectMapper.
* @author Glenn Renfro
*/
public class JobParameterEvent {
private Object parameter;
private JobParameterEvent.ParameterType parameterType;
private boolean identifying;
public JobParameterEvent() {
}
/**
* Construct a new JobParameter as a String.
*/
public JobParameterEvent(String parameter, boolean identifying) {
this.parameter = parameter;
parameterType = JobParameterEvent.ParameterType.STRING;
this.identifying = identifying;
}
/**
* Construct a new JobParameter as a Long.
*
* @param parameter
*/
public JobParameterEvent(Long parameter, boolean identifying) {
this.parameter = parameter;
parameterType = JobParameterEvent.ParameterType.LONG;
this.identifying = identifying;
}
/**
* Construct a new JobParameter as a Date.
*
* @param parameter
*/
public JobParameterEvent(Date parameter, boolean identifying) {
this.parameter = parameter;
parameterType = JobParameterEvent.ParameterType.DATE;
this.identifying = identifying;
}
/**
* Construct a new JobParameter as a Double.
*
* @param parameter
*/
public JobParameterEvent(Double parameter, boolean identifying) {
this.parameter = parameter;
parameterType = JobParameterEvent.ParameterType.DOUBLE;
this.identifying = identifying;
}
/**
* Construct a new JobParameter as a String.
*/
public JobParameterEvent(String parameter) {
this.parameter = parameter;
parameterType = JobParameterEvent.ParameterType.STRING;
this.identifying = true;
}
/**
* Construct a new JobParameter as a Long.
*
* @param parameter
*/
public JobParameterEvent(Long parameter) {
this.parameter = parameter;
parameterType = JobParameterEvent.ParameterType.LONG;
this.identifying = true;
}
/**
* Construct a new JobParameter as a Date.
*
* @param parameter
*/
public JobParameterEvent(Date parameter) {
this.parameter = parameter;
parameterType = JobParameterEvent.ParameterType.DATE;
this.identifying = true;
}
/**
* Construct a new JobParameter as a Double.
*
* @param parameter
*/
public JobParameterEvent(Double parameter) {
this.parameter = parameter;
parameterType = JobParameterEvent.ParameterType.DOUBLE;
this.identifying = true;
}
public boolean isIdentifying() {
return identifying;
}
/**
* @return the value contained within this JobParameter.
*/
public Object getValue() {
if (parameter != null && parameter.getClass().isInstance(Date.class)) {
return new Date(((Date) parameter).getTime());
}
else {
return parameter;
}
}
/**
* @return a ParameterType representing the type of this parameter.
*/
public JobParameterEvent.ParameterType getType() {
return parameterType;
}
@Override
public boolean equals(Object obj) {
if (obj instanceof JobParameterEvent == false) {
return false;
}
if (this == obj) {
return true;
}
JobParameterEvent rhs = (JobParameterEvent) obj;
return parameter==null ? rhs.parameter==null && parameterType==rhs.parameterType: parameter.equals(rhs.parameter);
}
@Override
public String toString() {
return parameter == null ? null : (parameterType == JobParameterEvent.ParameterType.DATE ? "" + ((Date) parameter).getTime()
: parameter.toString());
}
@Override
public int hashCode() {
return 7 + 21 * (parameter == null ? parameterType.hashCode() : parameter.hashCode());
}
/**
* Enumeration representing the type of a JobParameter.
*/
public enum ParameterType {
STRING, DATE, LONG, DOUBLE;
}
}

View File

@@ -0,0 +1,227 @@
/*
* Copyright 2016 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.cloud.task.batch.listener.support;
import java.util.*;
import org.springframework.batch.core.JobParameter;
/**
* This is a JobParametersEvent DTO created so that a {@link org.springframework.batch.core.JobParameters} can be
* serialized into Json without having to add mixins to an ObjectMapper.
*
* @author Glenn Renfro
*/
public class JobParametersEvent {
private final Map<String,JobParameterEvent> parameters;
public JobParametersEvent() {
this.parameters = new LinkedHashMap<String, JobParameterEvent>();
}
public JobParametersEvent(Map<String,JobParameter> jobParameters) {
this.parameters = new LinkedHashMap<String,JobParameterEvent>();
for(Map.Entry<String, JobParameter> entry: jobParameters.entrySet()){
if(entry.getValue().getValue() instanceof String){
parameters.put(entry.getKey(), new JobParameterEvent(((String) entry.getValue().getValue()), entry.getValue().isIdentifying()));
}
else if(entry.getValue().getValue() instanceof Long){
parameters.put(entry.getKey(), new JobParameterEvent(((Long) entry.getValue().getValue()), entry.getValue().isIdentifying()));
}
else if(entry.getValue().getValue() instanceof Date){
parameters.put(entry.getKey(), new JobParameterEvent(((Date) entry.getValue().getValue()), entry.getValue().isIdentifying()));
}
else if(entry.getValue().getValue() instanceof Double){
parameters.put(entry.getKey(), new JobParameterEvent(((Double) entry.getValue().getValue()), entry.getValue().isIdentifying()));
}
}
}
/**
* Typesafe Getter for the Long represented by the provided key.
*
* @param key The key to get a value for
* @return The <code>Long</code> value
*/
public Long getLong(String key){
if (!parameters.containsKey(key)) {
return 0L;
}
Object value = parameters.get(key).getValue();
return value==null ? 0L : ((Long)value).longValue();
}
/**
* Typesafe Getter for the Long represented by the provided key. If the
* key does not exist, the default value will be returned.
*
* @param key to return the value for
* @param defaultValue to return if the value doesn't exist
* @return the parameter represented by the provided key, defaultValue
* otherwise.
*/
public Long getLong(String key, long defaultValue){
if(parameters.containsKey(key)){
return getLong(key);
}
else{
return defaultValue;
}
}
/**
* Typesafe Getter for the String represented by the provided key.
*
* @param key The key to get a value for
* @return The <code>String</code> value
*/
public String getString(String key){
JobParameterEvent value = parameters.get(key);
return value==null ? null : value.toString();
}
/**
* Typesafe Getter for the String represented by the provided key. If the
* key does not exist, the default value will be returned.
*
* @param key to return the value for
* @param defaultValue to return if the value doesn't exist
* @return the parameter represented by the provided key, defaultValue
* otherwise.
*/
public String getString(String key, String defaultValue){
if(parameters.containsKey(key)){
return getString(key);
}
else{
return defaultValue;
}
}
/**
* Typesafe Getter for the Long represented by the provided key.
*
* @param key The key to get a value for
* @return The <code>Double</code> value
*/
public Double getDouble(String key){
if (!parameters.containsKey(key)) {
return 0.0;
}
Double value = (Double)parameters.get(key).getValue();
return value==null ? 0.0 : value.doubleValue();
}
/**
* Typesafe Getter for the Double represented by the provided key. If the
* key does not exist, the default value will be returned.
*
* @param key to return the value for
* @param defaultValue to return if the value doesn't exist
* @return the parameter represented by the provided key, defaultValue
* otherwise.
*/
public Double getDouble(String key, double defaultValue){
if(parameters.containsKey(key)){
return getDouble(key);
}
else{
return defaultValue;
}
}
/**
* Typesafe Getter for the Date represented by the provided key.
*
* @param key The key to get a value for
* @return The <code>java.util.Date</code> value
*/
public Date getDate(String key){
return this.getDate(key,null);
}
/**
* Typesafe Getter for the Date represented by the provided key. If the
* key does not exist, the default value will be returned.
*
* @param key to return the value for
* @param defaultValue to return if the value doesn't exist
* @return the parameter represented by the provided key, defaultValue
* otherwise.
*/
public Date getDate(String key, Date defaultValue){
if(parameters.containsKey(key)){
return (Date)parameters.get(key).getValue();
}
else{
return defaultValue;
}
}
/**
* Get a map of all parameters, including string, long, and date.
*
* @return an unmodifiable map containing all parameters.
*/
public Map<String, JobParameterEvent> getParameters(){
return new LinkedHashMap<String, JobParameterEvent>(parameters);
}
/**
* @return true if the parameters is empty, false otherwise.
*/
public boolean isEmpty(){
return parameters.isEmpty();
}
@Override
public boolean equals(Object obj) {
if(obj instanceof JobParametersEvent == false){
return false;
}
if(obj == this){
return true;
}
JobParametersEvent rhs = (JobParametersEvent)obj;
return this.parameters.equals(rhs.parameters);
}
@Override
public int hashCode() {
return 17 + 23 * parameters.hashCode();
}
@Override
public String toString() {
return parameters.toString();
}
public Properties toProperties() {
Properties props = new Properties();
for (Map.Entry<String, JobParameterEvent> param : parameters.entrySet()) {
if(param.getValue() != null) {
props.put(param.getKey(), param.getValue().toString());
}
}
return props;
}
}

View File

@@ -0,0 +1,55 @@
/*
* Copyright 2016 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.cloud.task.batch.listener.support;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.util.Assert;
/**
* Utility class that sends batch job listener payloads to the notification channel.
* @author Glenn Renfro
*/
public class MessagePublisher<P>{
private final MessageChannel listenerEventsChannel;
public MessagePublisher(MessageChannel listenerEventsChannel) {
Assert.notNull(listenerEventsChannel, "listenerEventsChannel must not be null");
this.listenerEventsChannel = listenerEventsChannel;
}
public final void publish(P payload) {
if (payload instanceof Message) {
this.publishMessage((Message<?>) payload);
}
else {
Message<P> message = MessageBuilder.withPayload(payload).build();
this.listenerEventsChannel.send(message);
}
}
private final void publishMessage(Message<?> message) {
this.listenerEventsChannel.send(message);
}
public void publishWithThrowableHeader(P payload, String header) {
Message<P> message = MessageBuilder.withPayload(payload).setHeader(BatchJobHeaders.BATCH_EXCEPTION,
header).build();
publishMessage(message);
}
}

View File

@@ -0,0 +1,476 @@
/*
* Copyright 2016 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.cloud.task.batch.listener.support;
import java.io.*;
import java.util.*;
import java.util.concurrent.*;
import org.springframework.batch.core.BatchStatus;
import org.springframework.batch.core.Entity;
import org.springframework.batch.core.StepExecution;
import org.springframework.batch.item.ExecutionContext;
import org.springframework.util.Assert;
/**
* This is a StepExecution DTO created so that a {@link org.springframework.batch.core.StepExecution} can be serialized
* into Json without having to add mixins to an ObjectMapper.
* @author Glenn Renfro
*/
public class StepExecutionEvent extends Entity {
private volatile long jobExecutionId;
private volatile String stepName;
private volatile BatchStatus status = BatchStatus.STARTING;
private volatile int readCount = 0;
private volatile int writeCount = 0;
private volatile int commitCount = 0;
private volatile int rollbackCount = 0;
private volatile int readSkipCount = 0;
private volatile int processSkipCount = 0;
private volatile int writeSkipCount = 0;
private volatile Date startTime = new Date(System.currentTimeMillis());
private volatile Date endTime = null;
private volatile Date lastUpdated = null;
private volatile ExecutionContext executionContext = new ExecutionContext();
private volatile ExitStatus exitStatus = new ExitStatus("EXECUTING", null);
private volatile boolean terminateOnly;
private volatile int filterCount;
private transient volatile List<Throwable> failureExceptions = new CopyOnWriteArrayList<Throwable>();
public StepExecutionEvent() {
super();
}
/**
* Constructor for the StepExecution to initialize the DTO.
*
* @param stepExecution the StepExecution to build this DTO around.
*/
public StepExecutionEvent(StepExecution stepExecution) {
super();
Assert.notNull(stepExecution, "StepExecution must be provided to re-hydrate an existing StepExecutionEvent");
Assert.notNull(stepExecution.getJobExecution(), "JobExecution must be provided to re-hydrate an existing StepExecutionEvent");
setId(stepExecution.getId());
this.jobExecutionId = stepExecution.getJobExecutionId();
this.stepName = stepExecution.getStepName();
this.status = stepExecution.getStatus();
this.exitStatus = new ExitStatus(stepExecution.getExitStatus().getExitCode(),
stepExecution.getExitStatus().getExitDescription());
this.executionContext = stepExecution.getExecutionContext();
for (Throwable throwable : stepExecution.getFailureExceptions()){
this.failureExceptions.add(throwable);
}
this.terminateOnly = stepExecution.isTerminateOnly();
this.endTime = stepExecution.getEndTime();
this.lastUpdated = stepExecution.getLastUpdated();
this.startTime = stepExecution.getStartTime();
this.commitCount = stepExecution.getCommitCount();
this.filterCount = stepExecution.getFilterCount();
this.processSkipCount = stepExecution.getProcessSkipCount();
this.readCount = stepExecution.getReadCount();
this.readSkipCount = stepExecution.getReadSkipCount();
this.rollbackCount = stepExecution.getRollbackCount();
this.writeCount = stepExecution.getWriteCount();
this.writeSkipCount = stepExecution.getWriteSkipCount();
}
/**
* Returns the {@link ExecutionContext} for this execution
*
* @return the attributes
*/
public ExecutionContext getExecutionContext() {
return executionContext;
}
/**
* Sets the {@link ExecutionContext} for this execution
*
* @param executionContext the attributes
*/
public void setExecutionContext(ExecutionContext executionContext) {
this.executionContext = executionContext;
}
/**
* Returns the current number of commits for this execution
*
* @return the current number of commits
*/
public int getCommitCount() {
return commitCount;
}
/**
* Sets the current number of commits for this execution
*
* @param commitCount the current number of commits
*/
public void setCommitCount(int commitCount) {
this.commitCount = commitCount;
}
/**
* Returns the time that this execution ended
*
* @return the time that this execution ended
*/
public Date getEndTime() {
return endTime;
}
/**
* Sets the time that this execution ended
*
* @param endTime the time that this execution ended
*/
public void setEndTime(Date endTime) {
this.endTime = endTime;
}
/**
* Returns the current number of items read for this execution
*
* @return the current number of items read for this execution
*/
public int getReadCount() {
return readCount;
}
/**
* Sets the current number of read items for this execution
*
* @param readCount the current number of read items for this execution
*/
public void setReadCount(int readCount) {
this.readCount = readCount;
}
/**
* Returns the current number of items written for this execution
*
* @return the current number of items written for this execution
*/
public int getWriteCount() {
return writeCount;
}
/**
* Sets the current number of written items for this execution
*
* @param writeCount the current number of written items for this execution
*/
public void setWriteCount(int writeCount) {
this.writeCount = writeCount;
}
/**
* Returns the current number of rollbacks for this execution
*
* @return the current number of rollbacks for this execution
*/
public int getRollbackCount() {
return rollbackCount;
}
/**
* Returns the current number of items filtered out of this execution
*
* @return the current number of items filtered out of this execution
*/
public int getFilterCount() {
return filterCount;
}
/**
* Public setter for the number of items filtered out of this execution.
* @param filterCount the number of items filtered out of this execution to
* set
*/
public void setFilterCount(int filterCount) {
this.filterCount = filterCount;
}
/**
* Setter for number of rollbacks for this execution
*/
public void setRollbackCount(int rollbackCount) {
this.rollbackCount = rollbackCount;
}
/**
* Gets the time this execution started
*
* @return the time this execution started
*/
public Date getStartTime() {
return startTime;
}
/**
* Sets the time this execution started
*
* @param startTime the time this execution started
*/
public void setStartTime(Date startTime) {
this.startTime = startTime;
}
/**
* Returns the current status of this step
*
* @return the current status of this step
*/
public BatchStatus getStatus() {
return status;
}
/**
* Sets the current status of this step
*
* @param status the current status of this step
*/
public void setStatus(BatchStatus status) {
this.status = status;
}
/**
* Upgrade the status field if the provided value is greater than the
* existing one. Clients using this method to set the status can be sure
* that they don't overwrite a failed status with an successful one.
*
* @param status the new status value
*/
public void upgradeStatus(BatchStatus status) {
this.status = this.status.upgradeTo(status);
}
public void setStepName(String stepName) {
this.stepName = stepName;
}
/**
* @return the name of the step
*/
public String getStepName() {
return stepName;
}
/**
* @param exitStatus
*/
public void setExitStatus(ExitStatus exitStatus) {
this.exitStatus = exitStatus;
}
/**
* @return the exitCode
*/
public ExitStatus getExitStatus() {
return exitStatus;
}
/**
* On unsuccessful execution after a chunk has rolled back.
*/
public synchronized void incrementRollbackCount() {
rollbackCount++;
}
/**
* @return flag to indicate that an execution should halt
*/
public boolean isTerminateOnly() {
return this.terminateOnly;
}
/**
* Set a flag that will signal to an execution environment that this
* execution (and its surrounding job) wishes to exit.
*/
public void setTerminateOnly() {
this.terminateOnly = true;
}
/**
* @return the total number of items skipped.
*/
public int getSkipCount() {
return readSkipCount + processSkipCount + writeSkipCount;
}
/**
* Increment the number of commits
*/
public void incrementCommitCount() {
commitCount++;
}
/**
* @return the number of records skipped on read
*/
public int getReadSkipCount() {
return readSkipCount;
}
/**
* @return the number of records skipped on write
*/
public int getWriteSkipCount() {
return writeSkipCount;
}
/**
* Set the number of records skipped on read
*
* @param readSkipCount
*/
public void setReadSkipCount(int readSkipCount) {
this.readSkipCount = readSkipCount;
}
/**
* Set the number of records skipped on write
*
* @param writeSkipCount
*/
public void setWriteSkipCount(int writeSkipCount) {
this.writeSkipCount = writeSkipCount;
}
/**
* @return the number of records skipped during processing
*/
public int getProcessSkipCount() {
return processSkipCount;
}
/**
* Set the number of records skipped during processing.
*
* @param processSkipCount
*/
public void setProcessSkipCount(int processSkipCount) {
this.processSkipCount = processSkipCount;
}
/**
* @return the Date representing the last time this execution was persisted.
*/
public Date getLastUpdated() {
return lastUpdated;
}
/**
* Set the time when the StepExecution was last updated before persisting
*
* @param lastUpdated
*/
public void setLastUpdated(Date lastUpdated) {
this.lastUpdated = lastUpdated;
}
public List<Throwable> getFailureExceptions() {
return failureExceptions;
}
public void addFailureException(Throwable throwable) {
this.failureExceptions.add(throwable);
}
public long getJobExecutionId() {
return jobExecutionId;
}
/*
* (non-Javadoc)
*
* @see
* org.springframework.batch.container.common.domain.Entity#equals(java.
* lang.Object)
*/
@Override
public boolean equals(Object obj) {
if ( !(obj instanceof StepExecution) || getId() == null) {
return super.equals(obj);
}
StepExecution other = (StepExecution) obj;
return stepName.equals(other.getStepName()) && (jobExecutionId == other.getJobExecutionId())
&& getId().equals(other.getId());
}
/**
* Deserialize and ensure transient fields are re-instantiated when read
* back
*/
private void readObject(ObjectInputStream stream) throws IOException, ClassNotFoundException {
stream.defaultReadObject();
failureExceptions = new ArrayList<Throwable>();
}
/*
* (non-Javadoc)
*
* @see org.springframework.batch.container.common.domain.Entity#hashCode()
*/
@Override
public int hashCode() {
Object jobExecutionId = getJobExecutionId();
Long id = getId();
return super.hashCode() + 31 * (stepName != null ? stepName.hashCode() : 0) + 91
* (jobExecutionId != null ? jobExecutionId.hashCode() : 0) + 59 * (id != null ? id.hashCode() : 0);
}
@Override
public String toString() {
return String.format(getSummary() + ", exitDescription=%s", exitStatus.getExitDescription());
}
public String getSummary() {
return super.toString()
+ String.format(
", name=%s, status=%s, exitStatus=%s, readCount=%d, filterCount=%d, writeCount=%d readSkipCount=%d, writeSkipCount=%d"
+ ", processSkipCount=%d, commitCount=%d, rollbackCount=%d", stepName, status,
exitStatus.getExitCode(), readCount, filterCount, writeCount, readSkipCount, writeSkipCount,
processSkipCount, commitCount, rollbackCount);
}
}

View File

@@ -0,0 +1,114 @@
/*
* Copyright 2016 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.cloud.task.batch.listener.support;
import java.lang.reflect.*;
import org.springframework.batch.core.ChunkListener;
import org.springframework.batch.core.ItemProcessListener;
import org.springframework.batch.core.ItemReadListener;
import org.springframework.batch.core.ItemWriteListener;
import org.springframework.batch.core.JobExecutionListener;
import org.springframework.batch.core.SkipListener;
import org.springframework.batch.core.StepExecutionListener;
import org.springframework.batch.core.job.AbstractJob;
import org.springframework.batch.core.step.AbstractStep;
import org.springframework.batch.core.step.item.ChunkOrientedTasklet;
import org.springframework.batch.core.step.item.SimpleChunkProcessor;
import org.springframework.batch.core.step.item.SimpleChunkProvider;
import org.springframework.batch.core.step.tasklet.Tasklet;
import org.springframework.batch.core.step.tasklet.TaskletStep;
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.config.BeanPostProcessor;
import org.springframework.cloud.task.batch.listener.BatchEventAutoConfiguration;
import org.springframework.context.ApplicationContext;
import org.springframework.util.ReflectionUtils;
/**
* Attaches the listeners to the job and its steps.
* Based on the type of bean that is being processed will determine what listener is attached.
* <p>
* <ul>
* <li>If the bean is of type AbstactJob then the JobExecutionListener is registered with this bean.</li>
* <li>If the bean is of type AbstactStep then the StepExecutionListener is registered with this bean.</li>
* <li>If the bean is of type TaskletStep then the ChunkEventListener is registered with this bean.</li>
* <li>If the tasklet for the TaskletStep is of type ChunkOrientedTasklet the following listeners will be registered. </li>
* <ul>
* <li>ItemReadListener with the ChunkProvider.</li>
* <li>ItemProcessListener with the ChunkProcessor.</li>
* <li>ItemWriteEventsListener with the ChunkProcessor.</li>
* <li>SkipEventsListener with the ChunkProcessor.</li>
* </ul>
* </ul>
* </p>
* @author Michael Minella
* @author Glenn Renfro
*/
public class TaskBatchEventListenerBeanPostProcessor implements BeanPostProcessor {
@Autowired
private ApplicationContext applicationContext;
@Override
public Object postProcessBeforeInitialization(Object bean, String beanName) throws BeansException {
if (bean instanceof AbstractJob) {
JobExecutionListener jobExecutionEventsListener =
(JobExecutionListener) this.applicationContext.getBean(
BatchEventAutoConfiguration.JOB_EXECUTION_EVENTS_LISTENER);
AbstractJob job = (AbstractJob) bean;
job.registerJobExecutionListener(
jobExecutionEventsListener);
}
if (bean instanceof AbstractStep) {
StepExecutionListener stepExecutionListener =
(StepExecutionListener) this.applicationContext.getBean(BatchEventAutoConfiguration.STEP_EXECUTION_EVENTS_LISTENER);
AbstractStep step = (AbstractStep) bean;
step.registerStepExecutionListener(stepExecutionListener);
if (bean instanceof TaskletStep) {
TaskletStep taskletStep = (TaskletStep) bean;
taskletStep.registerChunkListener((ChunkListener) this.applicationContext.getBean(BatchEventAutoConfiguration.CHUNK_EVENTS_LISTENER));
Tasklet tasklet = taskletStep.getTasklet();
if (tasklet instanceof ChunkOrientedTasklet) {
Field chunkProviderField = ReflectionUtils.findField(ChunkOrientedTasklet.class, "chunkProvider");
ReflectionUtils.makeAccessible(chunkProviderField);
SimpleChunkProvider chunkProvider = (SimpleChunkProvider) ReflectionUtils.getField(chunkProviderField, tasklet);
Field chunkProcessorField = ReflectionUtils.findField(ChunkOrientedTasklet.class, "chunkProcessor");
ReflectionUtils.makeAccessible(chunkProcessorField);
SimpleChunkProcessor chunkProcessor = (SimpleChunkProcessor) ReflectionUtils.getField(chunkProcessorField, tasklet);
chunkProvider.registerListener((ItemReadListener) this.applicationContext.getBean(BatchEventAutoConfiguration.ITEM_READ_EVENTS_LISTENER));
chunkProvider.registerListener((SkipListener) this.applicationContext.getBean(BatchEventAutoConfiguration.SKIP_EVENTS_LISTENER));
chunkProcessor.registerListener((ItemProcessListener) this.applicationContext.getBean(BatchEventAutoConfiguration.ITEM_PROCESS_EVENTS_LISTENER));
chunkProcessor.registerListener((ItemWriteListener) this.applicationContext.getBean(BatchEventAutoConfiguration.ITEM_WRITE_EVENTS_LISTENER));
chunkProcessor.registerListener((SkipListener) this.applicationContext.getBean(BatchEventAutoConfiguration.SKIP_EVENTS_LISTENER));
}
}
}
return bean;
}
@Override
public Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException {
return bean;
}
}

View File

@@ -1 +1,2 @@
org.springframework.boot.autoconfigure.EnableAutoConfiguration=org.springframework.cloud.task.listener.TaskEventAutoConfiguration
org.springframework.boot.autoconfigure.EnableAutoConfiguration=org.springframework.cloud.task.batch.listener.BatchEventAutoConfiguration

View File

@@ -1 +1,9 @@
spring.cloud.stream.bindings.task-events.contentType=application/json
spring.cloud.stream.bindings.item-write-events.contentType=application/json
spring.cloud.stream.bindings.item-read-events.contentType=application/json
spring.cloud.stream.bindings.item-process-events.contentType=application/json
spring.cloud.stream.bindings.skip-events.contentType=application/json
spring.cloud.stream.bindings.step-execution-events.contentType=application/json
spring.cloud.stream.bindings.job-execution-events.contentType=application/json

View File

@@ -0,0 +1,111 @@
/*
* Copyright 2016 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.cloud.task.batch.listener;
import java.util.*;
import org.junit.Before;
import org.junit.Test;
import org.springframework.batch.core.JobExecution;
import org.springframework.batch.core.JobInstance;
import org.springframework.batch.core.JobParameter;
import org.springframework.batch.core.JobParameters;
import org.springframework.batch.core.StepExecution;
import org.springframework.cloud.task.batch.listener.support.JobExecutionEvent;
import org.springframework.cloud.task.batch.listener.support.StepExecutionEvent;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
/**
* @author Glenn Renfro.
*/
public class EventJobExecutionTests {
private static final String JOB_NAME = "FOODJOB";
private static final Long JOB_INSTANCE_ID = 1l;
private static final Long JOB_EXECUTION_ID = 2l;
private static final String JOB_CONFIGURATION_NAME = "FOO_JOB_CONFIG";
private JobParameters jobParameters;
private JobInstance jobInstance;
@Before
public void setup() {
jobInstance = new JobInstance(JOB_INSTANCE_ID, JOB_NAME);
jobParameters = new JobParameters();
}
@Test
public void testBasic() {
JobExecution jobExecution = new JobExecution(jobInstance, JOB_EXECUTION_ID, jobParameters, JOB_CONFIGURATION_NAME);
JobExecutionEvent jobExecutionEvent = new JobExecutionEvent(jobExecution);
assertNotNull("jobInstance should not be null", jobExecutionEvent.getJobInstance());
assertNotNull("jobParameters should not be null", jobExecutionEvent.getJobParameters());
assertEquals("jobConfigurationName did not match expected", JOB_CONFIGURATION_NAME,
jobExecutionEvent.getJobConfigurationName());
assertEquals("jobParameters size did not match", 0, jobExecutionEvent.getJobParameters().getParameters().size());
assertEquals("jobInstance name did not match", JOB_NAME, jobExecutionEvent.getJobInstance().getJobName());
assertEquals("no step executions were expected", 0, jobExecutionEvent.getStepExecutions().size());
assertEquals("exitStatus did not match expected", "UNKNOWN", jobExecutionEvent.getExitStatus().getExitCode());
}
@Test
public void testJobParameters() {
String[] JOB_PARAM_KEYS = { "A", "B", "C", "D" };
Date testDate = new Date();
JobParameter[] PARAMETERS = { new JobParameter("FOO", true), new JobParameter(1L, true),
new JobParameter(1D, true), new JobParameter(testDate, false) };
Map jobParamMap = new LinkedHashMap<String, JobParameter>();
for (int paramCount = 0; paramCount < JOB_PARAM_KEYS.length; paramCount++) {
jobParamMap.put(JOB_PARAM_KEYS[paramCount], PARAMETERS[paramCount]);
}
jobParameters = new JobParameters(jobParamMap);
JobExecution jobExecution = new JobExecution(jobInstance, JOB_EXECUTION_ID, jobParameters, JOB_CONFIGURATION_NAME);
JobExecutionEvent jobExecutionEvent = new JobExecutionEvent(jobExecution);
assertNotNull("Job Parameter A was expected", jobExecutionEvent.getJobParameters().getString("A"));
assertNotNull("Job Parameter B was expected", jobExecutionEvent.getJobParameters().getLong("B"));
assertNotNull("Job Parameter C was expected", jobExecutionEvent.getJobParameters().getDouble("C"));
assertNotNull("Job Parameter D was expected", jobExecutionEvent.getJobParameters().getDate("D"));
assertEquals("Job Parameter A value was not correct", "FOO", jobExecutionEvent.getJobParameters().getString("A"));
assertEquals("Job Parameter B value was not correct", new Long(1), jobExecutionEvent.getJobParameters().getLong("B"));
assertEquals("Job Parameter C value was not correct", new Double(1), jobExecutionEvent.getJobParameters().getDouble("C"));
assertEquals("Job Parameter D value was not correct", testDate, jobExecutionEvent.getJobParameters().getDate("D"));
}
@Test
public void testStepExecutions() {
JobExecution jobExecution = new JobExecution(jobInstance, JOB_EXECUTION_ID, jobParameters, JOB_CONFIGURATION_NAME);
List<StepExecution> stepsExecutions = new ArrayList<>();
stepsExecutions.add( new StepExecution("foo", jobExecution));
stepsExecutions.add( new StepExecution("bar", jobExecution));
stepsExecutions.add( new StepExecution("baz", jobExecution));
jobExecution.addStepExecutions(stepsExecutions);
JobExecutionEvent jobExecutionsEvent = new JobExecutionEvent(jobExecution);
assertEquals("stepExecutions count is incorrect", 3, jobExecutionsEvent.getStepExecutions().size());
Iterator<StepExecutionEvent> iter = jobExecutionsEvent.getStepExecutions().iterator();
assertEquals("foo stepExecution is not present", "foo", iter.next().getStepName());
assertEquals("bar stepExecution is not present", "bar", iter.next().getStepName());
assertEquals("baz stepExecution is not present", "baz", iter.next().getStepName());
}
}

View File

@@ -0,0 +1,63 @@
/*
* Copyright 2016 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.cloud.task.batch.listener;
import org.junit.Test;
import org.springframework.batch.core.JobExecution;
import org.springframework.batch.core.JobInstance;
import org.springframework.batch.core.JobParameters;
import org.springframework.batch.core.StepExecution;
import org.springframework.cloud.task.batch.listener.support.StepExecutionEvent;
import static org.junit.Assert.assertEquals;
/**
* @author Glenn Renfro
*/
public class EventStepExecutionTests {
private static final String JOB_NAME = "FOO_JOB";
private static final String STEP_NAME = "STEP_NAME";
private static final Long JOB_INSTANCE_ID = 1l;
private static final Long JOB_EXECUTION_ID = 2l;
private static final String JOB_CONFIGURATION_NAME = "FOO_JOB_CONFIG";
@Test
public void testBasic(){
JobInstance jobInstance = new JobInstance(JOB_INSTANCE_ID, JOB_NAME);
JobParameters jobParameters = new JobParameters();
JobExecution jobExecution = new JobExecution(jobInstance, JOB_EXECUTION_ID, jobParameters, JOB_CONFIGURATION_NAME);
StepExecution stepExecution = new StepExecution(STEP_NAME, jobExecution);
stepExecution.setCommitCount(1);
stepExecution.setReadCount(2);
stepExecution.setWriteCount(3);
stepExecution.setReadSkipCount(4);
stepExecution.setWriteSkipCount(5);
StepExecutionEvent stepExecutionEvent = new StepExecutionEvent(stepExecution);
assertEquals("stepName result was not as expected", STEP_NAME, stepExecutionEvent.getStepName());
assertEquals("startTime result was not as expected", stepExecution.getStartTime(), stepExecutionEvent.getStartTime());
assertEquals("endTime result was not as expected", stepExecution.getEndTime(), stepExecutionEvent.getEndTime());
assertEquals("lastUpdated result was not as expected", stepExecution.getLastUpdated(), stepExecutionEvent.getLastUpdated());
assertEquals("commitCount result was not as expected", stepExecution.getCommitCount(), stepExecutionEvent.getCommitCount());
assertEquals("readCount result was not as expected", stepExecution.getReadCount(), stepExecutionEvent.getReadCount());
assertEquals("readSkipCount result was not as expected", stepExecution.getReadSkipCount(), stepExecutionEvent.getReadSkipCount());
assertEquals("writeCount result was not as expected", stepExecution.getWriteCount(), stepExecutionEvent.getWriteCount());
assertEquals("writeSkipCount result was not as expected", stepExecution.getWriteSkipCount(), stepExecutionEvent.getWriteSkipCount());
assertEquals("skipCount result was not as expected", stepExecution.getSkipCount(), stepExecutionEvent.getSkipCount());
}
}

View File

@@ -19,6 +19,7 @@ package org.springframework.cloud.task.launcher;
import java.util.HashMap;
import java.util.Map;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.runner.RunWith;
@@ -27,6 +28,7 @@ import org.springframework.boot.test.SpringApplicationConfiguration;
import org.springframework.cloud.deployer.spi.task.LaunchState;
import org.springframework.cloud.stream.annotation.Bindings;
import org.springframework.cloud.stream.messaging.Sink;
import org.springframework.cloud.stream.test.junit.redis.RedisTestSupport;
import org.springframework.cloud.task.launcher.configuration.TaskConfiguration;
import org.springframework.cloud.task.launcher.util.TaskLauncherSinkApplication;
import org.springframework.context.ApplicationContext;
@@ -39,6 +41,9 @@ import static org.junit.Assert.assertEquals;
@SpringApplicationConfiguration(classes = {TaskLauncherSinkApplication.class, TaskConfiguration.class} )
public class TaskLauncherSinkTests {
@ClassRule
public static RedisTestSupport redisTestSupport = new RedisTestSupport();
private final static String DEFAULT_STATUS = "test_status";
@Autowired

View File

@@ -15,11 +15,12 @@
*/
package org.springframework.cloud.task.listener;
import org.junit.Rule;
import org.junit.Test;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.PropertyPlaceholderAutoConfiguration;
import org.springframework.cloud.stream.test.binder.TestSupportBinderAutoConfiguration;
import org.springframework.cloud.stream.test.junit.redis.RedisTestSupport;
import org.springframework.cloud.task.configuration.EnableTask;
import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.context.annotation.Configuration;
@@ -28,19 +29,24 @@ import static org.junit.Assert.assertNotNull;
/**
* @author Michael Minella
* @author Ilayaperumal Gopinathan
*/
public class TaskEventTests {
@Rule
public RedisTestSupport redisTestSupport = new RedisTestSupport();
private static final String TASK_NAME = "taskEventTest";
@Test
public void testDefaultConfiguration() {
ConfigurableApplicationContext applicationContext =
SpringApplication.run(new Object[] {TaskEventsConfiguration.class,
SpringApplication.run(new Object[]{ TaskEventsConfiguration.class,
TaskEventAutoConfiguration.class,
PropertyPlaceholderAutoConfiguration.class,
TestSupportBinderAutoConfiguration.class},
new String[] {"--spring.cloud.task.closecontext.enable=false",
PropertyPlaceholderAutoConfiguration.class },
new String[]{ "--spring.cloud.task.closecontext.enable=false",
"--spring.main.web-environment=false",
"--spring.cloud.stream.defaultBinder=test"});
"--spring.cloud.stream.defaultBinder=test" });
assertNotNull(applicationContext.getBean("taskEventListener"));
assertNotNull(applicationContext.getBean(TaskEventAutoConfiguration.TaskEventChannels.class));
@@ -50,4 +56,5 @@ public class TaskEventTests {
@EnableTask
public static class TaskEventsConfiguration {
}
}