diff --git a/spring-cloud-task-core/src/main/java/org/springframework/cloud/task/repository/TaskExecution.java b/spring-cloud-task-core/src/main/java/org/springframework/cloud/task/repository/TaskExecution.java index 000fa91e..3c1996a1 100644 --- a/spring-cloud-task-core/src/main/java/org/springframework/cloud/task/repository/TaskExecution.java +++ b/spring-cloud-task-core/src/main/java/org/springframework/cloud/task/repository/TaskExecution.java @@ -78,7 +78,7 @@ public class TaskExecution { this.exitCode = exitCode; this.taskName = taskName; this.exitMessage = exitMessage; - this.parameters = parameters; + this.parameters = new ArrayList<>(parameters); this.startTime = (Date)startTime.clone(); this.endTime = (endTime != null) ? (Date)endTime.clone() : null; } @@ -132,7 +132,7 @@ public class TaskExecution { } public void setParameters(List parameters) { - this.parameters = parameters; + this.parameters = new ArrayList<> (parameters); } @Override diff --git a/spring-cloud-task-integration-tests/pom.xml b/spring-cloud-task-integration-tests/pom.xml index 629ebd09..639c250e 100644 --- a/spring-cloud-task-integration-tests/pom.xml +++ b/spring-cloud-task-integration-tests/pom.xml @@ -36,5 +36,10 @@ 1.0.0.BUILD-SNAPSHOT test + + org.springframework.cloud + spring-cloud-task-batch + test + diff --git a/spring-cloud-task-integration-tests/src/test/java/configuration/JobConfiguration.java b/spring-cloud-task-integration-tests/src/test/java/configuration/JobConfiguration.java new file mode 100644 index 00000000..30def48e --- /dev/null +++ b/spring-cloud-task-integration-tests/src/test/java/configuration/JobConfiguration.java @@ -0,0 +1,89 @@ +/* + * Copyright 2016 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package configuration; + +import java.util.*; + +import org.springframework.batch.core.Job; +import org.springframework.batch.core.Step; +import org.springframework.batch.core.StepContribution; +import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing; +import org.springframework.batch.core.configuration.annotation.JobBuilderFactory; +import org.springframework.batch.core.configuration.annotation.StepBuilderFactory; +import org.springframework.batch.core.scope.context.ChunkContext; +import org.springframework.batch.core.step.tasklet.Tasklet; +import org.springframework.batch.item.ItemProcessor; +import org.springframework.batch.item.ItemWriter; +import org.springframework.batch.item.support.ListItemReader; +import org.springframework.batch.repeat.RepeatStatus; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.cloud.task.configuration.EnableTask; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; + +/** + * @author Glenn Renfro + */ +@Configuration +@EnableBatchProcessing +@EnableTask +public class JobConfiguration { + @Autowired + private JobBuilderFactory jobBuilderFactory; + + @Autowired + private StepBuilderFactory stepBuilderFactory; + + @Bean + public Job job() { + return jobBuilderFactory.get("job") + .start(step1()).next(step2()) + .build(); + } + + @Bean + public Step step1() { + return stepBuilderFactory.get("step1").tasklet(new Tasklet() { + @Override + public RepeatStatus execute(StepContribution contribution, ChunkContext chunkContext) throws Exception { + System.out.println("Executed"); + return RepeatStatus.FINISHED; + } + }).build(); + } + + @Bean + public Step step2() { + return stepBuilderFactory.get("step2").chunk(3) + .reader(new ListItemReader<>(Arrays.asList("1", "2", "3", "4", "5", "6"))) + .processor(new ItemProcessor() { + @Override + public String process(Object item) throws Exception { + return String.valueOf(Integer.parseInt((String) item) * -1); + } + }) + .writer(new ItemWriter() { + @Override + public void write(List items) throws Exception { + for (Object item : items) { + System.out.println(">> " + item); + } + } + }).build(); + } + +} diff --git a/spring-cloud-task-integration-tests/src/test/java/configuration/JobSkipConfiguration.java b/spring-cloud-task-integration-tests/src/test/java/configuration/JobSkipConfiguration.java new file mode 100644 index 00000000..e1ba64be --- /dev/null +++ b/spring-cloud-task-integration-tests/src/test/java/configuration/JobSkipConfiguration.java @@ -0,0 +1,81 @@ +/* + * Copyright 2016 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package configuration; + +import java.util.*; + +import org.springframework.batch.core.Job; +import org.springframework.batch.core.Step; +import org.springframework.batch.core.StepContribution; +import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing; +import org.springframework.batch.core.configuration.annotation.JobBuilderFactory; +import org.springframework.batch.core.configuration.annotation.StepBuilderFactory; +import org.springframework.batch.core.scope.context.ChunkContext; +import org.springframework.batch.core.step.tasklet.Tasklet; +import org.springframework.batch.item.ItemProcessor; +import org.springframework.batch.item.ItemWriter; +import org.springframework.batch.repeat.RepeatStatus; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.cloud.task.configuration.EnableTask; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; + +/** + * @author Glenn Renfro + */ +@Configuration +@EnableBatchProcessing +@EnableTask +public class JobSkipConfiguration { + @Autowired + private JobBuilderFactory jobBuilderFactory; + + @Autowired + private StepBuilderFactory stepBuilderFactory; + + @Bean + public Job job() { + return jobBuilderFactory.get("job") + .start(step1()).next(step2()) + .build(); + } + + @Bean + public Step step1() { + return stepBuilderFactory.get("step1").tasklet(new Tasklet() { + @Override + public RepeatStatus execute(StepContribution contribution, ChunkContext chunkContext) throws Exception { + System.out.println("Executed"); + return RepeatStatus.FINISHED; + } + }).build(); + } + + @Bean + public Step step2() { + return stepBuilderFactory.get("step2").chunk(3).faultTolerant().skip(IllegalStateException.class).skipLimit(100) + .reader(new SkipItemReader()) + .processor(new ItemProcessor() { + @Override + public String process(Object item) throws Exception { + return String.valueOf(Integer.parseInt((String) item) * -1); + } + }) + .writer( new SkipItemWriter() ).build(); + } + +} diff --git a/spring-cloud-task-integration-tests/src/test/java/configuration/SkipItemReader.java b/spring-cloud-task-integration-tests/src/test/java/configuration/SkipItemReader.java new file mode 100644 index 00000000..097c2a99 --- /dev/null +++ b/spring-cloud-task-integration-tests/src/test/java/configuration/SkipItemReader.java @@ -0,0 +1,45 @@ +/* + * Copyright 2016 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package configuration; + +import org.springframework.batch.item.ItemReader; +import org.springframework.batch.item.NonTransientResourceException; +import org.springframework.batch.item.ParseException; +import org.springframework.batch.item.UnexpectedInputException; + +/** + * @author Glenn Renfro + */ +public class SkipItemReader implements ItemReader{ + + int failCount = 0; + boolean finished = false; + + @Override + public Object read() throws Exception, UnexpectedInputException, ParseException, NonTransientResourceException { + String result = "1"; + if(failCount < 2) { + failCount++; + throw new IllegalStateException("Reader FOOBAR"); + } + if (finished){ + result = null; + } + finished = true; + return result; + } +} diff --git a/spring-cloud-task-integration-tests/src/test/java/configuration/SkipItemWriter.java b/spring-cloud-task-integration-tests/src/test/java/configuration/SkipItemWriter.java new file mode 100644 index 00000000..cae5ce4f --- /dev/null +++ b/spring-cloud-task-integration-tests/src/test/java/configuration/SkipItemWriter.java @@ -0,0 +1,46 @@ +/* + * Copyright 2016 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package configuration; + +import java.util.*; + +import org.springframework.batch.item.ItemReader; +import org.springframework.batch.item.ItemWriter; +import org.springframework.batch.item.NonTransientResourceException; +import org.springframework.batch.item.ParseException; +import org.springframework.batch.item.UnexpectedInputException; + +/** + * @author Glenn Renfro + */ +public class SkipItemWriter implements ItemWriter { + + int failCount = 0; + boolean finished = false; + + @Override + public void write(List items) throws Exception { + String result = "1"; + if(failCount < 2) { + failCount++; + throw new IllegalStateException("Writer FOOBAR"); + } + for (Object item : items) { + System.out.println(">> " + item); + } + } +} diff --git a/spring-cloud-task-integration-tests/src/test/java/org/springframework/cloud/task/listener/BatchExecutionEventTests.java b/spring-cloud-task-integration-tests/src/test/java/org/springframework/cloud/task/listener/BatchExecutionEventTests.java new file mode 100644 index 00000000..87b891ed --- /dev/null +++ b/spring-cloud-task-integration-tests/src/test/java/org/springframework/cloud/task/listener/BatchExecutionEventTests.java @@ -0,0 +1,288 @@ + +/* + * Copyright 2016 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.cloud.task.listener; + +import java.util.concurrent.*; + +import configuration.JobConfiguration; +import configuration.JobSkipConfiguration; +import org.junit.After; +import org.junit.ClassRule; +import org.junit.Test; + +import org.springframework.boot.autoconfigure.EnableAutoConfiguration; +import org.springframework.boot.autoconfigure.PropertyPlaceholderAutoConfiguration; +import org.springframework.boot.autoconfigure.batch.BatchAutoConfiguration; +import org.springframework.boot.builder.SpringApplicationBuilder; +import org.springframework.cloud.stream.annotation.EnableBinding; +import org.springframework.cloud.stream.annotation.StreamListener; +import org.springframework.cloud.stream.binder.redis.config.RedisServiceAutoConfiguration; +import org.springframework.cloud.stream.messaging.Sink; +import org.springframework.cloud.stream.test.junit.redis.RedisTestSupport; +import org.springframework.cloud.task.batch.configuration.TaskBatchAutoConfiguration; +import org.springframework.cloud.task.batch.listener.BatchEventAutoConfiguration; +import org.springframework.cloud.task.batch.listener.support.JobExecutionEvent; +import org.springframework.cloud.task.batch.listener.support.StepExecutionEvent; +import org.springframework.context.ConfigurableApplicationContext; +import org.springframework.context.annotation.PropertySource; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; + +/** + * @author Glenn Renfro + */ +public class BatchExecutionEventTests { + @ClassRule + public static RedisTestSupport redisTestSupport = new RedisTestSupport(); + + // Count for two job execution events per job + static CountDownLatch jobExecutionLatch = new CountDownLatch(2); + + // Count for four step execution events per job + static CountDownLatch stepExecutionLatch = new CountDownLatch(4); + static int stepOneCount = 0; + static int stepTwoCount = 0; + + // Count for twelve item process events per job + static CountDownLatch itemProcessLatch = new CountDownLatch(6); + + // Count for eight chunk events per job + static CountDownLatch chunkEventsLatch = new CountDownLatch(8); + + // Count for zero read events per job + static CountDownLatch itemReadEventsLatch = new CountDownLatch(0); + + // Count for six write events per job + static CountDownLatch itemWriteEventsLatch = new CountDownLatch(2); + + // Count for 3 skip events per job + static CountDownLatch skipEventsLatch = new CountDownLatch(3); + static int readSkipCount = 0; + static int writeSkipCount = 0; + + + private static final String TASK_NAME = "jobEventTest"; + + private ConfigurableApplicationContext applicationContext; + + @After + public void tearDown() { + if (applicationContext != null && applicationContext.isActive() ) { + applicationContext.close(); + } + } + + @Test + public void testContext() { + applicationContext = new SpringApplicationBuilder() + .sources(this.getConfigurations(BatchExecutionEventTests.ListenerBinding.class, JobConfiguration.class)) + .build().run(new String[]{ "--spring.cloud.task.closecontext.enable=false" }); + + assertNotNull(applicationContext.getBean("jobExecutionEventsListener")); + assertNotNull(applicationContext.getBean("stepExecutionEventsListener")); + assertNotNull(applicationContext.getBean("chunkEventsListener")); + assertNotNull(applicationContext.getBean("itemReadEventsListener")); + assertNotNull(applicationContext.getBean("itemWriteEventsListener")); + assertNotNull(applicationContext.getBean("itemProcessEventsListener")); + assertNotNull(applicationContext.getBean("skipEventsListener")); + assertNotNull(applicationContext.getBean(BatchEventAutoConfiguration.BatchEventsChannels.class)); + + } + + @Test + public void testJobEventListener() throws Exception { + testListener("--spring.cloud.stream.bindings.job-execution-events.destination=foobar", + jobExecutionLatch, BatchExecutionEventTests.ListenerBinding.class); + + } + + @Test + public void testStepEventListener() throws Exception { + testListener("--spring.cloud.stream.bindings.step-execution-events.destination=step-execution-foobar", + stepExecutionLatch, BatchExecutionEventTests.StepListenerBinding.class); + assertEquals("the number of step1 events did not match", 2, stepOneCount); + assertEquals("the number of step2 events did not match", 2, stepTwoCount); + + } + + @Test + public void testItemProcessEventListener() throws Exception { + testListener("--spring.cloud.stream.bindings.item-process-events.destination=item-process-foobar", + itemProcessLatch, BatchExecutionEventTests.ItemProcessListenerBinding.class); + } + + + @Test + public void testChunkListener() throws Exception { + testListener("--spring.cloud.stream.bindings.chunk-events.destination=chunk-events-foobar", + chunkEventsLatch, BatchExecutionEventTests.ChunkEventsListenerBinding.class); + } + + @Test + public void testItemReadListener() throws Exception { + testListener("--spring.cloud.stream.bindings.item-read-events.destination=item-read-events-foobar", + itemReadEventsLatch, BatchExecutionEventTests.ItemReadEventsListenerBinding.class); + } + + @Test + public void testWriteListener() throws Exception { + testListener("--spring.cloud.stream.bindings.item-write-events.destination=item-write-events-foobar", + itemWriteEventsLatch, BatchExecutionEventTests.ItemWriteEventsListenerBinding.class); + } + + @Test + public void testSkipEventListener() throws Exception { + testListenerSkip("--spring.cloud.stream.bindings.skip-events.destination=skip-event-foobar", + skipEventsLatch, BatchExecutionEventTests.SkipEventsListenerBinding.class); + assertEquals("read skip count did not match expected result", 2, readSkipCount); + assertEquals("write skip count did not match expected result", 1, writeSkipCount); + } + + @EnableBinding(Sink.class) + @PropertySource("classpath:/org/springframework/cloud/task/listener/job-execution-sink-channel.properties") + @EnableAutoConfiguration + public static class ListenerBinding { + + @StreamListener(Sink.INPUT) + public void receive(JobExecutionEvent execution) { + assertEquals(String.format("Job name should be job"), "job", execution.getJobInstance().getJobName()); + jobExecutionLatch.countDown(); + } + } + + @EnableBinding(Sink.class) + @PropertySource("classpath:/org/springframework/cloud/task/listener/step-execution-sink-channel.properties") + @EnableAutoConfiguration + public static class StepListenerBinding { + + @StreamListener(Sink.INPUT) + public void receive(StepExecutionEvent execution) { + if(execution.getStepName().equals("step1")) { + stepOneCount++; + } + if(execution.getStepName().equals("step2")) { + stepTwoCount++; + } + + stepExecutionLatch.countDown(); + } + } + + @EnableBinding(Sink.class) + @PropertySource("classpath:/org/springframework/cloud/task/listener/item-process-sink-channel.properties") + @EnableAutoConfiguration + public static class ItemProcessListenerBinding { + + @StreamListener(Sink.INPUT) + public void receive(Object object) { + itemProcessLatch.countDown(); + } + } + + @EnableBinding(Sink.class) + @PropertySource("classpath:/org/springframework/cloud/task/listener/chunk-events-sink-channel.properties") + @EnableAutoConfiguration + public static class ChunkEventsListenerBinding { + + @StreamListener(Sink.INPUT) + public void receive(Object chunkContext) { + chunkEventsLatch.countDown(); + } + } + + @EnableBinding(Sink.class) + @PropertySource("classpath:/org/springframework/cloud/task/listener/item-read-events-sink-channel.properties") + @EnableAutoConfiguration + public static class ItemReadEventsListenerBinding { + + @StreamListener(Sink.INPUT) + public void receive(Object itemRead) { + itemReadEventsLatch.countDown(); + } + } + + @EnableBinding(Sink.class) + @PropertySource("classpath:/org/springframework/cloud/task/listener/skip-events-sink-channel.properties") + @EnableAutoConfiguration + public static class SkipEventsListenerBinding { + private static final String SKIPPING_READ_MESSAGE = "Skipped when reading."; + private static final String SKIPPING_WRITE_CONTENT = "-1"; + @StreamListener(Sink.INPUT) + public void receive(Object exceptionMessage) { + if(exceptionMessage.toString().equals(SKIPPING_READ_MESSAGE)){ + readSkipCount++; + } + if(exceptionMessage.toString().equals(SKIPPING_WRITE_CONTENT)){ + writeSkipCount++; + } + skipEventsLatch.countDown(); + } + } + + @EnableBinding(Sink.class) + @PropertySource("classpath:/org/springframework/cloud/task/listener/item-write-events-sink-channel.properties") + @EnableAutoConfiguration + public static class ItemWriteEventsListenerBinding { + + @StreamListener(Sink.INPUT) + public void receive(Object itemWrite) { + assertTrue("Message should start with '3 items'", itemWrite.toString().startsWith("3 items ")); + assertTrue("Message should end with ' written.'", itemWrite.toString().endsWith(" written.")); + itemWriteEventsLatch.countDown(); + } + } + + private Object[] getConfigurations(Class sinkClazz, Class jobConfigurationClazz) { + return new Object[]{ + jobConfigurationClazz, + PropertyPlaceholderAutoConfiguration.class, + BatchAutoConfiguration.class, + TaskBatchAutoConfiguration.class, + TaskEventAutoConfiguration.class, + BatchEventAutoConfiguration.class, + RedisServiceAutoConfiguration.class, + sinkClazz }; + } + + private String[] getCommandLineParams(String sinkChannelParam) { + return new String[]{ "--spring.cloud.task.closecontext.enable=false", + "--spring.cloud.task.name=" + TASK_NAME, + "--spring.main.web-environment=false", + "--spring.cloud.stream.defaultBinder=redis", + "--spring.cloud.stream.bindings.task-events.destination=test", + sinkChannelParam }; + } + + private void testListener(String channelBinding, CountDownLatch latch, Class clazz) throws Exception{ + applicationContext = new SpringApplicationBuilder() + .sources(this.getConfigurations(clazz, JobConfiguration.class)) + .build().run(getCommandLineParams(channelBinding)); + + assertTrue(latch.await(1, TimeUnit.SECONDS)); + } + + private void testListenerSkip(String channelBinding, CountDownLatch latch, Class clazz) throws Exception{ + applicationContext = new SpringApplicationBuilder() + .sources(this.getConfigurations(clazz, JobSkipConfiguration.class)) + .build().run(getCommandLineParams(channelBinding)); + + assertTrue(latch.await(1, TimeUnit.SECONDS)); + } +} diff --git a/spring-cloud-task-integration-tests/src/test/resources/application.properties b/spring-cloud-task-integration-tests/src/test/resources/application.properties new file mode 100644 index 00000000..adfd50cc --- /dev/null +++ b/spring-cloud-task-integration-tests/src/test/resources/application.properties @@ -0,0 +1,3 @@ +logging.level.org.springframework.cloud.task=DEBUG +logging.level.org.springframework.cloud.stream=DEBUG +spring.application.name=batchEvents diff --git a/spring-cloud-task-integration-tests/src/test/resources/org/springframework/cloud/task/listener/chunk-events-sink-channel.properties b/spring-cloud-task-integration-tests/src/test/resources/org/springframework/cloud/task/listener/chunk-events-sink-channel.properties new file mode 100644 index 00000000..aa4a821b --- /dev/null +++ b/spring-cloud-task-integration-tests/src/test/resources/org/springframework/cloud/task/listener/chunk-events-sink-channel.properties @@ -0,0 +1,17 @@ +# +# Copyright 2016 the original author or authors. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +spring.cloud.stream.bindings.input.destination=chunk-events-foobar diff --git a/spring-cloud-task-integration-tests/src/test/resources/org/springframework/cloud/task/listener/item-process-sink-channel.properties b/spring-cloud-task-integration-tests/src/test/resources/org/springframework/cloud/task/listener/item-process-sink-channel.properties new file mode 100644 index 00000000..7cc85130 --- /dev/null +++ b/spring-cloud-task-integration-tests/src/test/resources/org/springframework/cloud/task/listener/item-process-sink-channel.properties @@ -0,0 +1,17 @@ +# +# Copyright 2016 the original author or authors. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +spring.cloud.stream.bindings.input.destination=item-process-foobar diff --git a/spring-cloud-task-integration-tests/src/test/resources/org/springframework/cloud/task/listener/item-read-events-sink-channel.properties b/spring-cloud-task-integration-tests/src/test/resources/org/springframework/cloud/task/listener/item-read-events-sink-channel.properties new file mode 100644 index 00000000..95175e69 --- /dev/null +++ b/spring-cloud-task-integration-tests/src/test/resources/org/springframework/cloud/task/listener/item-read-events-sink-channel.properties @@ -0,0 +1,17 @@ +# +# Copyright 2016 the original author or authors. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +spring.cloud.stream.bindings.input.destination=item-read-events-foobar diff --git a/spring-cloud-task-integration-tests/src/test/resources/org/springframework/cloud/task/listener/item-write-events-sink-channel.properties b/spring-cloud-task-integration-tests/src/test/resources/org/springframework/cloud/task/listener/item-write-events-sink-channel.properties new file mode 100644 index 00000000..c5906e36 --- /dev/null +++ b/spring-cloud-task-integration-tests/src/test/resources/org/springframework/cloud/task/listener/item-write-events-sink-channel.properties @@ -0,0 +1,17 @@ +# +# Copyright 2016 the original author or authors. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +spring.cloud.stream.bindings.input.destination=item-write-events-foobar diff --git a/spring-cloud-task-integration-tests/src/test/resources/org/springframework/cloud/task/listener/job-execution-sink-channel.properties b/spring-cloud-task-integration-tests/src/test/resources/org/springframework/cloud/task/listener/job-execution-sink-channel.properties new file mode 100644 index 00000000..2ab32e3e --- /dev/null +++ b/spring-cloud-task-integration-tests/src/test/resources/org/springframework/cloud/task/listener/job-execution-sink-channel.properties @@ -0,0 +1,17 @@ +# +# Copyright 2016 the original author or authors. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +spring.cloud.stream.bindings.input.destination=foobar diff --git a/spring-cloud-task-integration-tests/src/test/resources/org/springframework/cloud/task/listener/sink-channel.properties b/spring-cloud-task-integration-tests/src/test/resources/org/springframework/cloud/task/listener/sink-channel.properties index 00ef15a7..47e0200c 100644 --- a/spring-cloud-task-integration-tests/src/test/resources/org/springframework/cloud/task/listener/sink-channel.properties +++ b/spring-cloud-task-integration-tests/src/test/resources/org/springframework/cloud/task/listener/sink-channel.properties @@ -1 +1,17 @@ +# +# Copyright 2016 the original author or authors. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + spring.cloud.stream.bindings.input.destination=test diff --git a/spring-cloud-task-integration-tests/src/test/resources/org/springframework/cloud/task/listener/skip-events-sink-channel.properties b/spring-cloud-task-integration-tests/src/test/resources/org/springframework/cloud/task/listener/skip-events-sink-channel.properties new file mode 100644 index 00000000..9d518149 --- /dev/null +++ b/spring-cloud-task-integration-tests/src/test/resources/org/springframework/cloud/task/listener/skip-events-sink-channel.properties @@ -0,0 +1,17 @@ +# +# Copyright 2016 the original author or authors. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +spring.cloud.stream.bindings.input.destination=skip-event-foobar diff --git a/spring-cloud-task-integration-tests/src/test/resources/org/springframework/cloud/task/listener/step-execution-sink-channel.properties b/spring-cloud-task-integration-tests/src/test/resources/org/springframework/cloud/task/listener/step-execution-sink-channel.properties new file mode 100644 index 00000000..4afc659b --- /dev/null +++ b/spring-cloud-task-integration-tests/src/test/resources/org/springframework/cloud/task/listener/step-execution-sink-channel.properties @@ -0,0 +1,17 @@ +# +# Copyright 2016 the original author or authors. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +spring.cloud.stream.bindings.input.destination=step-execution-foobar diff --git a/spring-cloud-task-samples/batch-events/.mvn/wrapper/maven-wrapper.jar b/spring-cloud-task-samples/batch-events/.mvn/wrapper/maven-wrapper.jar new file mode 100644 index 00000000..5fd4d502 Binary files /dev/null and b/spring-cloud-task-samples/batch-events/.mvn/wrapper/maven-wrapper.jar differ diff --git a/spring-cloud-task-samples/batch-events/.mvn/wrapper/maven-wrapper.properties b/spring-cloud-task-samples/batch-events/.mvn/wrapper/maven-wrapper.properties new file mode 100644 index 00000000..eb919476 --- /dev/null +++ b/spring-cloud-task-samples/batch-events/.mvn/wrapper/maven-wrapper.properties @@ -0,0 +1 @@ +distributionUrl=https://repo1.maven.org/maven2/org/apache/maven/apache-maven/3.3.3/apache-maven-3.3.3-bin.zip \ No newline at end of file diff --git a/spring-cloud-task-samples/batch-events/mvnw b/spring-cloud-task-samples/batch-events/mvnw new file mode 100755 index 00000000..a1ba1bf5 --- /dev/null +++ b/spring-cloud-task-samples/batch-events/mvnw @@ -0,0 +1,233 @@ +#!/bin/sh +# ---------------------------------------------------------------------------- +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# ---------------------------------------------------------------------------- + +# ---------------------------------------------------------------------------- +# Maven2 Start Up Batch script +# +# Required ENV vars: +# ------------------ +# JAVA_HOME - location of a JDK home dir +# +# Optional ENV vars +# ----------------- +# M2_HOME - location of maven2's installed home dir +# MAVEN_OPTS - parameters passed to the Java VM when running Maven +# e.g. to debug Maven itself, use +# set MAVEN_OPTS=-Xdebug -Xrunjdwp:transport=dt_socket,server=y,suspend=y,address=8000 +# MAVEN_SKIP_RC - flag to disable loading of mavenrc files +# ---------------------------------------------------------------------------- + +if [ -z "$MAVEN_SKIP_RC" ] ; then + + if [ -f /etc/mavenrc ] ; then + . /etc/mavenrc + fi + + if [ -f "$HOME/.mavenrc" ] ; then + . "$HOME/.mavenrc" + fi + +fi + +# OS specific support. $var _must_ be set to either true or false. +cygwin=false; +darwin=false; +mingw=false +case "`uname`" in + CYGWIN*) cygwin=true ;; + MINGW*) mingw=true;; + Darwin*) darwin=true + # + # Look for the Apple JDKs first to preserve the existing behaviour, and then look + # for the new JDKs provided by Oracle. + # + if [ -z "$JAVA_HOME" ] && [ -L /System/Library/Frameworks/JavaVM.framework/Versions/CurrentJDK ] ; then + # + # Apple JDKs + # + export JAVA_HOME=/System/Library/Frameworks/JavaVM.framework/Versions/CurrentJDK/Home + fi + + if [ -z "$JAVA_HOME" ] && [ -L /System/Library/Java/JavaVirtualMachines/CurrentJDK ] ; then + # + # Apple JDKs + # + export JAVA_HOME=/System/Library/Java/JavaVirtualMachines/CurrentJDK/Contents/Home + fi + + if [ -z "$JAVA_HOME" ] && [ -L "/Library/Java/JavaVirtualMachines/CurrentJDK" ] ; then + # + # Oracle JDKs + # + export JAVA_HOME=/Library/Java/JavaVirtualMachines/CurrentJDK/Contents/Home + fi + + if [ -z "$JAVA_HOME" ] && [ -x "/usr/libexec/java_home" ]; then + # + # Apple JDKs + # + export JAVA_HOME=`/usr/libexec/java_home` + fi + ;; +esac + +if [ -z "$JAVA_HOME" ] ; then + if [ -r /etc/gentoo-release ] ; then + JAVA_HOME=`java-config --jre-home` + fi +fi + +if [ -z "$M2_HOME" ] ; then + ## resolve links - $0 may be a link to maven's home + PRG="$0" + + # need this for relative symlinks + while [ -h "$PRG" ] ; do + ls=`ls -ld "$PRG"` + link=`expr "$ls" : '.*-> \(.*\)$'` + if expr "$link" : '/.*' > /dev/null; then + PRG="$link" + else + PRG="`dirname "$PRG"`/$link" + fi + done + + saveddir=`pwd` + + M2_HOME=`dirname "$PRG"`/.. + + # make it fully qualified + M2_HOME=`cd "$M2_HOME" && pwd` + + cd "$saveddir" + # echo Using m2 at $M2_HOME +fi + +# For Cygwin, ensure paths are in UNIX format before anything is touched +if $cygwin ; then + [ -n "$M2_HOME" ] && + M2_HOME=`cygpath --unix "$M2_HOME"` + [ -n "$JAVA_HOME" ] && + JAVA_HOME=`cygpath --unix "$JAVA_HOME"` + [ -n "$CLASSPATH" ] && + CLASSPATH=`cygpath --path --unix "$CLASSPATH"` +fi + +# For Migwn, ensure paths are in UNIX format before anything is touched +if $mingw ; then + [ -n "$M2_HOME" ] && + M2_HOME="`(cd "$M2_HOME"; pwd)`" + [ -n "$JAVA_HOME" ] && + JAVA_HOME="`(cd "$JAVA_HOME"; pwd)`" + # TODO classpath? +fi + +if [ -z "$JAVA_HOME" ]; then + javaExecutable="`which javac`" + if [ -n "$javaExecutable" ] && ! [ "`expr \"$javaExecutable\" : '\([^ ]*\)'`" = "no" ]; then + # readlink(1) is not available as standard on Solaris 10. + readLink=`which readlink` + if [ ! `expr "$readLink" : '\([^ ]*\)'` = "no" ]; then + if $darwin ; then + javaHome="`dirname \"$javaExecutable\"`" + javaExecutable="`cd \"$javaHome\" && pwd -P`/javac" + else + javaExecutable="`readlink -f \"$javaExecutable\"`" + fi + javaHome="`dirname \"$javaExecutable\"`" + javaHome=`expr "$javaHome" : '\(.*\)/bin'` + JAVA_HOME="$javaHome" + export JAVA_HOME + fi + fi +fi + +if [ -z "$JAVACMD" ] ; then + if [ -n "$JAVA_HOME" ] ; then + if [ -x "$JAVA_HOME/jre/sh/java" ] ; then + # IBM's JDK on AIX uses strange locations for the executables + JAVACMD="$JAVA_HOME/jre/sh/java" + else + JAVACMD="$JAVA_HOME/bin/java" + fi + else + JAVACMD="`which java`" + fi +fi + +if [ ! -x "$JAVACMD" ] ; then + echo "Error: JAVA_HOME is not defined correctly." >&2 + echo " We cannot execute $JAVACMD" >&2 + exit 1 +fi + +if [ -z "$JAVA_HOME" ] ; then + echo "Warning: JAVA_HOME environment variable is not set." +fi + +CLASSWORLDS_LAUNCHER=org.codehaus.plexus.classworlds.launcher.Launcher + +# For Cygwin, switch paths to Windows format before running java +if $cygwin; then + [ -n "$M2_HOME" ] && + M2_HOME=`cygpath --path --windows "$M2_HOME"` + [ -n "$JAVA_HOME" ] && + JAVA_HOME=`cygpath --path --windows "$JAVA_HOME"` + [ -n "$CLASSPATH" ] && + CLASSPATH=`cygpath --path --windows "$CLASSPATH"` +fi + +# traverses directory structure from process work directory to filesystem root +# first directory with .mvn subdirectory is considered project base directory +find_maven_basedir() { + local basedir=$(pwd) + local wdir=$(pwd) + while [ "$wdir" != '/' ] ; do + if [ -d "$wdir"/.mvn ] ; then + basedir=$wdir + break + fi + wdir=$(cd "$wdir/.."; pwd) + done + echo "${basedir}" +} + +# concatenates all lines of a file +concat_lines() { + if [ -f "$1" ]; then + echo "$(tr -s '\n' ' ' < "$1")" + fi +} + +export MAVEN_PROJECTBASEDIR=${MAVEN_BASEDIR:-$(find_maven_basedir)} +MAVEN_OPTS="$(concat_lines "$MAVEN_PROJECTBASEDIR/.mvn/jvm.config") $MAVEN_OPTS" + +# Provide a "standardized" way to retrieve the CLI args that will +# work with both Windows and non-Windows executions. +MAVEN_CMD_LINE_ARGS="$MAVEN_CONFIG $@" +export MAVEN_CMD_LINE_ARGS + +WRAPPER_LAUNCHER=org.apache.maven.wrapper.MavenWrapperMain + +exec "$JAVACMD" \ + $MAVEN_OPTS \ + -classpath "$MAVEN_PROJECTBASEDIR/.mvn/wrapper/maven-wrapper.jar" \ + "-Dmaven.home=${M2_HOME}" "-Dmaven.multiModuleProjectDirectory=${MAVEN_PROJECTBASEDIR}" \ + ${WRAPPER_LAUNCHER} "$@" diff --git a/spring-cloud-task-samples/batch-events/mvnw.cmd b/spring-cloud-task-samples/batch-events/mvnw.cmd new file mode 100644 index 00000000..2b934e89 --- /dev/null +++ b/spring-cloud-task-samples/batch-events/mvnw.cmd @@ -0,0 +1,145 @@ +@REM ---------------------------------------------------------------------------- +@REM Licensed to the Apache Software Foundation (ASF) under one +@REM or more contributor license agreements. See the NOTICE file +@REM distributed with this work for additional information +@REM regarding copyright ownership. The ASF licenses this file +@REM to you under the Apache License, Version 2.0 (the +@REM "License"); you may not use this file except in compliance +@REM with the License. You may obtain a copy of the License at +@REM +@REM http://www.apache.org/licenses/LICENSE-2.0 +@REM +@REM Unless required by applicable law or agreed to in writing, +@REM software distributed under the License is distributed on an +@REM "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +@REM KIND, either express or implied. See the License for the +@REM specific language governing permissions and limitations +@REM under the License. +@REM ---------------------------------------------------------------------------- + +@REM ---------------------------------------------------------------------------- +@REM Maven2 Start Up Batch script +@REM +@REM Required ENV vars: +@REM JAVA_HOME - location of a JDK home dir +@REM +@REM Optional ENV vars +@REM M2_HOME - location of maven2's installed home dir +@REM MAVEN_BATCH_ECHO - set to 'on' to enable the echoing of the batch commands +@REM MAVEN_BATCH_PAUSE - set to 'on' to wait for a key stroke before ending +@REM MAVEN_OPTS - parameters passed to the Java VM when running Maven +@REM e.g. to debug Maven itself, use +@REM set MAVEN_OPTS=-Xdebug -Xrunjdwp:transport=dt_socket,server=y,suspend=y,address=8000 +@REM MAVEN_SKIP_RC - flag to disable loading of mavenrc files +@REM ---------------------------------------------------------------------------- + +@REM Begin all REM lines with '@' in case MAVEN_BATCH_ECHO is 'on' +@echo off +@REM enable echoing my setting MAVEN_BATCH_ECHO to 'on' +@if "%MAVEN_BATCH_ECHO%" == "on" echo %MAVEN_BATCH_ECHO% + +@REM set %HOME% to equivalent of $HOME +if "%HOME%" == "" (set "HOME=%HOMEDRIVE%%HOMEPATH%") + +@REM Execute a user defined script before this one +if not "%MAVEN_SKIP_RC%" == "" goto skipRcPre +@REM check for pre script, once with legacy .bat ending and once with .cmd ending +if exist "%HOME%\mavenrc_pre.bat" call "%HOME%\mavenrc_pre.bat" +if exist "%HOME%\mavenrc_pre.cmd" call "%HOME%\mavenrc_pre.cmd" +:skipRcPre + +@setlocal + +set ERROR_CODE=0 + +@REM To isolate internal variables from possible post scripts, we use another setlocal +@setlocal + +@REM ==== START VALIDATION ==== +if not "%JAVA_HOME%" == "" goto OkJHome + +echo. +echo Error: JAVA_HOME not found in your environment. >&2 +echo Please set the JAVA_HOME variable in your environment to match the >&2 +echo location of your Java installation. >&2 +echo. +goto error + +:OkJHome +if exist "%JAVA_HOME%\bin\java.exe" goto init + +echo. +echo Error: JAVA_HOME is set to an invalid directory. >&2 +echo JAVA_HOME = "%JAVA_HOME%" >&2 +echo Please set the JAVA_HOME variable in your environment to match the >&2 +echo location of your Java installation. >&2 +echo. +goto error + +@REM ==== END VALIDATION ==== + +:init + +set MAVEN_CMD_LINE_ARGS=%* + +@REM Find the project base dir, i.e. the directory that contains the folder ".mvn". +@REM Fallback to current working directory if not found. + +set MAVEN_PROJECTBASEDIR=%MAVEN_BASEDIR% +IF NOT "%MAVEN_PROJECTBASEDIR%"=="" goto endDetectBaseDir + +set EXEC_DIR=%CD% +set WDIR=%EXEC_DIR% +:findBaseDir +IF EXIST "%WDIR%"\.mvn goto baseDirFound +cd .. +IF "%WDIR%"=="%CD%" goto baseDirNotFound +set WDIR=%CD% +goto findBaseDir + +:baseDirFound +set MAVEN_PROJECTBASEDIR=%WDIR% +cd "%EXEC_DIR%" +goto endDetectBaseDir + +:baseDirNotFound +set MAVEN_PROJECTBASEDIR=%EXEC_DIR% +cd "%EXEC_DIR%" + +:endDetectBaseDir + +IF NOT EXIST "%MAVEN_PROJECTBASEDIR%\.mvn\jvm.config" goto endReadAdditionalConfig + +@setlocal EnableExtensions EnableDelayedExpansion +for /F "usebackq delims=" %%a in ("%MAVEN_PROJECTBASEDIR%\.mvn\jvm.config") do set JVM_CONFIG_MAVEN_PROPS=!JVM_CONFIG_MAVEN_PROPS! %%a +@endlocal & set JVM_CONFIG_MAVEN_PROPS=%JVM_CONFIG_MAVEN_PROPS% + +:endReadAdditionalConfig + +SET MAVEN_JAVA_EXE="%JAVA_HOME%\bin\java.exe" + +set WRAPPER_JAR="".\.mvn\wrapper\maven-wrapper.jar"" +set WRAPPER_LAUNCHER=org.apache.maven.wrapper.MavenWrapperMain + +%MAVEN_JAVA_EXE% %JVM_CONFIG_MAVEN_PROPS% %MAVEN_OPTS% %MAVEN_DEBUG_OPTS% -classpath %WRAPPER_JAR% "-Dmaven.multiModuleProjectDirectory=%MAVEN_PROJECTBASEDIR%" %WRAPPER_LAUNCHER% %MAVEN_CMD_LINE_ARGS% +if ERRORLEVEL 1 goto error +goto end + +:error +set ERROR_CODE=1 + +:end +@endlocal & set ERROR_CODE=%ERROR_CODE% + +if not "%MAVEN_SKIP_RC%" == "" goto skipRcPost +@REM check for post script, once with legacy .bat ending and once with .cmd ending +if exist "%HOME%\mavenrc_post.bat" call "%HOME%\mavenrc_post.bat" +if exist "%HOME%\mavenrc_post.cmd" call "%HOME%\mavenrc_post.cmd" +:skipRcPost + +@REM pause the script if MAVEN_BATCH_PAUSE is set to 'on' +if "%MAVEN_BATCH_PAUSE%" == "on" pause + +if "%MAVEN_TERMINATE_CMD%" == "on" exit %ERROR_CODE% + +exit /B %ERROR_CODE% \ No newline at end of file diff --git a/spring-cloud-task-samples/batch-events/pom.xml b/spring-cloud-task-samples/batch-events/pom.xml new file mode 100644 index 00000000..5b0eea71 --- /dev/null +++ b/spring-cloud-task-samples/batch-events/pom.xml @@ -0,0 +1,80 @@ + + + 4.0.0 + + io.spring.cloud + batch-events + 1.0.0.BUILD-SNAPSHOT + jar + + Batch Events Sample Application + Sample of sending batch events via Spring Cloud Streams + + + org.springframework.boot + spring-boot-starter-parent + 1.3.3.RELEASE + + + + + UTF-8 + 1.7 + + + + + org.springframework.cloud + spring-cloud-task-core + 1.0.0.BUILD-SNAPSHOT + + + org.springframework.cloud + spring-cloud-task-stream + 1.0.0.BUILD-SNAPSHOT + + + org.springframework.cloud + spring-cloud-stream-binder-test + 1.0.0.BUILD-SNAPSHOT + test + + + org.springframework.boot + spring-boot-starter-batch + + + org.springframework.boot + spring-boot-starter + + + + org.springframework.boot + spring-boot-starter-test + test + + + org.springframework.cloud + spring-cloud-stream-binder-redis + 1.0.0.BUILD-SNAPSHOT + + + org.springframework.cloud + spring-cloud-stream-test-support-internal + 1.0.0.BUILD-SNAPSHOT + test + + + + + + + org.springframework.boot + spring-boot-maven-plugin + + + + + + diff --git a/spring-cloud-task-samples/batch-events/src/main/java/io/spring/cloud/BatchEventsApplication.java b/spring-cloud-task-samples/batch-events/src/main/java/io/spring/cloud/BatchEventsApplication.java new file mode 100644 index 00000000..cfee7c96 --- /dev/null +++ b/spring-cloud-task-samples/batch-events/src/main/java/io/spring/cloud/BatchEventsApplication.java @@ -0,0 +1,100 @@ +/* + * Copyright 2016 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.spring.cloud; + +import java.util.Arrays; +import java.util.List; + +import org.springframework.batch.core.Job; +import org.springframework.batch.core.Step; +import org.springframework.batch.core.StepContribution; +import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing; +import org.springframework.batch.core.configuration.annotation.JobBuilderFactory; +import org.springframework.batch.core.configuration.annotation.StepBuilderFactory; +import org.springframework.batch.core.scope.context.ChunkContext; +import org.springframework.batch.core.step.tasklet.Tasklet; +import org.springframework.batch.item.ItemProcessor; +import org.springframework.batch.item.ItemWriter; +import org.springframework.batch.item.support.ListItemReader; +import org.springframework.batch.repeat.RepeatStatus; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.boot.SpringApplication; +import org.springframework.boot.autoconfigure.SpringBootApplication; +import org.springframework.cloud.task.configuration.EnableTask; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; + +@SpringBootApplication +@EnableTask +@EnableBatchProcessing +public class BatchEventsApplication { + + public static void main(String[] args) { + SpringApplication.run(BatchEventsApplication.class, args); + } + + @Configuration + public static class JobConfiguration { + + @Autowired + private JobBuilderFactory jobBuilderFactory; + + @Autowired + private StepBuilderFactory stepBuilderFactory; + + @Bean + public Step step1() { + return this.stepBuilderFactory.get("step1") + .tasklet(new Tasklet() { + @Override + public RepeatStatus execute(StepContribution contribution, ChunkContext chunkContext) throws Exception { + System.out.println("Tasklet has run"); + return RepeatStatus.FINISHED; + } + }).build(); + } + + @Bean + public Step step2() { + return this.stepBuilderFactory.get("step2") + .chunk(3) + .reader(new ListItemReader<>(Arrays.asList("1", "2", "3", "4", "5", "6"))) + .processor(new ItemProcessor() { + @Override + public String process(String item) throws Exception { + return String.valueOf(Integer.parseInt(item) * -1); + } + }) + .writer(new ItemWriter() { + @Override + public void write(List items) throws Exception { + for (String item : items) { + System.out.println(">> " + item); + } + } + }).build(); + } + + @Bean + public Job job() { + return this.jobBuilderFactory.get("job") + .start(step1()) + .next(step2()) + .build(); + } + } +} diff --git a/spring-cloud-task-samples/batch-events/src/main/resources/application.properties b/spring-cloud-task-samples/batch-events/src/main/resources/application.properties new file mode 100644 index 00000000..adfd50cc --- /dev/null +++ b/spring-cloud-task-samples/batch-events/src/main/resources/application.properties @@ -0,0 +1,3 @@ +logging.level.org.springframework.cloud.task=DEBUG +logging.level.org.springframework.cloud.stream=DEBUG +spring.application.name=batchEvents diff --git a/spring-cloud-task-samples/batch-events/src/test/java/io/spring/cloud/BatchEventsApplicationTests.java b/spring-cloud-task-samples/batch-events/src/test/java/io/spring/cloud/BatchEventsApplicationTests.java new file mode 100644 index 00000000..df7c61c3 --- /dev/null +++ b/spring-cloud-task-samples/batch-events/src/test/java/io/spring/cloud/BatchEventsApplicationTests.java @@ -0,0 +1,71 @@ +/* + * Copyright 2016 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.spring.cloud; + +import java.util.concurrent.*; + +import org.junit.Assert; +import org.junit.ClassRule; +import org.junit.Rule; +import org.junit.Test; + +import org.springframework.boot.SpringApplication; +import org.springframework.boot.autoconfigure.EnableAutoConfiguration; +import org.springframework.boot.test.OutputCapture; +import org.springframework.cloud.stream.annotation.EnableBinding; +import org.springframework.cloud.stream.annotation.StreamListener; +import org.springframework.cloud.stream.messaging.Sink; +import org.springframework.cloud.stream.test.junit.redis.RedisTestSupport; +import org.springframework.cloud.task.batch.listener.support.JobExecutionEvent; +import org.springframework.context.annotation.PropertySource; + +import static junit.framework.Assert.assertEquals; +import static junit.framework.Assert.assertTrue; + +public class BatchEventsApplicationTests { + + private static final String ITEM_INDICATOR = ">> -"; + + @ClassRule + public static RedisTestSupport redisTestSupport = new RedisTestSupport(); + + @Rule + public OutputCapture outputCapture = new OutputCapture(); + + // Count for two job execution events per task + static CountDownLatch jobExecutionLatch = new CountDownLatch(2); + + + @Test + public void testExecution() throws Exception { + SpringApplication.run(BatchEventsApplication.class); + Assert.assertTrue(jobExecutionLatch.await(1, TimeUnit.SECONDS)); + } + + @EnableBinding(Sink.class) + @PropertySource("classpath:io/spring/task/listener/job-listener-sink-channel.properties") + @EnableAutoConfiguration + public static class JobExecutionListenerBinding { + + @StreamListener(Sink.INPUT) + public void receive(JobExecutionEvent execution) { + Assert.assertEquals(String.format("Job name should be job"), "job", execution.getJobInstance().getJobName()); + jobExecutionLatch.countDown(); + } + } + +} diff --git a/spring-cloud-task-samples/batch-events/src/test/resources/io/spring/task/listener/job-listener-sink-channel.properties b/spring-cloud-task-samples/batch-events/src/test/resources/io/spring/task/listener/job-listener-sink-channel.properties new file mode 100644 index 00000000..0e98a419 --- /dev/null +++ b/spring-cloud-task-samples/batch-events/src/test/resources/io/spring/task/listener/job-listener-sink-channel.properties @@ -0,0 +1,17 @@ +# +# Copyright 2016 the original author or authors. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +spring.cloud.stream.bindings.input.destination=job-execution-events diff --git a/spring-cloud-task-samples/batch-job/pom.xml b/spring-cloud-task-samples/batch-job/pom.xml index c639a5eb..7849d3bc 100644 --- a/spring-cloud-task-samples/batch-job/pom.xml +++ b/spring-cloud-task-samples/batch-job/pom.xml @@ -9,7 +9,7 @@ 1.0.0.BUILD-SNAPSHOT Spring Cloud Task Batch Example - batch-job + Batch Job Sample Application org.springframework.boot diff --git a/spring-cloud-task-samples/batch-job/src/test/java/io/spring/BatchJobApplicationTests.java b/spring-cloud-task-samples/batch-job/src/test/java/io/spring/BatchJobApplicationTests.java index b69bddeb..57698079 100644 --- a/spring-cloud-task-samples/batch-job/src/test/java/io/spring/BatchJobApplicationTests.java +++ b/spring-cloud-task-samples/batch-job/src/test/java/io/spring/BatchJobApplicationTests.java @@ -39,7 +39,7 @@ public class BatchJobApplicationTests { public OutputCapture outputCapture = new OutputCapture(); @Test - public void testTimeStampApp() throws Exception { + public void testBatchJobApp() throws Exception { final String JOB_RUN_MESSAGE = " was run"; final String CREATE_TASK_MESSAGE = "Creating: TaskExecution{executionId="; final String UPDATE_TASK_MESSAGE = "Updating: TaskExecution with executionId="; diff --git a/spring-cloud-task-samples/pom.xml b/spring-cloud-task-samples/pom.xml index c8f8e503..3daeb721 100644 --- a/spring-cloud-task-samples/pom.xml +++ b/spring-cloud-task-samples/pom.xml @@ -28,6 +28,7 @@ taskprocessor partitioned-batch-job task-events + batch-events diff --git a/spring-cloud-task-stream/pom.xml b/spring-cloud-task-stream/pom.xml index cdf27dd3..f5be54bc 100644 --- a/spring-cloud-task-stream/pom.xml +++ b/spring-cloud-task-stream/pom.xml @@ -15,6 +15,10 @@ + + org.springframework.batch + spring-batch-core + org.springframework.cloud spring-cloud-stream @@ -42,9 +46,25 @@ org.springframework.cloud - spring-cloud-stream-test-support + spring-cloud-stream-test-support-internal 1.0.0.BUILD-SNAPSHOT test + + org.springframework.cloud + spring-cloud-stream-binder-redis + 1.0.0.BUILD-SNAPSHOT + test + + + org.springframework.boot + spring-boot-starter-redis + test + + + org.springframework.cloud + spring-cloud-task-batch + test + diff --git a/spring-cloud-task-stream/src/main/java/org/springframework/cloud/task/batch/listener/BatchEventAutoConfiguration.java b/spring-cloud-task-stream/src/main/java/org/springframework/cloud/task/batch/listener/BatchEventAutoConfiguration.java new file mode 100644 index 00000000..099b24b2 --- /dev/null +++ b/spring-cloud-task-stream/src/main/java/org/springframework/cloud/task/batch/listener/BatchEventAutoConfiguration.java @@ -0,0 +1,147 @@ +/* + * Copyright 2016 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.springframework.cloud.task.batch.listener; + +import org.springframework.batch.core.ChunkListener; +import org.springframework.batch.core.ItemProcessListener; +import org.springframework.batch.core.ItemReadListener; +import org.springframework.batch.core.ItemWriteListener; +import org.springframework.batch.core.Job; +import org.springframework.batch.core.JobExecutionListener; +import org.springframework.batch.core.SkipListener; +import org.springframework.batch.core.StepExecutionListener; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.boot.autoconfigure.condition.ConditionalOnBean; +import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean; +import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; +import org.springframework.cloud.stream.annotation.EnableBinding; +import org.springframework.cloud.stream.annotation.Output; +import org.springframework.cloud.task.batch.listener.support.TaskBatchEventListenerBeanPostProcessor; +import org.springframework.cloud.task.listener.TaskLifecycleListener; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.context.annotation.Lazy; +import org.springframework.integration.gateway.GatewayProxyFactoryBean; +import org.springframework.messaging.MessageChannel; + +/** + * Configures the listeners and channels that are required to emit job messages. + * @author Michael Minella + * @author Glenn Renfro + */ +@Configuration +@ConditionalOnBean(value = { Job.class, TaskLifecycleListener.class }) +@ConditionalOnProperty(prefix = "spring.cloud.task.batch.events", name = "enabled", havingValue = "true", matchIfMissing = true) +public class BatchEventAutoConfiguration { + + public final static String JOB_EXECUTION_EVENTS_LISTENER = "jobExecutionEventsListener"; + public final static String CHUNK_EVENTS_LISTENER = "chunkEventsListener"; + public final static String STEP_EXECUTION_EVENTS_LISTENER = "stepExecutionEventsListener"; + public final static String ITEM_READ_EVENTS_LISTENER = "itemReadEventsListener"; + public final static String ITEM_WRITE_EVENTS_LISTENER = "itemWriteEventsListener"; + public final static String ITEM_PROCESS_EVENTS_LISTENER = "itemProcessEventsListener"; + public final static String SKIP_EVENTS_LISTENER = "skipEventsListener"; + + @Bean + @ConditionalOnMissingBean + public TaskBatchEventListenerBeanPostProcessor batchTaskExecutionListenerBeanPostProcessor() { + return new TaskBatchEventListenerBeanPostProcessor(); + } + + @Configuration + @EnableBinding(BatchEventsChannels.class) + @ConditionalOnMissingBean(name = JOB_EXECUTION_EVENTS_LISTENER) + public static class JobExecutionListenerConfiguration { + + @Autowired + private BatchEventsChannels listenerChannels; + + @Bean + @Lazy + public JobExecutionListener jobExecutionEventsListener() { + return new EventEmittingJobExecutionListener(listenerChannels.jobExecutionEvents()); + } + + @Bean + public StepExecutionListener stepExecutionEventsListener() { + return new EventEmittingStepExecutionListener(listenerChannels.stepExecutionEvents()); + } + + @Bean + @Lazy + public GatewayProxyFactoryBean chunkEventsListener() { + GatewayProxyFactoryBean factoryBean = + new GatewayProxyFactoryBean(ChunkListener.class); + + factoryBean.setDefaultRequestChannel(listenerChannels.chunkEvents()); + + return factoryBean; + } + + @Bean + public ItemReadListener itemReadEventsListener() { + return new EventEmittingItemReadEventsListener(listenerChannels.itemReadEvents()); + } + + @Bean + public ItemWriteListener itemWriteEventsListener() { + return new EventEmittingItemWriteEventsListener(listenerChannels.itemWriteEvents()); + } + + @Bean + public ItemProcessListener itemProcessEventsListener() { + return new EventEmittingItemProcessListener(listenerChannels.itemProcessEvents()); + } + + @Bean + public SkipListener skipEventsListener() { + return new EventEmittingSkipListener(listenerChannels.skipEvents()); + } + } + + public interface BatchEventsChannels { + + String JOB_EXECUTION_EVENTS = "job-execution-events"; + String STEP_EXECUTION_EVENTS = "step-execution-events"; + String CHUNK_EXECUTION_EVENTS = "chunk-events"; + String ITEM_READ_EVENTS = "item-read-events"; + String ITEM_PROCESS_EVENTS = "item-process-events"; + String ITEM_WRITE_EVENTS = "item-write-events"; + String SKIP_EVENTS = "skip-events"; + + @Output(JOB_EXECUTION_EVENTS) + MessageChannel jobExecutionEvents(); + + @Output(STEP_EXECUTION_EVENTS) + MessageChannel stepExecutionEvents(); + + @Output(CHUNK_EXECUTION_EVENTS) + MessageChannel chunkEvents(); + + @Output(ITEM_READ_EVENTS) + MessageChannel itemReadEvents(); + + @Output(ITEM_WRITE_EVENTS) + MessageChannel itemWriteEvents(); + + @Output(ITEM_PROCESS_EVENTS) + MessageChannel itemProcessEvents(); + + @Output(SKIP_EVENTS) + MessageChannel skipEvents(); + + } +} diff --git a/spring-cloud-task-stream/src/main/java/org/springframework/cloud/task/batch/listener/EventEmittingItemProcessListener.java b/spring-cloud-task-stream/src/main/java/org/springframework/cloud/task/batch/listener/EventEmittingItemProcessListener.java new file mode 100644 index 00000000..414cb784 --- /dev/null +++ b/spring-cloud-task-stream/src/main/java/org/springframework/cloud/task/batch/listener/EventEmittingItemProcessListener.java @@ -0,0 +1,61 @@ +/* + * Copyright 2016 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.springframework.cloud.task.batch.listener; + +import org.springframework.batch.core.ItemProcessListener; +import org.springframework.cloud.task.batch.listener.support.MessagePublisher; +import org.springframework.messaging.MessageChannel; +import org.springframework.util.Assert; + +/** + * Setups up the ItemProcessListener to emit events to the spring cloud stream output channel. + * + * @author Michael Minella + * @author Glenn Renfro + */ +public class EventEmittingItemProcessListener implements ItemProcessListener { + + private MessageChannel output; + private MessagePublisher messagePublisher; + + public EventEmittingItemProcessListener(MessageChannel output) { + Assert.notNull(output, "An output channel is required"); + this.output = output; + this.messagePublisher = new MessagePublisher(output); + } + + @Override + public void beforeProcess(Object item) { + } + + @Override + public void afterProcess(Object item, Object result) { + if (result == null) { + messagePublisher.publish("1 item was filtered"); + } + else if (item.equals(result)) { + messagePublisher.publish("item equaled result after processing"); + } + else { + messagePublisher.publish("item did not equal result after processing"); + } + } + + @Override + public void onProcessError(Object item, Exception e) { + messagePublisher.publishWithThrowableHeader("Exception while item was being processed", e.getMessage()); + } +} diff --git a/spring-cloud-task-stream/src/main/java/org/springframework/cloud/task/batch/listener/EventEmittingItemReadEventsListener.java b/spring-cloud-task-stream/src/main/java/org/springframework/cloud/task/batch/listener/EventEmittingItemReadEventsListener.java new file mode 100644 index 00000000..49865d3a --- /dev/null +++ b/spring-cloud-task-stream/src/main/java/org/springframework/cloud/task/batch/listener/EventEmittingItemReadEventsListener.java @@ -0,0 +1,61 @@ +/* + * Copyright 2016 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.cloud.task.batch.listener; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.batch.core.ItemReadListener; +import org.springframework.cloud.task.batch.listener.support.MessagePublisher; +import org.springframework.messaging.MessageChannel; +import org.springframework.util.Assert; + +/** + * Setups up the ItemReadEventsListener to emit events to the spring cloud stream output channel. + * + * @author Glenn Renfro + */ +public class EventEmittingItemReadEventsListener implements ItemReadListener { + + private static final Logger logger = LoggerFactory.getLogger(EventEmittingItemReadEventsListener.class); + + private MessageChannel output; + private MessagePublisher messagePublisher; + + public EventEmittingItemReadEventsListener(MessageChannel output) { + Assert.notNull(output, "An output channel is required"); + this.output = output; + this.messagePublisher = new MessagePublisher(output); + } + + @Override + public void beforeRead() { + + } + + @Override + public void afterRead(Object item) { + + } + + @Override + public void onReadError(Exception ex) { + if (logger.isDebugEnabled()) { + logger.debug("Executing onReadError: " + ex.getMessage(), ex); + } + this.messagePublisher.publish(ex.getMessage()); + } +} diff --git a/spring-cloud-task-stream/src/main/java/org/springframework/cloud/task/batch/listener/EventEmittingItemWriteEventsListener.java b/spring-cloud-task-stream/src/main/java/org/springframework/cloud/task/batch/listener/EventEmittingItemWriteEventsListener.java new file mode 100644 index 00000000..61e21b7f --- /dev/null +++ b/spring-cloud-task-stream/src/main/java/org/springframework/cloud/task/batch/listener/EventEmittingItemWriteEventsListener.java @@ -0,0 +1,67 @@ +/* + * Copyright 2016 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.cloud.task.batch.listener; + +import java.util.*; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.batch.core.ItemWriteListener; +import org.springframework.cloud.task.batch.listener.support.MessagePublisher; +import org.springframework.messaging.MessageChannel; +import org.springframework.util.Assert; + +/** + * Setups up the ItemWriteEventsListener to emit events to the spring cloud stream output channel. + * + * @author Glenn Renfro + */ +public class EventEmittingItemWriteEventsListener implements ItemWriteListener{ + + private static final Logger logger = LoggerFactory.getLogger(EventEmittingItemWriteEventsListener.class); + + private MessageChannel output; + private MessagePublisher messagePublisher; + + public EventEmittingItemWriteEventsListener(MessageChannel output) { + Assert.notNull(output, "An output channel is required"); + this.output = output; + this.messagePublisher = new MessagePublisher(output); + } + + @Override + public void beforeWrite(List items) { + messagePublisher.publish(items.size() + " items to be written."); + } + + @Override + public void afterWrite(List items) { + if (logger.isDebugEnabled()) { + logger.debug("Executing afterWrite: " + items); + } + this.messagePublisher.publish(items.size() + " items have been written."); + } + + @Override + public void onWriteError(Exception exception, List items) { + if (logger.isDebugEnabled()) { + logger.debug("Executing onWriteError: " + exception.getMessage(), exception); + } + String payload = "Exception while " + items.size() + " items are attempted to be written."; + this.messagePublisher.publishWithThrowableHeader(payload, exception.getMessage()); + } +} diff --git a/spring-cloud-task-stream/src/main/java/org/springframework/cloud/task/batch/listener/EventEmittingJobExecutionListener.java b/spring-cloud-task-stream/src/main/java/org/springframework/cloud/task/batch/listener/EventEmittingJobExecutionListener.java new file mode 100644 index 00000000..819a27a3 --- /dev/null +++ b/spring-cloud-task-stream/src/main/java/org/springframework/cloud/task/batch/listener/EventEmittingJobExecutionListener.java @@ -0,0 +1,52 @@ +/* + * Copyright 2016 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.springframework.cloud.task.batch.listener; + +import org.springframework.batch.core.JobExecution; +import org.springframework.batch.core.JobExecutionListener; +import org.springframework.cloud.task.batch.listener.support.JobExecutionEvent; +import org.springframework.cloud.task.batch.listener.support.MessagePublisher; +import org.springframework.messaging.MessageChannel; +import org.springframework.util.Assert; + +/** + * Setups up the StepExecutionListener to emit events to the spring cloud stream output channel. + * + * @author Michael Minella + * @author Glenn Renfro + */ +public class EventEmittingJobExecutionListener implements JobExecutionListener { + + private MessageChannel output; + + private MessagePublisher messagePublisher; + + public EventEmittingJobExecutionListener(MessageChannel output) { + Assert.notNull(output, "An output channel is required"); + this.output = output; + this.messagePublisher = new MessagePublisher<>(output); + } + + @Override + public void beforeJob(JobExecution jobExecution) { + this.messagePublisher.publish(new JobExecutionEvent(jobExecution)); + } + + @Override + public void afterJob(JobExecution jobExecution) { + this.messagePublisher.publish(new JobExecutionEvent(jobExecution)); + } +} diff --git a/spring-cloud-task-stream/src/main/java/org/springframework/cloud/task/batch/listener/EventEmittingSkipListener.java b/spring-cloud-task-stream/src/main/java/org/springframework/cloud/task/batch/listener/EventEmittingSkipListener.java new file mode 100644 index 00000000..4afc8976 --- /dev/null +++ b/spring-cloud-task-stream/src/main/java/org/springframework/cloud/task/batch/listener/EventEmittingSkipListener.java @@ -0,0 +1,72 @@ +/* + * Copyright 2016 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.cloud.task.batch.listener; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.batch.core.SkipListener; +import org.springframework.cloud.task.batch.listener.support.BatchJobHeaders; +import org.springframework.cloud.task.batch.listener.support.MessagePublisher; +import org.springframework.messaging.Message; +import org.springframework.messaging.MessageChannel; +import org.springframework.messaging.support.MessageBuilder; +import org.springframework.util.Assert; + +/** + * Setups up the SkipProcessListener to emit events to the spring cloud stream output channel. + * + * @author Glenn Renfro + */ +public class EventEmittingSkipListener implements SkipListener { + + private static final Logger logger = LoggerFactory.getLogger(EventEmittingSkipListener.class); + + private MessageChannel output; + + private MessagePublisher messagePublisher; + + public EventEmittingSkipListener(MessageChannel output) { + Assert.notNull(output, "An output channel is required"); + this.output = output; + this.messagePublisher = new MessagePublisher(output); + } + + @Override + public void onSkipInRead(Throwable t) { + if (logger.isDebugEnabled()) { + logger.debug("Executing onSkipInRead: " + t.getMessage(), t); + } + messagePublisher.publishWithThrowableHeader("Skipped when reading.", t.getMessage()); + + } + + @Override + public void onSkipInWrite(Object item, Throwable t) { + if (logger.isDebugEnabled()) { + logger.debug("Executing onSkipInWrite: " + t.getMessage(), t); + } + messagePublisher.publishWithThrowableHeader(item, t.getMessage()); + } + + @Override + public void onSkipInProcess(Object item, Throwable t) { + if (logger.isDebugEnabled()) { + logger.debug("Executing onSkipInProcess: " + t.getMessage(), t); + } + messagePublisher.publishWithThrowableHeader(item, t.getMessage()); + } +} diff --git a/spring-cloud-task-stream/src/main/java/org/springframework/cloud/task/batch/listener/EventEmittingStepExecutionListener.java b/spring-cloud-task-stream/src/main/java/org/springframework/cloud/task/batch/listener/EventEmittingStepExecutionListener.java new file mode 100644 index 00000000..d1af72e9 --- /dev/null +++ b/spring-cloud-task-stream/src/main/java/org/springframework/cloud/task/batch/listener/EventEmittingStepExecutionListener.java @@ -0,0 +1,55 @@ +/* + * Copyright 2016 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.springframework.cloud.task.batch.listener; + +import org.springframework.batch.core.ExitStatus; +import org.springframework.batch.core.StepExecution; +import org.springframework.batch.core.StepExecutionListener; +import org.springframework.cloud.task.batch.listener.support.StepExecutionEvent; +import org.springframework.cloud.task.batch.listener.support.MessagePublisher; +import org.springframework.messaging.MessageChannel; +import org.springframework.util.Assert; + +/** + * Setups up the StepExecutionListener to emit events to the spring cloud stream output channel. + * + * @author Michael Minella + * @author Glenn Renfro + */ +public class EventEmittingStepExecutionListener implements StepExecutionListener { + + private MessageChannel output; + + private MessagePublisher messagePublisher; + + public EventEmittingStepExecutionListener(MessageChannel output) { + Assert.notNull(output, "An output channel is required"); + this.output = output; + this.messagePublisher = new MessagePublisher<>(output); + } + + @Override + public void beforeStep(StepExecution stepExecution) { + this.messagePublisher.publish(new StepExecutionEvent(stepExecution)); + } + + @Override + public ExitStatus afterStep(StepExecution stepExecution) { + this.messagePublisher.publish(new StepExecutionEvent(stepExecution)); + + return stepExecution.getExitStatus(); + } +} diff --git a/spring-cloud-task-stream/src/main/java/org/springframework/cloud/task/batch/listener/support/BatchJobHeaders.java b/spring-cloud-task-stream/src/main/java/org/springframework/cloud/task/batch/listener/support/BatchJobHeaders.java new file mode 100644 index 00000000..bd3ce8f8 --- /dev/null +++ b/spring-cloud-task-stream/src/main/java/org/springframework/cloud/task/batch/listener/support/BatchJobHeaders.java @@ -0,0 +1,34 @@ +/* + * Copyright 2016 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.cloud.task.batch.listener.support; + +/** + * Headers definitions used by the batch job plugin. + * + * @author Gunnar Hillert + * @since 1.0 + */ +public final class BatchJobHeaders { + + public static final String BATCH_LISTENER_EVENT_TYPE = "batch_listener_event_type"; + + public static final String BATCH_EXCEPTION = "batch_exception"; + + private BatchJobHeaders() { + } + +} diff --git a/spring-cloud-task-stream/src/main/java/org/springframework/cloud/task/batch/listener/support/ExitStatus.java b/spring-cloud-task-stream/src/main/java/org/springframework/cloud/task/batch/listener/support/ExitStatus.java new file mode 100644 index 00000000..54bf0a85 --- /dev/null +++ b/spring-cloud-task-stream/src/main/java/org/springframework/cloud/task/batch/listener/support/ExitStatus.java @@ -0,0 +1,54 @@ +/* + * Copyright 2016 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.cloud.task.batch.listener.support; + +/** + * ExitStatus DTO created so that {@link org.springframework.batch.core.ExitStatus} can be serialized into Json without + * having to add mixins to an ObjectMapper + * @author Glenn Renfro + */ +public class ExitStatus { + + private String exitCode; + + private String exitDescription; + + public ExitStatus(){ + + } + + public ExitStatus(String exitCode, String exitDescription) { + this.exitCode = exitCode; + this.exitDescription = exitDescription; + } + + public String getExitCode() { + return exitCode; + } + + public void setExitCode(String exitCode) { + this.exitCode = exitCode; + } + + public String getExitDescription() { + return exitDescription; + } + + public void setExitDescription(String exitDescription) { + this.exitDescription = exitDescription; + } +} diff --git a/spring-cloud-task-stream/src/main/java/org/springframework/cloud/task/batch/listener/support/JobExecutionEvent.java b/spring-cloud-task-stream/src/main/java/org/springframework/cloud/task/batch/listener/support/JobExecutionEvent.java new file mode 100644 index 00000000..0665cafd --- /dev/null +++ b/spring-cloud-task-stream/src/main/java/org/springframework/cloud/task/batch/listener/support/JobExecutionEvent.java @@ -0,0 +1,336 @@ +/* + * Copyright 2016 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.cloud.task.batch.listener.support; + +import java.io.*; +import java.util.*; +import java.util.concurrent.*; + +import org.springframework.batch.core.*; +import org.springframework.batch.item.ExecutionContext; + +/** + * This is a JobEvent DTO created so that a {@link org.springframework.batch.core.JobExecution} can be serialized into + * Json without having to add mixins to an ObjectMapper. + * @author Glenn Renfro + */ +public class JobExecutionEvent extends Entity { + + private JobParametersEvent jobParameters; + + private JobInstanceEvent jobInstance; + + private volatile Collection stepExecutions = new CopyOnWriteArraySet(); + + private volatile BatchStatus status = BatchStatus.STARTING; + + private volatile Date startTime = null; + + private volatile Date createTime = new Date(System.currentTimeMillis()); + + private volatile Date endTime = null; + + private volatile Date lastUpdated = null; + + private volatile ExitStatus exitStatus = new ExitStatus("UNKNOWN", null); + + private volatile ExecutionContext executionContext = new ExecutionContext(); + + private transient volatile List failureExceptions = new CopyOnWriteArrayList(); + + private String jobConfigurationName; + + public JobExecutionEvent() { + super(); + } + + /** + * Constructor for the StepExecution to initialize the DTO. + * + * @param original the StepExecution to build this DTO around. + */ + public JobExecutionEvent(JobExecution original) { + this.jobParameters = new JobParametersEvent(original.getJobParameters().getParameters()); + this.jobInstance = new JobInstanceEvent(original.getJobInstance().getId(), original.getJobInstance().getJobName()); + for(StepExecution stepExecution : original.getStepExecutions()){ + stepExecutions.add(new StepExecutionEvent(stepExecution)); + } + this.status = original.getStatus(); + this.startTime = original.getStartTime(); + this.createTime = original.getCreateTime(); + this.endTime = original.getEndTime(); + this.lastUpdated = original.getLastUpdated(); + this.exitStatus = new ExitStatus(original.getExitStatus().getExitCode(), + original.getExitStatus().getExitDescription()); + this.executionContext = original.getExecutionContext(); + this.failureExceptions = original.getFailureExceptions(); + this.jobConfigurationName = original.getJobConfigurationName(); + this.setId(original.getId()); + this.setVersion(original.getVersion()); + } + + public JobParametersEvent getJobParameters() { + return this.jobParameters; + } + + public Date getEndTime() { + return endTime; + } + + public void setJobInstance(JobInstanceEvent jobInstance) { + this.jobInstance = jobInstance; + } + + public void setEndTime(Date endTime) { + this.endTime = endTime; + } + + public Date getStartTime() { + return startTime; + } + + public void setStartTime(Date startTime) { + this.startTime = startTime; + } + + public BatchStatus getStatus() { + return status; + } + + /** + * Set the value of the status field. + * + * @param status the status to set + */ + public void setStatus(BatchStatus status) { + this.status = status; + } + + /** + * Upgrade the status field if the provided value is greater than the + * existing one. Clients using this method to set the status can be sure + * that they don't overwrite a failed status with an successful one. + * + * @param status the new status value + */ + public void upgradeStatus(BatchStatus status) { + this.status = this.status.upgradeTo(status); + } + + /** + * Convenience getter for for the id of the enclosing job. Useful for DAO + * implementations. + * + * @return the id of the enclosing job + */ + public Long getJobId() { + if (jobInstance != null) { + return jobInstance.getId(); + } + return null; + } + + /** + * @param exitStatus + */ + public void setExitStatus(ExitStatus exitStatus) { + this.exitStatus = exitStatus; + } + + /** + * @return the exitCode + */ + public ExitStatus getExitStatus() { + return exitStatus; + } + + /** + * @return the Job that is executing. + */ + public JobInstanceEvent getJobInstance() { + return jobInstance; + } + + /** + * Accessor for the step executions. + * + * @return the step executions that were registered + */ + public Collection getStepExecutions() { + return Collections.unmodifiableList(new ArrayList(stepExecutions)); + } + + /** + * Test if this {@link JobExecution} indicates that it is running. It should + * be noted that this does not necessarily mean that it has been persisted + * as such yet. + * @return true if the end time is null + */ + public boolean isRunning() { + return endTime == null; + } + + /** + * Test if this {@link JobExecution} indicates that it has been signalled to + * stop. + * @return true if the status is {@link BatchStatus#STOPPING} + */ + public boolean isStopping() { + return status == BatchStatus.STOPPING; + } + + /** + * Signal the {@link JobExecution} to stop. Iterates through the associated + * {@link StepExecution}s, calling {@link StepExecution#setTerminateOnly()}. + * + */ + public void stop() { + for (StepExecutionEvent stepExecution : stepExecutions) { + stepExecution.setTerminateOnly(); + } + status = BatchStatus.STOPPING; + } + + /** + * Sets the {@link ExecutionContext} for this execution + * + * @param executionContext the context + */ + public void setExecutionContext(ExecutionContext executionContext) { + this.executionContext = executionContext; + } + + /** + * Returns the {@link ExecutionContext} for this execution. The content is + * expected to be persisted after each step completion (successful or not). + * + * @return the context + */ + public ExecutionContext getExecutionContext() { + return executionContext; + } + + /** + * @return the time when this execution was created. + */ + public Date getCreateTime() { + return createTime; + } + + /** + * @param createTime creation time of this execution. + */ + public void setCreateTime(Date createTime) { + this.createTime = createTime; + } + + public String getJobConfigurationName() { + return this.jobConfigurationName; + } + + /** + * Package private method for re-constituting the step executions from + * existing instances. + * @param stepExecution + */ + void addStepExecution(StepExecutionEvent stepExecution) { + stepExecutions.add(stepExecution); + } + + /** + * Get the date representing the last time this JobExecution was updated in + * the JobRepository. + * + * @return Date representing the last time this JobExecution was updated. + */ + public Date getLastUpdated() { + return lastUpdated; + } + + /** + * Set the last time this JobExecution was updated. + * + * @param lastUpdated + */ + public void setLastUpdated(Date lastUpdated) { + this.lastUpdated = lastUpdated; + } + + public List getFailureExceptions() { + return failureExceptions; + } + + /** + * Add the provided throwable to the failure exception list. + * + * @param t + */ + public synchronized void addFailureException(Throwable t) { + this.failureExceptions.add(t); + } + + /** + * Return all failure causing exceptions for this JobExecution, including + * step executions. + * + * @return List<Throwable> containing all exceptions causing failure for + * this JobExecution. + */ + public synchronized List getAllFailureExceptions() { + + Set allExceptions = new HashSet(failureExceptions); + for (StepExecutionEvent stepExecution : stepExecutions) { + allExceptions.addAll(stepExecution.getFailureExceptions()); + } + + return new ArrayList(allExceptions); + } + + /** + * Deserialize and ensure transient fields are re-instantiated when read + * back + */ + private void readObject(ObjectInputStream stream) throws IOException, ClassNotFoundException { + stream.defaultReadObject(); + failureExceptions = new ArrayList(); + } + + /* + * (non-Javadoc) + * + * @see org.springframework.batch.core.domain.Entity#toString() + */ + @Override + public String toString() { + return super.toString() + + String.format(", startTime=%s, endTime=%s, lastUpdated=%s, status=%s, exitStatus=%s, job=[%s], jobParameters=[%s]", + startTime, endTime, lastUpdated, status, exitStatus, jobInstance, jobParameters); + } + + /** + * Add some step executions. For internal use only. + * @param stepExecutions step executions to add to the current list + */ + public void addStepExecutions(List stepExecutions) { + if (stepExecutions!=null) { + this.stepExecutions.removeAll(stepExecutions); + this.stepExecutions.addAll(stepExecutions); + } + } + + +} diff --git a/spring-cloud-task-stream/src/main/java/org/springframework/cloud/task/batch/listener/support/JobInstanceEvent.java b/spring-cloud-task-stream/src/main/java/org/springframework/cloud/task/batch/listener/support/JobInstanceEvent.java new file mode 100644 index 00000000..a53d1d1f --- /dev/null +++ b/spring-cloud-task-stream/src/main/java/org/springframework/cloud/task/batch/listener/support/JobInstanceEvent.java @@ -0,0 +1,61 @@ +/* + * Copyright 2016 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.cloud.task.batch.listener.support; + +import org.springframework.batch.core.Entity; +import org.springframework.util.Assert; + +/** + * This is a JobInstance DTO created so that a {@link org.springframework.batch.core.JobInstance} can be serialized into + * Json without having to add mixins to an ObjectMapper. + * + * @author Glenn Renfro + */ + +public class JobInstanceEvent extends Entity implements javax.batch.runtime.JobInstance { + + private String jobName; + + public JobInstanceEvent() { + + } + + public JobInstanceEvent(Long id, String jobName) { + super(id); + Assert.hasLength(jobName); + this.jobName = jobName; + } + + /** + * @return the job name. (Equivalent to getJob().getName()) + */ + @Override + public String getJobName() { + return jobName; + } + + @Override + public String toString() { + return super.toString() + ", Job=[" + jobName + "]"; + } + + @Override + public long getInstanceId() { + return super.getId(); + } + +} diff --git a/spring-cloud-task-stream/src/main/java/org/springframework/cloud/task/batch/listener/support/JobParameterEvent.java b/spring-cloud-task-stream/src/main/java/org/springframework/cloud/task/batch/listener/support/JobParameterEvent.java new file mode 100644 index 00000000..efc67c1b --- /dev/null +++ b/spring-cloud-task-stream/src/main/java/org/springframework/cloud/task/batch/listener/support/JobParameterEvent.java @@ -0,0 +1,179 @@ +/* + * Copyright 2016 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.cloud.task.batch.listener.support; + +import java.util.*; + + +/** + * This is a JobParameter DTO created so that a {@link org.springframework.batch.core.JobParameter} can be serialized + * into Json without having to add mixins to an ObjectMapper. + * @author Glenn Renfro + */ +public class JobParameterEvent { + private Object parameter; + + private JobParameterEvent.ParameterType parameterType; + + private boolean identifying; + + public JobParameterEvent() { + + } + + /** + * Construct a new JobParameter as a String. + */ + public JobParameterEvent(String parameter, boolean identifying) { + this.parameter = parameter; + parameterType = JobParameterEvent.ParameterType.STRING; + this.identifying = identifying; + } + + /** + * Construct a new JobParameter as a Long. + * + * @param parameter + */ + public JobParameterEvent(Long parameter, boolean identifying) { + this.parameter = parameter; + parameterType = JobParameterEvent.ParameterType.LONG; + this.identifying = identifying; + } + + /** + * Construct a new JobParameter as a Date. + * + * @param parameter + */ + public JobParameterEvent(Date parameter, boolean identifying) { + this.parameter = parameter; + parameterType = JobParameterEvent.ParameterType.DATE; + this.identifying = identifying; + } + + /** + * Construct a new JobParameter as a Double. + * + * @param parameter + */ + public JobParameterEvent(Double parameter, boolean identifying) { + this.parameter = parameter; + parameterType = JobParameterEvent.ParameterType.DOUBLE; + this.identifying = identifying; + } + + + /** + * Construct a new JobParameter as a String. + */ + public JobParameterEvent(String parameter) { + this.parameter = parameter; + parameterType = JobParameterEvent.ParameterType.STRING; + this.identifying = true; + } + + /** + * Construct a new JobParameter as a Long. + * + * @param parameter + */ + public JobParameterEvent(Long parameter) { + this.parameter = parameter; + parameterType = JobParameterEvent.ParameterType.LONG; + this.identifying = true; + } + + /** + * Construct a new JobParameter as a Date. + * + * @param parameter + */ + public JobParameterEvent(Date parameter) { + this.parameter = parameter; + parameterType = JobParameterEvent.ParameterType.DATE; + this.identifying = true; + } + + /** + * Construct a new JobParameter as a Double. + * + * @param parameter + */ + public JobParameterEvent(Double parameter) { + this.parameter = parameter; + parameterType = JobParameterEvent.ParameterType.DOUBLE; + this.identifying = true; + } + + public boolean isIdentifying() { + return identifying; + } + + /** + * @return the value contained within this JobParameter. + */ + public Object getValue() { + + if (parameter != null && parameter.getClass().isInstance(Date.class)) { + return new Date(((Date) parameter).getTime()); + } + else { + return parameter; + } + } + + /** + * @return a ParameterType representing the type of this parameter. + */ + public JobParameterEvent.ParameterType getType() { + return parameterType; + } + + @Override + public boolean equals(Object obj) { + if (obj instanceof JobParameterEvent == false) { + return false; + } + + if (this == obj) { + return true; + } + + JobParameterEvent rhs = (JobParameterEvent) obj; + return parameter==null ? rhs.parameter==null && parameterType==rhs.parameterType: parameter.equals(rhs.parameter); + } + + @Override + public String toString() { + return parameter == null ? null : (parameterType == JobParameterEvent.ParameterType.DATE ? "" + ((Date) parameter).getTime() + : parameter.toString()); + } + + @Override + public int hashCode() { + return 7 + 21 * (parameter == null ? parameterType.hashCode() : parameter.hashCode()); + } + + /** + * Enumeration representing the type of a JobParameter. + */ + public enum ParameterType { + + STRING, DATE, LONG, DOUBLE; + } +} diff --git a/spring-cloud-task-stream/src/main/java/org/springframework/cloud/task/batch/listener/support/JobParametersEvent.java b/spring-cloud-task-stream/src/main/java/org/springframework/cloud/task/batch/listener/support/JobParametersEvent.java new file mode 100644 index 00000000..623c083a --- /dev/null +++ b/spring-cloud-task-stream/src/main/java/org/springframework/cloud/task/batch/listener/support/JobParametersEvent.java @@ -0,0 +1,227 @@ +/* + * Copyright 2016 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.cloud.task.batch.listener.support; + +import java.util.*; + +import org.springframework.batch.core.JobParameter; + +/** + * This is a JobParametersEvent DTO created so that a {@link org.springframework.batch.core.JobParameters} can be + * serialized into Json without having to add mixins to an ObjectMapper. + * + * @author Glenn Renfro + */ +public class JobParametersEvent { + private final Map parameters; + + public JobParametersEvent() { + this.parameters = new LinkedHashMap(); + } + + public JobParametersEvent(Map jobParameters) { + this.parameters = new LinkedHashMap(); + for(Map.Entry entry: jobParameters.entrySet()){ + if(entry.getValue().getValue() instanceof String){ + parameters.put(entry.getKey(), new JobParameterEvent(((String) entry.getValue().getValue()), entry.getValue().isIdentifying())); + } + else if(entry.getValue().getValue() instanceof Long){ + parameters.put(entry.getKey(), new JobParameterEvent(((Long) entry.getValue().getValue()), entry.getValue().isIdentifying())); + } + else if(entry.getValue().getValue() instanceof Date){ + parameters.put(entry.getKey(), new JobParameterEvent(((Date) entry.getValue().getValue()), entry.getValue().isIdentifying())); + } + else if(entry.getValue().getValue() instanceof Double){ + parameters.put(entry.getKey(), new JobParameterEvent(((Double) entry.getValue().getValue()), entry.getValue().isIdentifying())); + } + } + } + + + /** + * Typesafe Getter for the Long represented by the provided key. + * + * @param key The key to get a value for + * @return The Long value + */ + public Long getLong(String key){ + if (!parameters.containsKey(key)) { + return 0L; + } + Object value = parameters.get(key).getValue(); + return value==null ? 0L : ((Long)value).longValue(); + } + + /** + * Typesafe Getter for the Long represented by the provided key. If the + * key does not exist, the default value will be returned. + * + * @param key to return the value for + * @param defaultValue to return if the value doesn't exist + * @return the parameter represented by the provided key, defaultValue + * otherwise. + */ + public Long getLong(String key, long defaultValue){ + if(parameters.containsKey(key)){ + return getLong(key); + } + else{ + return defaultValue; + } + } + + /** + * Typesafe Getter for the String represented by the provided key. + * + * @param key The key to get a value for + * @return The String value + */ + public String getString(String key){ + JobParameterEvent value = parameters.get(key); + return value==null ? null : value.toString(); + } + + /** + * Typesafe Getter for the String represented by the provided key. If the + * key does not exist, the default value will be returned. + * + * @param key to return the value for + * @param defaultValue to return if the value doesn't exist + * @return the parameter represented by the provided key, defaultValue + * otherwise. + */ + public String getString(String key, String defaultValue){ + if(parameters.containsKey(key)){ + return getString(key); + } + else{ + return defaultValue; + } + } + + /** + * Typesafe Getter for the Long represented by the provided key. + * + * @param key The key to get a value for + * @return The Double value + */ + public Double getDouble(String key){ + if (!parameters.containsKey(key)) { + return 0.0; + } + Double value = (Double)parameters.get(key).getValue(); + return value==null ? 0.0 : value.doubleValue(); + } + + /** + * Typesafe Getter for the Double represented by the provided key. If the + * key does not exist, the default value will be returned. + * + * @param key to return the value for + * @param defaultValue to return if the value doesn't exist + * @return the parameter represented by the provided key, defaultValue + * otherwise. + */ + public Double getDouble(String key, double defaultValue){ + if(parameters.containsKey(key)){ + return getDouble(key); + } + else{ + return defaultValue; + } + } + + /** + * Typesafe Getter for the Date represented by the provided key. + * + * @param key The key to get a value for + * @return The java.util.Date value + */ + public Date getDate(String key){ + return this.getDate(key,null); + } + + /** + * Typesafe Getter for the Date represented by the provided key. If the + * key does not exist, the default value will be returned. + * + * @param key to return the value for + * @param defaultValue to return if the value doesn't exist + * @return the parameter represented by the provided key, defaultValue + * otherwise. + */ + public Date getDate(String key, Date defaultValue){ + if(parameters.containsKey(key)){ + return (Date)parameters.get(key).getValue(); + } + else{ + return defaultValue; + } + } + + /** + * Get a map of all parameters, including string, long, and date. + * + * @return an unmodifiable map containing all parameters. + */ + public Map getParameters(){ + return new LinkedHashMap(parameters); + } + + /** + * @return true if the parameters is empty, false otherwise. + */ + public boolean isEmpty(){ + return parameters.isEmpty(); + } + + @Override + public boolean equals(Object obj) { + if(obj instanceof JobParametersEvent == false){ + return false; + } + + if(obj == this){ + return true; + } + + JobParametersEvent rhs = (JobParametersEvent)obj; + return this.parameters.equals(rhs.parameters); + } + + @Override + public int hashCode() { + return 17 + 23 * parameters.hashCode(); + } + + @Override + public String toString() { + return parameters.toString(); + } + + public Properties toProperties() { + Properties props = new Properties(); + + for (Map.Entry param : parameters.entrySet()) { + if(param.getValue() != null) { + props.put(param.getKey(), param.getValue().toString()); + } + } + + return props; + } +} diff --git a/spring-cloud-task-stream/src/main/java/org/springframework/cloud/task/batch/listener/support/MessagePublisher.java b/spring-cloud-task-stream/src/main/java/org/springframework/cloud/task/batch/listener/support/MessagePublisher.java new file mode 100644 index 00000000..f7adfd7b --- /dev/null +++ b/spring-cloud-task-stream/src/main/java/org/springframework/cloud/task/batch/listener/support/MessagePublisher.java @@ -0,0 +1,55 @@ +/* + * Copyright 2016 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.cloud.task.batch.listener.support; + +import org.springframework.messaging.Message; +import org.springframework.messaging.MessageChannel; +import org.springframework.messaging.support.MessageBuilder; +import org.springframework.util.Assert; + +/** + * Utility class that sends batch job listener payloads to the notification channel. + * @author Glenn Renfro + */ +public class MessagePublisher

{ + private final MessageChannel listenerEventsChannel; + + public MessagePublisher(MessageChannel listenerEventsChannel) { + Assert.notNull(listenerEventsChannel, "listenerEventsChannel must not be null"); + this.listenerEventsChannel = listenerEventsChannel; + } + + public final void publish(P payload) { + if (payload instanceof Message) { + this.publishMessage((Message) payload); + } + else { + Message

message = MessageBuilder.withPayload(payload).build(); + this.listenerEventsChannel.send(message); + } + } + + private final void publishMessage(Message message) { + this.listenerEventsChannel.send(message); + } + + public void publishWithThrowableHeader(P payload, String header) { + Message

message = MessageBuilder.withPayload(payload).setHeader(BatchJobHeaders.BATCH_EXCEPTION, + header).build(); + publishMessage(message); + } +} diff --git a/spring-cloud-task-stream/src/main/java/org/springframework/cloud/task/batch/listener/support/StepExecutionEvent.java b/spring-cloud-task-stream/src/main/java/org/springframework/cloud/task/batch/listener/support/StepExecutionEvent.java new file mode 100644 index 00000000..09594684 --- /dev/null +++ b/spring-cloud-task-stream/src/main/java/org/springframework/cloud/task/batch/listener/support/StepExecutionEvent.java @@ -0,0 +1,476 @@ +/* + * Copyright 2016 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.cloud.task.batch.listener.support; + +import java.io.*; +import java.util.*; +import java.util.concurrent.*; + +import org.springframework.batch.core.BatchStatus; +import org.springframework.batch.core.Entity; + +import org.springframework.batch.core.StepExecution; +import org.springframework.batch.item.ExecutionContext; +import org.springframework.util.Assert; + +/** + * This is a StepExecution DTO created so that a {@link org.springframework.batch.core.StepExecution} can be serialized + * into Json without having to add mixins to an ObjectMapper. + * @author Glenn Renfro + */ +public class StepExecutionEvent extends Entity { + + private volatile long jobExecutionId; + + private volatile String stepName; + + private volatile BatchStatus status = BatchStatus.STARTING; + + private volatile int readCount = 0; + + private volatile int writeCount = 0; + + private volatile int commitCount = 0; + + private volatile int rollbackCount = 0; + + private volatile int readSkipCount = 0; + + private volatile int processSkipCount = 0; + + private volatile int writeSkipCount = 0; + + private volatile Date startTime = new Date(System.currentTimeMillis()); + + private volatile Date endTime = null; + + private volatile Date lastUpdated = null; + + private volatile ExecutionContext executionContext = new ExecutionContext(); + + private volatile ExitStatus exitStatus = new ExitStatus("EXECUTING", null); + + private volatile boolean terminateOnly; + + private volatile int filterCount; + + private transient volatile List failureExceptions = new CopyOnWriteArrayList(); + + + public StepExecutionEvent() { + super(); + } + + /** + * Constructor for the StepExecution to initialize the DTO. + * + * @param stepExecution the StepExecution to build this DTO around. + */ + public StepExecutionEvent(StepExecution stepExecution) { + super(); + Assert.notNull(stepExecution, "StepExecution must be provided to re-hydrate an existing StepExecutionEvent"); + Assert.notNull(stepExecution.getJobExecution(), "JobExecution must be provided to re-hydrate an existing StepExecutionEvent"); + setId(stepExecution.getId()); + this.jobExecutionId = stepExecution.getJobExecutionId(); + this.stepName = stepExecution.getStepName(); + + this.status = stepExecution.getStatus(); + this.exitStatus = new ExitStatus(stepExecution.getExitStatus().getExitCode(), + stepExecution.getExitStatus().getExitDescription()); + this.executionContext = stepExecution.getExecutionContext(); + for (Throwable throwable : stepExecution.getFailureExceptions()){ + this.failureExceptions.add(throwable); + } + this.terminateOnly = stepExecution.isTerminateOnly(); + + this.endTime = stepExecution.getEndTime(); + this.lastUpdated = stepExecution.getLastUpdated(); + this.startTime = stepExecution.getStartTime(); + + this.commitCount = stepExecution.getCommitCount(); + this.filterCount = stepExecution.getFilterCount(); + this.processSkipCount = stepExecution.getProcessSkipCount(); + this.readCount = stepExecution.getReadCount(); + this.readSkipCount = stepExecution.getReadSkipCount(); + this.rollbackCount = stepExecution.getRollbackCount(); + this.writeCount = stepExecution.getWriteCount(); + this.writeSkipCount = stepExecution.getWriteSkipCount(); + + } + + /** + * Returns the {@link ExecutionContext} for this execution + * + * @return the attributes + */ + public ExecutionContext getExecutionContext() { + return executionContext; + } + + /** + * Sets the {@link ExecutionContext} for this execution + * + * @param executionContext the attributes + */ + public void setExecutionContext(ExecutionContext executionContext) { + this.executionContext = executionContext; + } + + /** + * Returns the current number of commits for this execution + * + * @return the current number of commits + */ + public int getCommitCount() { + return commitCount; + } + + /** + * Sets the current number of commits for this execution + * + * @param commitCount the current number of commits + */ + public void setCommitCount(int commitCount) { + this.commitCount = commitCount; + } + + /** + * Returns the time that this execution ended + * + * @return the time that this execution ended + */ + public Date getEndTime() { + return endTime; + } + + /** + * Sets the time that this execution ended + * + * @param endTime the time that this execution ended + */ + public void setEndTime(Date endTime) { + this.endTime = endTime; + } + + /** + * Returns the current number of items read for this execution + * + * @return the current number of items read for this execution + */ + public int getReadCount() { + return readCount; + } + + /** + * Sets the current number of read items for this execution + * + * @param readCount the current number of read items for this execution + */ + public void setReadCount(int readCount) { + this.readCount = readCount; + } + + /** + * Returns the current number of items written for this execution + * + * @return the current number of items written for this execution + */ + public int getWriteCount() { + return writeCount; + } + + /** + * Sets the current number of written items for this execution + * + * @param writeCount the current number of written items for this execution + */ + public void setWriteCount(int writeCount) { + this.writeCount = writeCount; + } + + /** + * Returns the current number of rollbacks for this execution + * + * @return the current number of rollbacks for this execution + */ + public int getRollbackCount() { + return rollbackCount; + } + + /** + * Returns the current number of items filtered out of this execution + * + * @return the current number of items filtered out of this execution + */ + public int getFilterCount() { + return filterCount; + } + + /** + * Public setter for the number of items filtered out of this execution. + * @param filterCount the number of items filtered out of this execution to + * set + */ + public void setFilterCount(int filterCount) { + this.filterCount = filterCount; + } + + /** + * Setter for number of rollbacks for this execution + */ + public void setRollbackCount(int rollbackCount) { + this.rollbackCount = rollbackCount; + } + + /** + * Gets the time this execution started + * + * @return the time this execution started + */ + public Date getStartTime() { + return startTime; + } + + /** + * Sets the time this execution started + * + * @param startTime the time this execution started + */ + public void setStartTime(Date startTime) { + this.startTime = startTime; + } + + /** + * Returns the current status of this step + * + * @return the current status of this step + */ + public BatchStatus getStatus() { + return status; + } + + /** + * Sets the current status of this step + * + * @param status the current status of this step + */ + public void setStatus(BatchStatus status) { + this.status = status; + } + + /** + * Upgrade the status field if the provided value is greater than the + * existing one. Clients using this method to set the status can be sure + * that they don't overwrite a failed status with an successful one. + * + * @param status the new status value + */ + public void upgradeStatus(BatchStatus status) { + this.status = this.status.upgradeTo(status); + } + + public void setStepName(String stepName) { + this.stepName = stepName; + } + /** + * @return the name of the step + */ + public String getStepName() { + return stepName; + } + + /** + * @param exitStatus + */ + public void setExitStatus(ExitStatus exitStatus) { + this.exitStatus = exitStatus; + } + + /** + * @return the exitCode + */ + public ExitStatus getExitStatus() { + return exitStatus; + } + + /** + * On unsuccessful execution after a chunk has rolled back. + */ + public synchronized void incrementRollbackCount() { + rollbackCount++; + } + + /** + * @return flag to indicate that an execution should halt + */ + public boolean isTerminateOnly() { + return this.terminateOnly; + } + + /** + * Set a flag that will signal to an execution environment that this + * execution (and its surrounding job) wishes to exit. + */ + public void setTerminateOnly() { + this.terminateOnly = true; + } + + /** + * @return the total number of items skipped. + */ + public int getSkipCount() { + return readSkipCount + processSkipCount + writeSkipCount; + } + + /** + * Increment the number of commits + */ + public void incrementCommitCount() { + commitCount++; + } + + /** + * @return the number of records skipped on read + */ + public int getReadSkipCount() { + return readSkipCount; + } + + /** + * @return the number of records skipped on write + */ + public int getWriteSkipCount() { + return writeSkipCount; + } + + /** + * Set the number of records skipped on read + * + * @param readSkipCount + */ + public void setReadSkipCount(int readSkipCount) { + this.readSkipCount = readSkipCount; + } + + /** + * Set the number of records skipped on write + * + * @param writeSkipCount + */ + public void setWriteSkipCount(int writeSkipCount) { + this.writeSkipCount = writeSkipCount; + } + + /** + * @return the number of records skipped during processing + */ + public int getProcessSkipCount() { + return processSkipCount; + } + + /** + * Set the number of records skipped during processing. + * + * @param processSkipCount + */ + public void setProcessSkipCount(int processSkipCount) { + this.processSkipCount = processSkipCount; + } + + /** + * @return the Date representing the last time this execution was persisted. + */ + public Date getLastUpdated() { + return lastUpdated; + } + + /** + * Set the time when the StepExecution was last updated before persisting + * + * @param lastUpdated + */ + public void setLastUpdated(Date lastUpdated) { + this.lastUpdated = lastUpdated; + } + + public List getFailureExceptions() { + return failureExceptions; + } + + public void addFailureException(Throwable throwable) { + this.failureExceptions.add(throwable); + } + + public long getJobExecutionId() { + return jobExecutionId; + } + + /* + * (non-Javadoc) + * + * @see + * org.springframework.batch.container.common.domain.Entity#equals(java. + * lang.Object) + */ + @Override + public boolean equals(Object obj) { + + if ( !(obj instanceof StepExecution) || getId() == null) { + return super.equals(obj); + } + StepExecution other = (StepExecution) obj; + + return stepName.equals(other.getStepName()) && (jobExecutionId == other.getJobExecutionId()) + && getId().equals(other.getId()); + } + + /** + * Deserialize and ensure transient fields are re-instantiated when read + * back + */ + private void readObject(ObjectInputStream stream) throws IOException, ClassNotFoundException { + stream.defaultReadObject(); + failureExceptions = new ArrayList(); + } + + /* + * (non-Javadoc) + * + * @see org.springframework.batch.container.common.domain.Entity#hashCode() + */ + @Override + public int hashCode() { + Object jobExecutionId = getJobExecutionId(); + Long id = getId(); + return super.hashCode() + 31 * (stepName != null ? stepName.hashCode() : 0) + 91 + * (jobExecutionId != null ? jobExecutionId.hashCode() : 0) + 59 * (id != null ? id.hashCode() : 0); + } + + @Override + public String toString() { + return String.format(getSummary() + ", exitDescription=%s", exitStatus.getExitDescription()); + } + + public String getSummary() { + return super.toString() + + String.format( + ", name=%s, status=%s, exitStatus=%s, readCount=%d, filterCount=%d, writeCount=%d readSkipCount=%d, writeSkipCount=%d" + + ", processSkipCount=%d, commitCount=%d, rollbackCount=%d", stepName, status, + exitStatus.getExitCode(), readCount, filterCount, writeCount, readSkipCount, writeSkipCount, + processSkipCount, commitCount, rollbackCount); + } +} diff --git a/spring-cloud-task-stream/src/main/java/org/springframework/cloud/task/batch/listener/support/TaskBatchEventListenerBeanPostProcessor.java b/spring-cloud-task-stream/src/main/java/org/springframework/cloud/task/batch/listener/support/TaskBatchEventListenerBeanPostProcessor.java new file mode 100644 index 00000000..fd744b89 --- /dev/null +++ b/spring-cloud-task-stream/src/main/java/org/springframework/cloud/task/batch/listener/support/TaskBatchEventListenerBeanPostProcessor.java @@ -0,0 +1,114 @@ +/* + * Copyright 2016 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.springframework.cloud.task.batch.listener.support; + +import java.lang.reflect.*; + +import org.springframework.batch.core.ChunkListener; +import org.springframework.batch.core.ItemProcessListener; +import org.springframework.batch.core.ItemReadListener; +import org.springframework.batch.core.ItemWriteListener; +import org.springframework.batch.core.JobExecutionListener; +import org.springframework.batch.core.SkipListener; +import org.springframework.batch.core.StepExecutionListener; +import org.springframework.batch.core.job.AbstractJob; +import org.springframework.batch.core.step.AbstractStep; +import org.springframework.batch.core.step.item.ChunkOrientedTasklet; +import org.springframework.batch.core.step.item.SimpleChunkProcessor; +import org.springframework.batch.core.step.item.SimpleChunkProvider; +import org.springframework.batch.core.step.tasklet.Tasklet; +import org.springframework.batch.core.step.tasklet.TaskletStep; +import org.springframework.beans.BeansException; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.beans.factory.config.BeanPostProcessor; +import org.springframework.cloud.task.batch.listener.BatchEventAutoConfiguration; +import org.springframework.context.ApplicationContext; +import org.springframework.util.ReflectionUtils; + +/** + * Attaches the listeners to the job and its steps. + * Based on the type of bean that is being processed will determine what listener is attached. + *

+ *

    + *
  • If the bean is of type AbstactJob then the JobExecutionListener is registered with this bean.
  • + *
  • If the bean is of type AbstactStep then the StepExecutionListener is registered with this bean.
  • + *
  • If the bean is of type TaskletStep then the ChunkEventListener is registered with this bean.
  • + *
  • If the tasklet for the TaskletStep is of type ChunkOrientedTasklet the following listeners will be registered.
  • + *
      + *
    • ItemReadListener with the ChunkProvider.
    • + *
    • ItemProcessListener with the ChunkProcessor.
    • + *
    • ItemWriteEventsListener with the ChunkProcessor.
    • + *
    • SkipEventsListener with the ChunkProcessor.
    • + *
    + *
+ *

+ * @author Michael Minella + * @author Glenn Renfro + */ +public class TaskBatchEventListenerBeanPostProcessor implements BeanPostProcessor { + + @Autowired + private ApplicationContext applicationContext; + + @Override + public Object postProcessBeforeInitialization(Object bean, String beanName) throws BeansException { + + if (bean instanceof AbstractJob) { + JobExecutionListener jobExecutionEventsListener = + (JobExecutionListener) this.applicationContext.getBean( + BatchEventAutoConfiguration.JOB_EXECUTION_EVENTS_LISTENER); + + AbstractJob job = (AbstractJob) bean; + job.registerJobExecutionListener( + jobExecutionEventsListener); + } + + if (bean instanceof AbstractStep) { + StepExecutionListener stepExecutionListener = + (StepExecutionListener) this.applicationContext.getBean(BatchEventAutoConfiguration.STEP_EXECUTION_EVENTS_LISTENER); + + AbstractStep step = (AbstractStep) bean; + step.registerStepExecutionListener(stepExecutionListener); + + if (bean instanceof TaskletStep) { + TaskletStep taskletStep = (TaskletStep) bean; + taskletStep.registerChunkListener((ChunkListener) this.applicationContext.getBean(BatchEventAutoConfiguration.CHUNK_EVENTS_LISTENER)); + Tasklet tasklet = taskletStep.getTasklet(); + + if (tasklet instanceof ChunkOrientedTasklet) { + Field chunkProviderField = ReflectionUtils.findField(ChunkOrientedTasklet.class, "chunkProvider"); + ReflectionUtils.makeAccessible(chunkProviderField); + SimpleChunkProvider chunkProvider = (SimpleChunkProvider) ReflectionUtils.getField(chunkProviderField, tasklet); + Field chunkProcessorField = ReflectionUtils.findField(ChunkOrientedTasklet.class, "chunkProcessor"); + ReflectionUtils.makeAccessible(chunkProcessorField); + SimpleChunkProcessor chunkProcessor = (SimpleChunkProcessor) ReflectionUtils.getField(chunkProcessorField, tasklet); + chunkProvider.registerListener((ItemReadListener) this.applicationContext.getBean(BatchEventAutoConfiguration.ITEM_READ_EVENTS_LISTENER)); + chunkProvider.registerListener((SkipListener) this.applicationContext.getBean(BatchEventAutoConfiguration.SKIP_EVENTS_LISTENER)); + chunkProcessor.registerListener((ItemProcessListener) this.applicationContext.getBean(BatchEventAutoConfiguration.ITEM_PROCESS_EVENTS_LISTENER)); + chunkProcessor.registerListener((ItemWriteListener) this.applicationContext.getBean(BatchEventAutoConfiguration.ITEM_WRITE_EVENTS_LISTENER)); + chunkProcessor.registerListener((SkipListener) this.applicationContext.getBean(BatchEventAutoConfiguration.SKIP_EVENTS_LISTENER)); + } + } + } + + return bean; + } + + @Override + public Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException { + return bean; + } +} diff --git a/spring-cloud-task-stream/src/main/resources/META-INF/spring.factories b/spring-cloud-task-stream/src/main/resources/META-INF/spring.factories index 58dee30c..fba87516 100644 --- a/spring-cloud-task-stream/src/main/resources/META-INF/spring.factories +++ b/spring-cloud-task-stream/src/main/resources/META-INF/spring.factories @@ -1 +1,2 @@ org.springframework.boot.autoconfigure.EnableAutoConfiguration=org.springframework.cloud.task.listener.TaskEventAutoConfiguration +org.springframework.boot.autoconfigure.EnableAutoConfiguration=org.springframework.cloud.task.batch.listener.BatchEventAutoConfiguration diff --git a/spring-cloud-task-stream/src/main/resources/org/springframework/cloud/task/application.properties b/spring-cloud-task-stream/src/main/resources/org/springframework/cloud/task/application.properties index 2add9998..88b9fb56 100644 --- a/spring-cloud-task-stream/src/main/resources/org/springframework/cloud/task/application.properties +++ b/spring-cloud-task-stream/src/main/resources/org/springframework/cloud/task/application.properties @@ -1 +1,9 @@ spring.cloud.stream.bindings.task-events.contentType=application/json +spring.cloud.stream.bindings.item-write-events.contentType=application/json +spring.cloud.stream.bindings.item-read-events.contentType=application/json +spring.cloud.stream.bindings.item-process-events.contentType=application/json +spring.cloud.stream.bindings.skip-events.contentType=application/json +spring.cloud.stream.bindings.step-execution-events.contentType=application/json +spring.cloud.stream.bindings.job-execution-events.contentType=application/json + + diff --git a/spring-cloud-task-stream/src/test/java/org/springframework/cloud/task/batch/listener/EventJobExecutionTests.java b/spring-cloud-task-stream/src/test/java/org/springframework/cloud/task/batch/listener/EventJobExecutionTests.java new file mode 100644 index 00000000..30ec0c61 --- /dev/null +++ b/spring-cloud-task-stream/src/test/java/org/springframework/cloud/task/batch/listener/EventJobExecutionTests.java @@ -0,0 +1,111 @@ +/* + * Copyright 2016 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.cloud.task.batch.listener; + +import java.util.*; + +import org.junit.Before; +import org.junit.Test; +import org.springframework.batch.core.JobExecution; +import org.springframework.batch.core.JobInstance; +import org.springframework.batch.core.JobParameter; +import org.springframework.batch.core.JobParameters; +import org.springframework.batch.core.StepExecution; +import org.springframework.cloud.task.batch.listener.support.JobExecutionEvent; +import org.springframework.cloud.task.batch.listener.support.StepExecutionEvent; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; + +/** + * @author Glenn Renfro. + */ +public class EventJobExecutionTests { + + private static final String JOB_NAME = "FOODJOB"; + private static final Long JOB_INSTANCE_ID = 1l; + private static final Long JOB_EXECUTION_ID = 2l; + private static final String JOB_CONFIGURATION_NAME = "FOO_JOB_CONFIG"; + + private JobParameters jobParameters; + private JobInstance jobInstance; + + @Before + public void setup() { + jobInstance = new JobInstance(JOB_INSTANCE_ID, JOB_NAME); + jobParameters = new JobParameters(); + } + + @Test + public void testBasic() { + JobExecution jobExecution = new JobExecution(jobInstance, JOB_EXECUTION_ID, jobParameters, JOB_CONFIGURATION_NAME); + JobExecutionEvent jobExecutionEvent = new JobExecutionEvent(jobExecution); + assertNotNull("jobInstance should not be null", jobExecutionEvent.getJobInstance()); + assertNotNull("jobParameters should not be null", jobExecutionEvent.getJobParameters()); + assertEquals("jobConfigurationName did not match expected", JOB_CONFIGURATION_NAME, + jobExecutionEvent.getJobConfigurationName()); + + assertEquals("jobParameters size did not match", 0, jobExecutionEvent.getJobParameters().getParameters().size()); + assertEquals("jobInstance name did not match", JOB_NAME, jobExecutionEvent.getJobInstance().getJobName()); + assertEquals("no step executions were expected", 0, jobExecutionEvent.getStepExecutions().size()); + assertEquals("exitStatus did not match expected", "UNKNOWN", jobExecutionEvent.getExitStatus().getExitCode()); + } + + @Test + public void testJobParameters() { + String[] JOB_PARAM_KEYS = { "A", "B", "C", "D" }; + Date testDate = new Date(); + JobParameter[] PARAMETERS = { new JobParameter("FOO", true), new JobParameter(1L, true), + new JobParameter(1D, true), new JobParameter(testDate, false) }; + + Map jobParamMap = new LinkedHashMap(); + for (int paramCount = 0; paramCount < JOB_PARAM_KEYS.length; paramCount++) { + jobParamMap.put(JOB_PARAM_KEYS[paramCount], PARAMETERS[paramCount]); + } + jobParameters = new JobParameters(jobParamMap); + JobExecution jobExecution = new JobExecution(jobInstance, JOB_EXECUTION_ID, jobParameters, JOB_CONFIGURATION_NAME); + JobExecutionEvent jobExecutionEvent = new JobExecutionEvent(jobExecution); + + assertNotNull("Job Parameter A was expected", jobExecutionEvent.getJobParameters().getString("A")); + assertNotNull("Job Parameter B was expected", jobExecutionEvent.getJobParameters().getLong("B")); + assertNotNull("Job Parameter C was expected", jobExecutionEvent.getJobParameters().getDouble("C")); + assertNotNull("Job Parameter D was expected", jobExecutionEvent.getJobParameters().getDate("D")); + + assertEquals("Job Parameter A value was not correct", "FOO", jobExecutionEvent.getJobParameters().getString("A")); + assertEquals("Job Parameter B value was not correct", new Long(1), jobExecutionEvent.getJobParameters().getLong("B")); + assertEquals("Job Parameter C value was not correct", new Double(1), jobExecutionEvent.getJobParameters().getDouble("C")); + assertEquals("Job Parameter D value was not correct", testDate, jobExecutionEvent.getJobParameters().getDate("D")); + } + + @Test + public void testStepExecutions() { + JobExecution jobExecution = new JobExecution(jobInstance, JOB_EXECUTION_ID, jobParameters, JOB_CONFIGURATION_NAME); + List stepsExecutions = new ArrayList<>(); + stepsExecutions.add( new StepExecution("foo", jobExecution)); + stepsExecutions.add( new StepExecution("bar", jobExecution)); + stepsExecutions.add( new StepExecution("baz", jobExecution)); + jobExecution.addStepExecutions(stepsExecutions); + + JobExecutionEvent jobExecutionsEvent = new JobExecutionEvent(jobExecution); + assertEquals("stepExecutions count is incorrect", 3, jobExecutionsEvent.getStepExecutions().size()); + Iterator iter = jobExecutionsEvent.getStepExecutions().iterator(); + assertEquals("foo stepExecution is not present", "foo", iter.next().getStepName()); + assertEquals("bar stepExecution is not present", "bar", iter.next().getStepName()); + assertEquals("baz stepExecution is not present", "baz", iter.next().getStepName()); + + } +} diff --git a/spring-cloud-task-stream/src/test/java/org/springframework/cloud/task/batch/listener/EventStepExecutionTests.java b/spring-cloud-task-stream/src/test/java/org/springframework/cloud/task/batch/listener/EventStepExecutionTests.java new file mode 100644 index 00000000..0efb1d00 --- /dev/null +++ b/spring-cloud-task-stream/src/test/java/org/springframework/cloud/task/batch/listener/EventStepExecutionTests.java @@ -0,0 +1,63 @@ +/* + * Copyright 2016 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.cloud.task.batch.listener; + +import org.junit.Test; +import org.springframework.batch.core.JobExecution; +import org.springframework.batch.core.JobInstance; +import org.springframework.batch.core.JobParameters; +import org.springframework.batch.core.StepExecution; +import org.springframework.cloud.task.batch.listener.support.StepExecutionEvent; + +import static org.junit.Assert.assertEquals; + +/** + * @author Glenn Renfro + */ +public class EventStepExecutionTests { + private static final String JOB_NAME = "FOO_JOB"; + private static final String STEP_NAME = "STEP_NAME"; + private static final Long JOB_INSTANCE_ID = 1l; + private static final Long JOB_EXECUTION_ID = 2l; + private static final String JOB_CONFIGURATION_NAME = "FOO_JOB_CONFIG"; + + @Test + public void testBasic(){ + JobInstance jobInstance = new JobInstance(JOB_INSTANCE_ID, JOB_NAME); + JobParameters jobParameters = new JobParameters(); + JobExecution jobExecution = new JobExecution(jobInstance, JOB_EXECUTION_ID, jobParameters, JOB_CONFIGURATION_NAME); + + StepExecution stepExecution = new StepExecution(STEP_NAME, jobExecution); + stepExecution.setCommitCount(1); + stepExecution.setReadCount(2); + stepExecution.setWriteCount(3); + stepExecution.setReadSkipCount(4); + stepExecution.setWriteSkipCount(5); + + StepExecutionEvent stepExecutionEvent = new StepExecutionEvent(stepExecution); + assertEquals("stepName result was not as expected", STEP_NAME, stepExecutionEvent.getStepName()); + assertEquals("startTime result was not as expected", stepExecution.getStartTime(), stepExecutionEvent.getStartTime()); + assertEquals("endTime result was not as expected", stepExecution.getEndTime(), stepExecutionEvent.getEndTime()); + assertEquals("lastUpdated result was not as expected", stepExecution.getLastUpdated(), stepExecutionEvent.getLastUpdated()); + assertEquals("commitCount result was not as expected", stepExecution.getCommitCount(), stepExecutionEvent.getCommitCount()); + assertEquals("readCount result was not as expected", stepExecution.getReadCount(), stepExecutionEvent.getReadCount()); + assertEquals("readSkipCount result was not as expected", stepExecution.getReadSkipCount(), stepExecutionEvent.getReadSkipCount()); + assertEquals("writeCount result was not as expected", stepExecution.getWriteCount(), stepExecutionEvent.getWriteCount()); + assertEquals("writeSkipCount result was not as expected", stepExecution.getWriteSkipCount(), stepExecutionEvent.getWriteSkipCount()); + assertEquals("skipCount result was not as expected", stepExecution.getSkipCount(), stepExecutionEvent.getSkipCount()); + } +} diff --git a/spring-cloud-task-stream/src/test/java/org/springframework/cloud/task/launcher/TaskLauncherSinkTests.java b/spring-cloud-task-stream/src/test/java/org/springframework/cloud/task/launcher/TaskLauncherSinkTests.java index 7fcaec58..7704dcba 100644 --- a/spring-cloud-task-stream/src/test/java/org/springframework/cloud/task/launcher/TaskLauncherSinkTests.java +++ b/spring-cloud-task-stream/src/test/java/org/springframework/cloud/task/launcher/TaskLauncherSinkTests.java @@ -19,6 +19,7 @@ package org.springframework.cloud.task.launcher; import java.util.HashMap; import java.util.Map; +import org.junit.ClassRule; import org.junit.Test; import org.junit.runner.RunWith; @@ -27,6 +28,7 @@ import org.springframework.boot.test.SpringApplicationConfiguration; import org.springframework.cloud.deployer.spi.task.LaunchState; import org.springframework.cloud.stream.annotation.Bindings; import org.springframework.cloud.stream.messaging.Sink; +import org.springframework.cloud.stream.test.junit.redis.RedisTestSupport; import org.springframework.cloud.task.launcher.configuration.TaskConfiguration; import org.springframework.cloud.task.launcher.util.TaskLauncherSinkApplication; import org.springframework.context.ApplicationContext; @@ -39,6 +41,9 @@ import static org.junit.Assert.assertEquals; @SpringApplicationConfiguration(classes = {TaskLauncherSinkApplication.class, TaskConfiguration.class} ) public class TaskLauncherSinkTests { + @ClassRule + public static RedisTestSupport redisTestSupport = new RedisTestSupport(); + private final static String DEFAULT_STATUS = "test_status"; @Autowired diff --git a/spring-cloud-task-stream/src/test/java/org/springframework/cloud/task/listener/TaskEventTests.java b/spring-cloud-task-stream/src/test/java/org/springframework/cloud/task/listener/TaskEventTests.java index 4d8ccd71..c7f348a1 100644 --- a/spring-cloud-task-stream/src/test/java/org/springframework/cloud/task/listener/TaskEventTests.java +++ b/spring-cloud-task-stream/src/test/java/org/springframework/cloud/task/listener/TaskEventTests.java @@ -15,11 +15,12 @@ */ package org.springframework.cloud.task.listener; +import org.junit.Rule; import org.junit.Test; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.PropertyPlaceholderAutoConfiguration; -import org.springframework.cloud.stream.test.binder.TestSupportBinderAutoConfiguration; +import org.springframework.cloud.stream.test.junit.redis.RedisTestSupport; import org.springframework.cloud.task.configuration.EnableTask; import org.springframework.context.ConfigurableApplicationContext; import org.springframework.context.annotation.Configuration; @@ -28,19 +29,24 @@ import static org.junit.Assert.assertNotNull; /** * @author Michael Minella + * @author Ilayaperumal Gopinathan */ public class TaskEventTests { + @Rule + public RedisTestSupport redisTestSupport = new RedisTestSupport(); + + private static final String TASK_NAME = "taskEventTest"; + @Test public void testDefaultConfiguration() { ConfigurableApplicationContext applicationContext = - SpringApplication.run(new Object[] {TaskEventsConfiguration.class, + SpringApplication.run(new Object[]{ TaskEventsConfiguration.class, TaskEventAutoConfiguration.class, - PropertyPlaceholderAutoConfiguration.class, - TestSupportBinderAutoConfiguration.class}, - new String[] {"--spring.cloud.task.closecontext.enable=false", + PropertyPlaceholderAutoConfiguration.class }, + new String[]{ "--spring.cloud.task.closecontext.enable=false", "--spring.main.web-environment=false", - "--spring.cloud.stream.defaultBinder=test"}); + "--spring.cloud.stream.defaultBinder=test" }); assertNotNull(applicationContext.getBean("taskEventListener")); assertNotNull(applicationContext.getBean(TaskEventAutoConfiguration.TaskEventChannels.class)); @@ -50,4 +56,5 @@ public class TaskEventTests { @EnableTask public static class TaskEventsConfiguration { } + }