diff --git a/pom.xml b/pom.xml index 91d47814..50cdaf55 100755 --- a/pom.xml +++ b/pom.xml @@ -74,11 +74,6 @@ ${spring-cloud-deployer-local.version} test - - org.springframework.cloud - spring-cloud-stream-test-support-internal - ${spring-cloud-stream.version} - org.springframework.cloud spring-cloud-starter-stream-rabbit @@ -89,11 +84,6 @@ spring-cloud-stream ${spring-cloud-stream.version} - - org.springframework.cloud - spring-cloud-stream-test-support - ${spring-cloud-stream.version} - diff --git a/spring-cloud-task-integration-tests/pom.xml b/spring-cloud-task-integration-tests/pom.xml index d8d3f536..26f38fa5 100644 --- a/spring-cloud-task-integration-tests/pom.xml +++ b/spring-cloud-task-integration-tests/pom.xml @@ -41,11 +41,6 @@ spring-boot-starter-test test - - org.springframework.cloud - spring-cloud-stream-test-support-internal - test - org.springframework.cloud spring-cloud-starter-stream-rabbit @@ -113,6 +108,19 @@ ${test.ducttape.version} test + + org.springframework.cloud + spring-cloud-stream + ${spring-cloud-stream.version} + test-jar + test-binder + test + + + org.springframework.cloud + spring-cloud-stream + test + diff --git a/spring-cloud-task-integration-tests/src/test/java/configuration/JobConfiguration.java b/spring-cloud-task-integration-tests/src/test/java/configuration/JobConfiguration.java index 689ccd4f..12b0a8c0 100644 --- a/spring-cloud-task-integration-tests/src/test/java/configuration/JobConfiguration.java +++ b/spring-cloud-task-integration-tests/src/test/java/configuration/JobConfiguration.java @@ -1,5 +1,5 @@ /* - * Copyright 2016-2019 the original author or authors. + * Copyright 2021-2021 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -32,6 +32,7 @@ import org.springframework.batch.item.ItemWriter; import org.springframework.batch.item.support.ListItemReader; import org.springframework.batch.repeat.RepeatStatus; import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; @@ -40,6 +41,8 @@ import org.springframework.context.annotation.Configuration; */ @Configuration @EnableBatchProcessing +@ConditionalOnProperty(prefix = "spring.cloud.task.test", name = "enable-job-configuration", + havingValue = "true") public class JobConfiguration { private static final int DEFAULT_CHUNK_COUNT = 3; diff --git a/spring-cloud-task-integration-tests/src/test/java/configuration/JobSkipConfiguration.java b/spring-cloud-task-integration-tests/src/test/java/configuration/JobSkipConfiguration.java index 73ef88af..3bdb073e 100644 --- a/spring-cloud-task-integration-tests/src/test/java/configuration/JobSkipConfiguration.java +++ b/spring-cloud-task-integration-tests/src/test/java/configuration/JobSkipConfiguration.java @@ -27,6 +27,7 @@ import org.springframework.batch.core.step.tasklet.Tasklet; import org.springframework.batch.item.ItemProcessor; import org.springframework.batch.repeat.RepeatStatus; import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; @@ -35,6 +36,7 @@ import org.springframework.context.annotation.Configuration; */ @Configuration @EnableBatchProcessing +@ConditionalOnProperty(prefix = "spring.cloud.task.test", name = "enable-fail-job-configuration") public class JobSkipConfiguration { @Autowired diff --git a/spring-cloud-task-integration-tests/src/test/java/org/springframework/cloud/task/launcher/TaskLauncherSinkTests.java b/spring-cloud-task-integration-tests/src/test/java/org/springframework/cloud/task/launcher/TaskLauncherSinkTests.java index b94b694a..d2d881e4 100644 --- a/spring-cloud-task-integration-tests/src/test/java/org/springframework/cloud/task/launcher/TaskLauncherSinkTests.java +++ b/spring-cloud-task-integration-tests/src/test/java/org/springframework/cloud/task/launcher/TaskLauncherSinkTests.java @@ -1,5 +1,5 @@ /* - * Copyright 2016-2019 the original author or authors. + * Copyright 2016-2021 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -36,7 +36,7 @@ import org.springframework.boot.test.context.SpringBootTest; import org.springframework.cloud.deployer.spi.local.LocalDeployerProperties; import org.springframework.cloud.deployer.spi.local.LocalTaskLauncher; import org.springframework.cloud.deployer.spi.task.TaskLauncher; -import org.springframework.cloud.stream.messaging.Sink; +import org.springframework.cloud.stream.function.StreamBridge; import org.springframework.cloud.task.launcher.app.TaskLauncherSinkApplication; import org.springframework.cloud.task.repository.TaskExecution; import org.springframework.cloud.task.repository.TaskExplorer; @@ -62,7 +62,9 @@ import static org.assertj.core.api.Assertions.assertThat; classes = { TaskLauncherSinkApplication.class, TaskLauncherSinkTests.TaskLauncherConfiguration.class }, properties = { - "maven.remote-repositories.repo1.url=https://repo.spring.io/libs-release" }) + "maven.remote-repositories.repo1.url=https://repo.spring.io/libs-release", + "spring.cloud.stream.function.bindings.taskLauncherSink-in-0=input", + "spring.cloud.stream.bindings.input.destination=taskLauncherSinkExchange" }) public class TaskLauncherSinkTests { private final static int WAIT_INTERVAL = 500; @@ -70,7 +72,7 @@ public class TaskLauncherSinkTests { private final static int MAX_WAIT_TIME = 120000; private final static String URL = "maven://org.springframework.cloud.task.app:" - + "timestamp-task:2.0.0.RELEASE"; + + "timestamp-task:2.1.1.RELEASE"; private final static String DATASOURCE_URL; @@ -96,7 +98,7 @@ public class TaskLauncherSinkTests { } @Autowired - private Sink sink; + private StreamBridge streamBridge; private DataSource dataSource; @@ -185,11 +187,10 @@ public class TaskLauncherSinkTests { } private void launchTask(String artifactURL) { - TaskLaunchRequest request = new TaskLaunchRequest(artifactURL, null, this.properties, null, null); GenericMessage message = new GenericMessage<>(request); - this.sink.input().send(message); + this.streamBridge.send("taskLauncherSinkExchange", message); } @Configuration diff --git a/spring-cloud-task-integration-tests/src/test/java/org/springframework/cloud/task/listener/BatchExecutionEventTests.java b/spring-cloud-task-integration-tests/src/test/java/org/springframework/cloud/task/listener/BatchExecutionEventTests.java index 582e24c7..af8e70b5 100644 --- a/spring-cloud-task-integration-tests/src/test/java/org/springframework/cloud/task/listener/BatchExecutionEventTests.java +++ b/spring-cloud-task-integration-tests/src/test/java/org/springframework/cloud/task/listener/BatchExecutionEventTests.java @@ -1,5 +1,5 @@ /* - * Copyright 2016-2019 the original author or authors. + * Copyright 2016-2021 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -16,30 +16,30 @@ package org.springframework.cloud.task.listener; +import java.util.ArrayList; +import java.util.List; import java.util.UUID; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.TimeUnit; +import com.fasterxml.jackson.databind.DeserializationFeature; +import com.fasterxml.jackson.databind.ObjectMapper; import configuration.JobConfiguration; import configuration.JobSkipConfiguration; import org.assertj.core.api.Assertions; import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; -import org.testcontainers.containers.GenericContainer; -import org.testcontainers.containers.RabbitMQContainer; -import org.springframework.boot.autoconfigure.EnableAutoConfiguration; -import org.springframework.boot.autoconfigure.context.PropertyPlaceholderAutoConfiguration; + +import org.springframework.boot.WebApplicationType; +import org.springframework.boot.autoconfigure.SpringBootApplication; import org.springframework.boot.builder.SpringApplicationBuilder; -import org.springframework.cloud.stream.annotation.EnableBinding; -import org.springframework.cloud.stream.annotation.StreamListener; -import org.springframework.cloud.stream.messaging.Sink; -import org.springframework.cloud.task.batch.listener.BatchEventAutoConfiguration; +import org.springframework.cloud.stream.binder.test.OutputDestination; +import org.springframework.cloud.stream.binder.test.TestChannelBinderConfiguration; import org.springframework.cloud.task.batch.listener.support.JobExecutionEvent; import org.springframework.cloud.task.batch.listener.support.StepExecutionEvent; -import org.springframework.cloud.task.configuration.EnableTask; import org.springframework.context.ConfigurableApplicationContext; -import org.springframework.context.annotation.PropertySource; +import org.springframework.context.annotation.Import; +import org.springframework.messaging.Message; import static org.assertj.core.api.Assertions.assertThat; @@ -48,43 +48,16 @@ import static org.assertj.core.api.Assertions.assertThat; */ public class BatchExecutionEventTests { - private static final String TASK_NAME = "jobEventTest"; - - static { - GenericContainer rabbitmq = new RabbitMQContainer("rabbitmq:3.8.9") - .withExposedPorts(5672); - rabbitmq.start(); - final Integer mappedPort = rabbitmq.getMappedPort(5672); - System.setProperty("spring.rabbitmq.test.port", mappedPort.toString()); - } - - // Count for two job execution events per job - static CountDownLatch jobExecutionLatch = new CountDownLatch(2); - - // Count for four step execution events per job - static CountDownLatch stepExecutionLatch = new CountDownLatch(4); - static int stepOneCount = 0; - static int stepTwoCount = 0; - - // Count for twelve item process events per job - static CountDownLatch itemProcessLatch = new CountDownLatch(6); - - // Count for eight chunk events per job - static CountDownLatch chunkEventsLatch = new CountDownLatch(8); - - // Count for zero read events per job - static CountDownLatch itemReadEventsLatch = new CountDownLatch(0); - - // Count for six write events per job - static CountDownLatch itemWriteEventsLatch = new CountDownLatch(2); - - // Count for 3 skip events per job - static CountDownLatch skipEventsLatch = new CountDownLatch(3); - static int readSkipCount = 0; - static int writeSkipCount = 0; + private static final String TASK_NAME = "taskEventTest"; + private final ObjectMapper objectMapper = new ObjectMapper(); private ConfigurableApplicationContext applicationContext; + @BeforeEach + public void setup() { + objectMapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false); + } + @AfterEach public void tearDown() { if (this.applicationContext != null && this.applicationContext.isActive()) { @@ -95,237 +68,185 @@ public class BatchExecutionEventTests { @Test public void testContext() { this.applicationContext = new SpringApplicationBuilder() - .sources(this.getConfigurations( - BatchExecutionEventTests.ListenerBinding.class, - JobConfiguration.class)) - .build().run(getCommandLineParams( - "--spring.cloud.stream.bindings.job-execution-events.destination=bazbar")); + .sources(TestChannelBinderConfiguration + .getCompleteConfiguration(BatchEventsApplication.class)).web(WebApplicationType.NONE) + .build().run(getCommandLineParams( + "--spring.cloud.stream.bindings.job-execution-events.destination=bazbar")); assertThat(this.applicationContext.getBean("jobExecutionEventsListener")) - .isNotNull(); + .isNotNull(); assertThat(this.applicationContext.getBean("stepExecutionEventsListener")) - .isNotNull(); + .isNotNull(); assertThat(this.applicationContext.getBean("chunkEventsListener")).isNotNull(); assertThat(this.applicationContext.getBean("itemReadEventsListener")).isNotNull(); assertThat(this.applicationContext.getBean("itemWriteEventsListener")) - .isNotNull(); + .isNotNull(); assertThat(this.applicationContext.getBean("itemProcessEventsListener")) - .isNotNull(); + .isNotNull(); assertThat(this.applicationContext.getBean("skipEventsListener")).isNotNull(); - assertThat(this.applicationContext - .getBean(BatchEventAutoConfiguration.BatchEventsChannels.class)) - .isNotNull(); - } @Test public void testJobEventListener() throws Exception { - testListener( - "--spring.cloud.stream.bindings.job-execution-events.destination=foobar", - jobExecutionLatch, BatchExecutionEventTests.ListenerBinding.class); - + List> result = testListener( + "--spring.cloud.task.batch.events.jobExecutionEventBindingName=foobar", "foobar", 1); + JobExecutionEvent jobExecutionEvent = this.objectMapper.readValue(result.get(0).getPayload(), + JobExecutionEvent.class); + Assertions.assertThat(jobExecutionEvent.getJobInstance().getJobName()) + .isEqualTo("job").as("Job name should be job"); } @Test public void testStepEventListener() throws Exception { - testListener( - "--spring.cloud.stream.bindings.step-execution-events.destination=step-execution-foobar", - stepExecutionLatch, BatchExecutionEventTests.StepListenerBinding.class); + final String bindingName = "step-execution-foobar"; + List> result = testListener( + "--spring.cloud.task.batch.events.stepExecutionEventBindingName=" + bindingName, + bindingName, 4); + int stepOneCount = 0; + int stepTwoCount = 0; + for (int i = 0; i < 4; i++) { + StepExecutionEvent stepExecutionEvent = this.objectMapper.readValue(result.get(i).getPayload(), + StepExecutionEvent.class); + if (stepExecutionEvent.getStepName().equals("step1")) { + stepOneCount++; + } + if (stepExecutionEvent.getStepName().equals("step2")) { + stepTwoCount++; + } + } + assertThat(stepOneCount).as("the number of step1 events did not match") - .isEqualTo(2); + .isEqualTo(2); assertThat(stepTwoCount).as("the number of step2 events did not match") - .isEqualTo(2); + .isEqualTo(2); } @Test - public void testItemProcessEventListener() throws Exception { - testListener( - "--spring.cloud.stream.bindings.item-process-events.destination=item-process-foobar", - itemProcessLatch, - BatchExecutionEventTests.ItemProcessListenerBinding.class); + public void testItemProcessEventListener() { + final String bindingName = "item-execution-foobar"; + + List> result = testListener( + "--spring.cloud.task.batch.events.itemProcessEventBindingName=" + bindingName, + bindingName, 1); + String value = new String(result.get(0).getPayload()); + assertThat(value).isEqualTo("\"item did not equal result after processing\""); + } @Test - public void testChunkListener() throws Exception { - testListener( - "--spring.cloud.stream.bindings.chunk-events.destination=chunk-events-foobar", - chunkEventsLatch, - BatchExecutionEventTests.ChunkEventsListenerBinding.class); + public void testChunkListener() { + final String bindingName = "chunk-events-foobar"; + + List> result = testListener( + "--spring.cloud.task.batch.events.chunkEventBindingName=" + bindingName, + bindingName, 2); + String value = new String(result.get(0).getPayload()); + assertThat(value).isEqualTo("\"Before Chunk Processing\""); + value = new String(result.get(1).getPayload()); + assertThat(value).isEqualTo("\"After Chunk Processing\""); } @Test - public void testItemReadListener() throws Exception { - testListener( - "--spring.cloud.stream.bindings.item-read-events.destination=item-read-events-foobar", - itemReadEventsLatch, - BatchExecutionEventTests.ItemReadEventsListenerBinding.class); - } + public void testWriteListener() { + final String bindingName = "item-write-events-foobar"; - @Test - public void testWriteListener() throws Exception { - testListener( - "--spring.cloud.stream.bindings.item-write-events.destination=item-write-events-foobar", - itemWriteEventsLatch, - BatchExecutionEventTests.ItemWriteEventsListenerBinding.class); - } - - @Test - public void testSkipEventListener() throws Exception { - testListenerSkip( - "--spring.cloud.stream.bindings.skip-events.destination=skip-event-foobar", - skipEventsLatch, - BatchExecutionEventTests.SkipEventsListenerBinding.class); - assertThat(readSkipCount).as("read skip count did not match expected result") - .isEqualTo(2); - assertThat(writeSkipCount).as("write skip count did not match expected result") - .isEqualTo(1); - } - - private Class[] getConfigurations(Class sinkClazz, Class jobConfigurationClazz) { - return new Class[] { PropertyPlaceholderAutoConfiguration.class, - jobConfigurationClazz, sinkClazz }; + List> result = testListener( + "--spring.cloud.task.batch.events.itemWriteEventBindingName=" + bindingName, + bindingName, 2); + String value = new String(result.get(0).getPayload()); + assertThat(value).isEqualTo("\"3 items to be written.\""); + value = new String(result.get(1).getPayload()); + assertThat(value).isEqualTo("\"3 items have been written.\""); } private String[] getCommandLineParams(String sinkChannelParam) { - return new String[] { "--spring.cloud.task.closecontext_enable=false", - "--spring.cloud.task.name=" + TASK_NAME, - "--spring.main.web-environment=false", - "--spring.cloud.stream.defaultBinder=rabbit", - "--spring.cloud.stream.bindings.task-events.destination=test", - "foo=" + UUID.randomUUID().toString(), sinkChannelParam }; + return getCommandLineParams(sinkChannelParam, true); } - private void testListener(String channelBinding, CountDownLatch latch, Class clazz) - throws Exception { + private String[] getCommandLineParams(String sinkChannelParam, boolean enableFailJobConfig) { + String jobConfig = enableFailJobConfig ? + "--spring.cloud.task.test.enable-job-configuration=true" : + "--spring.cloud.task.test.enable-fail-job-configuration=true"; + return new String[]{"--spring.cloud.task.closecontext_enable=false", + "--spring.cloud.task.name=" + TASK_NAME, + "--spring.main.web-environment=false", + "--spring.cloud.stream.defaultBinder=rabbit", + "--spring.cloud.stream.bindings.task-events.destination=test", + jobConfig, + "foo=" + UUID.randomUUID(), sinkChannelParam}; + } + + private List> testListener(String channelBinding, String bindingName, int numberToRead) { + return testListenerForApp(channelBinding, bindingName, numberToRead, BatchEventsApplication.class, true); + } + + private List> testListenerSkip(String channelBinding, String bindingName, int numberToRead) { + return testListenerForApp(channelBinding, bindingName, numberToRead, BatchSkipEventsApplication.class, false); + } + + private List> testListenerForApp(String channelBinding, + String bindingName, int numberToRead, Class clazz, boolean enableFailJobConfig) { + List> results = new ArrayList<>(); + this.applicationContext = new SpringApplicationBuilder() - .sources(this.getConfigurations(clazz, JobConfiguration.class)).build() - .run(getCommandLineParams(channelBinding)); + .sources(TestChannelBinderConfiguration + .getCompleteConfiguration(clazz)).web(WebApplicationType.NONE) + .build().run(getCommandLineParams(channelBinding, enableFailJobConfig)); - assertThat(latch.await(1, TimeUnit.SECONDS)).isTrue(); - } + OutputDestination target = this.applicationContext.getBean(OutputDestination.class); - private void testListenerSkip(String channelBinding, CountDownLatch latch, - Class clazz) throws Exception { - this.applicationContext = new SpringApplicationBuilder() - .sources(this.getConfigurations(clazz, JobSkipConfiguration.class)) - .build().run(getCommandLineParams(channelBinding)); - - assertThat(latch.await(1, TimeUnit.SECONDS)).isTrue(); - } - - @EnableBinding(Sink.class) - @PropertySource("classpath:/org/springframework/cloud/task/listener/job-execution-sink-channel.properties") - @EnableAutoConfiguration - @EnableTask - public static class ListenerBinding { - - @StreamListener(Sink.INPUT) - public void receive(JobExecutionEvent execution) { - Assertions.assertThat(execution.getJobInstance().getJobName()) - .isEqualTo("job").as("Job name should be job"); - jobExecutionLatch.countDown(); + for (int i = 0; i < numberToRead; i++) { + results.add(target.receive(10000, bindingName)); } - + return results; } - @EnableBinding(Sink.class) - @PropertySource("classpath:/org/springframework/cloud/task/listener/step-execution-sink-channel.properties") - @EnableAutoConfiguration - @EnableTask - public static class StepListenerBinding { - - @StreamListener(Sink.INPUT) - public void receive(StepExecutionEvent execution) { - if (execution.getStepName().equals("step1")) { - stepOneCount++; - } - if (execution.getStepName().equals("step2")) { - stepTwoCount++; - } - - stepExecutionLatch.countDown(); - } + @Test + public void testItemReadListener() { + final String bindingName = "item-read-events-foobar"; + List> result = testListenerSkip( + "--spring.cloud.task.batch.events.itemReadEventBindingName=" + bindingName, + bindingName, 1); + String exceptionMessage = new String(result.get(0).getPayload()); + assertThat(exceptionMessage).isEqualTo("\"Exception while item was being read\""); } - @EnableBinding(Sink.class) - @PropertySource("classpath:/org/springframework/cloud/task/listener/item-process-sink-channel.properties") - @EnableAutoConfiguration - @EnableTask - public static class ItemProcessListenerBinding { + @Test + public void testSkipEventListener() { + final String SKIPPING_READ_MESSAGE = "\"Skipped when reading.\""; - @StreamListener(Sink.INPUT) - public void receive(String object) { - itemProcessLatch.countDown(); - } - - } - - @EnableBinding(Sink.class) - @PropertySource("classpath:/org/springframework/cloud/task/listener/chunk-events-sink-channel.properties") - @EnableAutoConfiguration - @EnableTask - public static class ChunkEventsListenerBinding { - - @StreamListener(Sink.INPUT) - public void receive(String message) { - chunkEventsLatch.countDown(); - } - - } - - @EnableBinding(Sink.class) - @PropertySource("classpath:/org/springframework/cloud/task/listener/item-read-events-sink-channel.properties") - @EnableAutoConfiguration - @EnableTask - public static class ItemReadEventsListenerBinding { - - @StreamListener(Sink.INPUT) - public void receive(Object itemRead) { - itemReadEventsLatch.countDown(); - } - - } - - @EnableBinding(Sink.class) - @PropertySource("classpath:/org/springframework/cloud/task/listener/skip-events-sink-channel.properties") - @EnableAutoConfiguration - @EnableTask - public static class SkipEventsListenerBinding { - - private static final String SKIPPING_READ_MESSAGE = "Skipped when reading."; - - private static final String SKIPPING_WRITE_CONTENT = "-1"; - - @StreamListener(Sink.INPUT) - public void receive(String exceptionMessage) { - if (exceptionMessage.toString().equals(SKIPPING_READ_MESSAGE)) { + final String SKIPPING_WRITE_CONTENT = "\"-1\""; + final String bindingName = "skip-event-foobar"; + List> result = testListenerSkip( + "--spring.cloud.task.batch.events.skipEventBindingName=" + bindingName, + bindingName, 3); + int readSkipCount = 0; + int writeSkipCount = 0; + for (int i = 0; i < 3; i++) { + String exceptionMessage = new String(result.get(i).getPayload()); + if (exceptionMessage.equals(SKIPPING_READ_MESSAGE)) { readSkipCount++; } - if (exceptionMessage.toString().equals(SKIPPING_WRITE_CONTENT)) { + if (exceptionMessage.equals(SKIPPING_WRITE_CONTENT)) { writeSkipCount++; } - skipEventsLatch.countDown(); } + assertThat(readSkipCount).as("the number of read skip events did not match") + .isEqualTo(2); + assertThat(writeSkipCount).as("the number of write skip events did not match") + .isEqualTo(1); } - @EnableBinding(Sink.class) - @PropertySource("classpath:/org/springframework/cloud/task/listener/item-write-events-sink-channel.properties") - @EnableAutoConfiguration - @EnableTask - public static class ItemWriteEventsListenerBinding { - - @StreamListener(Sink.INPUT) - public void receive(String itemWrite) { - Assertions.assertThat(itemWrite).startsWith("3 items ") - .as("Message should start with '3 items'"); - Assertions.assertThat(itemWrite).endsWith("written.") - .as("Message should end with ' written.'"); - itemWriteEventsLatch.countDown(); - } - + @SpringBootApplication + @Import(JobConfiguration.class) + public static class BatchEventsApplication { } + @SpringBootApplication + @Import(JobSkipConfiguration.class) + public static class BatchSkipEventsApplication { + } } diff --git a/spring-cloud-task-integration-tests/src/test/java/org/springframework/cloud/task/listener/TaskEventTests.java b/spring-cloud-task-integration-tests/src/test/java/org/springframework/cloud/task/listener/TaskEventTests.java index a5466c73..5f8aa348 100644 --- a/spring-cloud-task-integration-tests/src/test/java/org/springframework/cloud/task/listener/TaskEventTests.java +++ b/spring-cloud-task-integration-tests/src/test/java/org/springframework/cloud/task/listener/TaskEventTests.java @@ -1,5 +1,5 @@ /* - * Copyright 2016-2019 the original author or authors. + * Copyright 2016-2021 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -16,78 +16,78 @@ package org.springframework.cloud.task.listener; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.TimeUnit; +import java.util.ArrayList; +import java.util.List; +import java.util.UUID; +import com.fasterxml.jackson.databind.ObjectMapper; import org.assertj.core.api.Assertions; +import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.Test; -import org.junit.jupiter.api.extension.ExtendWith; -import org.testcontainers.containers.GenericContainer; -import org.testcontainers.containers.RabbitMQContainer; -import org.springframework.boot.CommandLineRunner; -import org.springframework.boot.autoconfigure.AutoConfigurations; -import org.springframework.boot.autoconfigure.EnableAutoConfiguration; -import org.springframework.boot.autoconfigure.context.PropertyPlaceholderAutoConfiguration; -import org.springframework.boot.test.context.SpringBootTest; -import org.springframework.boot.test.context.runner.ApplicationContextRunner; -import org.springframework.cloud.stream.annotation.EnableBinding; -import org.springframework.cloud.stream.annotation.StreamListener; -import org.springframework.cloud.stream.binder.rabbit.config.RabbitConfiguration; -import org.springframework.cloud.stream.config.BindingServiceConfiguration; -import org.springframework.cloud.stream.messaging.Sink; +import org.springframework.boot.WebApplicationType; +import org.springframework.boot.builder.SpringApplicationBuilder; +import org.springframework.cloud.stream.binder.test.OutputDestination; +import org.springframework.cloud.stream.binder.test.TestChannelBinderConfiguration; import org.springframework.cloud.task.configuration.EnableTask; -import org.springframework.cloud.task.configuration.SimpleTaskAutoConfiguration; import org.springframework.cloud.task.repository.TaskExecution; -import org.springframework.context.annotation.Bean; +import org.springframework.context.ConfigurableApplicationContext; import org.springframework.context.annotation.Configuration; -import org.springframework.context.annotation.PropertySource; -import org.springframework.test.context.junit.jupiter.SpringExtension; - -import static org.assertj.core.api.Assertions.assertThat; +import org.springframework.messaging.Message; /** * @author Michael Minella * @author Ilayaperumal Gopinathan */ -@ExtendWith(SpringExtension.class) -@SpringBootTest(classes = { TaskEventTests.ListenerBinding.class }) public class TaskEventTests { private static final String TASK_NAME = "taskEventTest"; - static { - GenericContainer rabbitmq = new RabbitMQContainer("rabbitmq:3.8.9") - .withExposedPorts(5672); - rabbitmq.start(); - final Integer mappedPort = rabbitmq.getMappedPort(5672); - System.setProperty("spring.rabbitmq.test.port", mappedPort.toString()); - } + private final ObjectMapper objectMapper = new ObjectMapper(); - // Count for two task execution events per task - static CountDownLatch latch = new CountDownLatch(2); + private ConfigurableApplicationContext applicationContext; + + @AfterEach + public void tearDown() { + if (this.applicationContext != null && this.applicationContext.isActive()) { + this.applicationContext.close(); + } + } @Test public void testTaskEventListener() throws Exception { - ApplicationContextRunner applicationContextRunner = new ApplicationContextRunner() - .withConfiguration(AutoConfigurations.of(TaskEventAutoConfiguration.class, - PropertyPlaceholderAutoConfiguration.class, - RabbitConfiguration.class, - SimpleTaskAutoConfiguration.class, - BindingServiceConfiguration.class)) - .withUserConfiguration(TaskEventsConfiguration.class) - .withPropertyValues("--spring.cloud.task.closecontext_enabled=false", - "--spring.cloud.task.name=" + TASK_NAME, - "--spring.main.web-environment=false", - "--spring.cloud.stream.defaultBinder=rabbit", - "--spring.cloud.stream.bindings.task-events.destination=test"); - applicationContextRunner.run((context) -> { - assertThat(context.getBean("taskEventListener")).isNotNull(); - assertThat( - context.getBean(TaskEventAutoConfiguration.TaskEventChannels.class)) - .isNotNull(); - }); - assertThat(latch.await(1, TimeUnit.SECONDS)).isTrue(); + List> result = testListener( + "--spring.cloud.task.batch.events.itemWriteEventBindingName=task-events", + "task-events", 2); + TaskExecution taskExecution = this.objectMapper.readValue(result.get(0).getPayload(), + TaskExecution.class); + Assertions.assertThat(taskExecution.getTaskName()).isEqualTo(TASK_NAME) + .as(String.format("Task name should be '%s'", TASK_NAME)); + taskExecution = this.objectMapper.readValue(result.get(1).getPayload(), + TaskExecution.class); + Assertions.assertThat(taskExecution.getTaskName()).isEqualTo(TASK_NAME) + .as(String.format("Task name should be '%s'", TASK_NAME)); + } + + private List> testListener(String channelBinding, String bindingName, int numberToRead) { + List> results = new ArrayList<>(); + this.applicationContext = new SpringApplicationBuilder() + .sources(TestChannelBinderConfiguration + .getCompleteConfiguration(BatchExecutionEventTests.BatchEventsApplication.class)).web(WebApplicationType.NONE).build() + .run(getCommandLineParams(channelBinding)); + OutputDestination target = this.applicationContext.getBean(OutputDestination.class); + for (int i = 0; i < numberToRead; i++) { + results.add(target.receive(10000, bindingName)); + } + return results; + } + + private String[] getCommandLineParams(String sinkChannelParam) { + return new String[]{"--spring.cloud.task.closecontext_enable=false", + "--spring.cloud.task.name=" + TASK_NAME, + "--spring.main.web-environment=false", + "--spring.cloud.stream.defaultBinder=rabbit", + "foo=" + UUID.randomUUID(), sinkChannelParam}; } @EnableTask @@ -95,29 +95,4 @@ public class TaskEventTests { public static class TaskEventsConfiguration { } - - @EnableBinding(Sink.class) - @PropertySource("classpath:/org/springframework/cloud/task/listener/sink-channel.properties") - @EnableAutoConfiguration - public static class ListenerBinding { - - @StreamListener(Sink.INPUT) - public void receive(TaskExecution execution) { - Assertions.assertThat(execution.getTaskName()).isEqualTo(TASK_NAME) - .as(String.format("Task name should be '%s'", TASK_NAME)); - latch.countDown(); - } - - @Bean - public CommandLineRunner commandLineRunner() { - return new CommandLineRunner() { - @Override - public void run(String... args) throws Exception { - System.out.println("Hello World"); - } - }; - } - - } - } diff --git a/spring-cloud-task-samples/batch-events/pom.xml b/spring-cloud-task-samples/batch-events/pom.xml index f7b25066..520b35a6 100644 --- a/spring-cloud-task-samples/batch-events/pom.xml +++ b/spring-cloud-task-samples/batch-events/pom.xml @@ -65,12 +65,6 @@ spring-cloud-stream-binder-rabbit-test-support test - - org.springframework.cloud - spring-cloud-stream-test-support-internal - ${spring.cloud.stream} - test - org.springframework.cloud spring-cloud-starter-task @@ -80,24 +74,13 @@ hsqldb - org.testcontainers - testcontainers - ${test.containers.version} + org.springframework.cloud + spring-cloud-stream + ${spring.cloud.stream} + test-jar + test-binder test - - org.testcontainers - rabbitmq - ${test.rabbit.containers.version} - test - - - org.rnorth.duct-tape - duct-tape - ${test.ducttape.version} - test - - diff --git a/spring-cloud-task-samples/batch-events/src/main/java/io/spring/cloud/BatchEventsApplication.java b/spring-cloud-task-samples/batch-events/src/main/java/io/spring/cloud/BatchEventsApplication.java index 979b8024..c40c362f 100644 --- a/spring-cloud-task-samples/batch-events/src/main/java/io/spring/cloud/BatchEventsApplication.java +++ b/spring-cloud-task-samples/batch-events/src/main/java/io/spring/cloud/BatchEventsApplication.java @@ -47,7 +47,7 @@ public class BatchEventsApplication { SpringApplication.run(BatchEventsApplication.class, args); } - @Configuration(proxyBeanMethods = false) + @Configuration public static class JobConfiguration { private static final int DEFAULT_CHUNK_COUNT = 3; diff --git a/spring-cloud-task-samples/batch-events/src/test/java/io/spring/cloud/BatchEventsApplicationTests.java b/spring-cloud-task-samples/batch-events/src/test/java/io/spring/cloud/BatchEventsApplicationTests.java index 165e7295..2cf1cbcb 100644 --- a/spring-cloud-task-samples/batch-events/src/test/java/io/spring/cloud/BatchEventsApplicationTests.java +++ b/spring-cloud-task-samples/batch-events/src/test/java/io/spring/cloud/BatchEventsApplicationTests.java @@ -16,66 +16,92 @@ package io.spring.cloud; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.TimeUnit; +import java.util.ArrayList; +import java.util.List; +import java.util.UUID; import org.assertj.core.api.BDDAssertions; import org.junit.jupiter.api.Tag; +import com.fasterxml.jackson.databind.DeserializationFeature; +import com.fasterxml.jackson.databind.ObjectMapper; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; -import org.testcontainers.containers.GenericContainer; -import org.testcontainers.containers.RabbitMQContainer; -import org.springframework.boot.SpringApplication; -import org.springframework.cloud.stream.annotation.EnableBinding; -import org.springframework.cloud.stream.annotation.StreamListener; -import org.springframework.cloud.stream.messaging.Sink; +import org.springframework.boot.WebApplicationType; +import org.springframework.boot.autoconfigure.SpringBootApplication; +import org.springframework.boot.builder.SpringApplicationBuilder; +import org.springframework.cloud.stream.binder.test.OutputDestination; +import org.springframework.cloud.stream.binder.test.TestChannelBinderConfiguration; import org.springframework.cloud.task.batch.listener.support.JobExecutionEvent; -import org.springframework.context.annotation.Configuration; -import org.springframework.context.annotation.PropertySource; +import org.springframework.cloud.task.batch.listener.support.TaskEventProperties; +import org.springframework.context.ConfigurableApplicationContext; +import org.springframework.context.annotation.Import; +import org.springframework.messaging.Message; import static org.assertj.core.api.Assertions.assertThat; @Tag("DockerRequired") public class BatchEventsApplicationTests { + private static final String TASK_NAME = "taskEventTest"; - static { - GenericContainer rabbitmq = new RabbitMQContainer("rabbitmq:3.8.9") - .withExposedPorts(5672); - rabbitmq.start(); - final Integer mappedPort = rabbitmq.getMappedPort(5672); - System.setProperty("spring.rabbitmq.test.port", mappedPort.toString()); - rabbitPort = mappedPort.toString(); + private ConfigurableApplicationContext applicationContext; + + private final ObjectMapper objectMapper = new ObjectMapper(); + + private final TaskEventProperties taskEventProperties = new TaskEventProperties(); + + @BeforeEach + public void setup() { + objectMapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false); } - // Count for two job execution events per task - static CountDownLatch jobExecutionLatch = new CountDownLatch(2); - - private static String rabbitPort; - - @Test - public void testExecution() throws Exception { - SpringApplication - .run(JobExecutionListenerBinding.class, "--spring.main.web-environment=false"); - SpringApplication.run(BatchEventsApplication.class, "--server.port=0", - "--spring.cloud.stream.bindings.output.producer.requiredGroups=testgroup", - "--spring.jmx.default-domain=fakedomain", - "--spring.main.webEnvironment=false", - "--spring.rabbitmq.port=" + rabbitPort); - assertThat(jobExecutionLatch.await(60, TimeUnit.SECONDS)) - .as("The latch did not count down to zero before timeout").isTrue(); - } - - @EnableBinding(Sink.class) - @PropertySource("classpath:io/spring/task/listener/job-listener-sink-channel.properties") - @Configuration(proxyBeanMethods = false) - public static class JobExecutionListenerBinding { - - @StreamListener(Sink.INPUT) - public void receive(JobExecutionEvent execution) { - BDDAssertions.then(execution.getJobInstance().getJobName()) - .isEqualTo("job").as("Job name should be job"); - jobExecutionLatch.countDown(); + @AfterEach + public void tearDown() { + if (this.applicationContext != null && this.applicationContext.isActive()) { + this.applicationContext.close(); } } + @Test + public void testExecution() throws Exception { + List> result = testListener( + taskEventProperties.getJobExecutionEventBindingName(), 1); + JobExecutionEvent jobExecutionEvent = this.objectMapper.readValue(result.get(0).getPayload(), + JobExecutionEvent.class); + assertThat(jobExecutionEvent.getJobInstance().getJobName()) + .isEqualTo("job").as("Job name should be job"); + } + + private String[] getCommandLineParams(boolean enableFailJobConfig) { + String jobConfig = enableFailJobConfig ? + "--spring.cloud.task.test.enable-job-configuration=true" : + "--spring.cloud.task.test.enable-fail-job-configuration=true"; + return new String[]{"--spring.cloud.task.closecontext_enable=false", + "--spring.cloud.task.name=" + TASK_NAME, + "--spring.main.web-environment=false", + "--spring.cloud.stream.defaultBinder=rabbit", + "--spring.cloud.stream.bindings.task-events.destination=test", + jobConfig, + "foo=" + UUID.randomUUID()}; + } + + private List> testListener(String bindingName, int numberToRead) { + List> results = new ArrayList<>(); + this.applicationContext = new SpringApplicationBuilder() + .sources(TestChannelBinderConfiguration + .getCompleteConfiguration(BatchEventsTestApplication.class)).web(WebApplicationType.NONE).build() + .run(getCommandLineParams(true)); + OutputDestination target = this.applicationContext.getBean(OutputDestination.class); + for (int i = 0; i < numberToRead; i++) { + results.add(target.receive(10000, bindingName)); + } + return results; + } + + @SpringBootApplication + @Import({BatchEventsApplication.class}) + public static class BatchEventsTestApplication { + } + } diff --git a/spring-cloud-task-samples/taskprocessor/pom.xml b/spring-cloud-task-samples/taskprocessor/pom.xml index 8993e856..51e6b99a 100644 --- a/spring-cloud-task-samples/taskprocessor/pom.xml +++ b/spring-cloud-task-samples/taskprocessor/pom.xml @@ -58,13 +58,16 @@ spring-boot-configuration-processor - org.springframework.cloud - spring-cloud-stream-test-support + org.springframework.boot + spring-boot-starter-test test - org.springframework.boot - spring-boot-starter-test + org.springframework.cloud + spring-cloud-stream + ${spring-cloud-stream} + test-jar + test-binder test diff --git a/spring-cloud-task-samples/taskprocessor/src/main/java/io/spring/TaskProcessor.java b/spring-cloud-task-samples/taskprocessor/src/main/java/io/spring/TaskProcessor.java index 421c789f..1d02b1bb 100644 --- a/spring-cloud-task-samples/taskprocessor/src/main/java/io/spring/TaskProcessor.java +++ b/spring-cloud-task-samples/taskprocessor/src/main/java/io/spring/TaskProcessor.java @@ -1,5 +1,5 @@ /* - * Copyright 2016-2019 the original author or authors. + * Copyright 2016-2021 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -18,14 +18,15 @@ package io.spring; import java.util.HashMap; import java.util.Map; +import java.util.function.Function; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.context.properties.EnableConfigurationProperties; -import org.springframework.cloud.stream.annotation.EnableBinding; -import org.springframework.cloud.stream.messaging.Processor; import org.springframework.cloud.task.launcher.TaskLaunchRequest; -import org.springframework.integration.annotation.Transformer; -import org.springframework.messaging.support.GenericMessage; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.integration.support.MessageBuilder; +import org.springframework.messaging.Message; import org.springframework.util.StringUtils; /** @@ -34,41 +35,43 @@ import org.springframework.util.StringUtils; * * @author Glenn Renfro */ -@EnableBinding(Processor.class) @EnableConfigurationProperties(TaskProcessorProperties.class) +@Configuration public class TaskProcessor { @Autowired private TaskProcessorProperties processorProperties; - @Transformer(inputChannel = Processor.INPUT, outputChannel = Processor.OUTPUT) - public Object setupRequest(String message) { - Map properties = new HashMap<>(); - if (StringUtils.hasText(this.processorProperties.getDataSourceUrl())) { - properties - .put("spring_datasource_url", this.processorProperties - .getDataSourceUrl()); - } - if (StringUtils - .hasText(this.processorProperties.getDataSourceDriverClassName())) { - properties.put("spring_datasource_driverClassName", this.processorProperties - .getDataSourceDriverClassName()); - } - if (StringUtils.hasText(this.processorProperties.getDataSourceUserName())) { - properties.put("spring_datasource_username", this.processorProperties - .getDataSourceUserName()); - } - if (StringUtils.hasText(this.processorProperties.getDataSourcePassword())) { - properties.put("spring_datasource_password", this.processorProperties - .getDataSourcePassword()); - } - properties.put("payload", message); + @Bean + public Function, Message> processRequest() { + return (messagePayload) -> { + String message = messagePayload.getPayload(); + Map properties = new HashMap<>(); + if (StringUtils.hasText(this.processorProperties.getDataSourceUrl())) { + properties + .put("spring_datasource_url", this.processorProperties + .getDataSourceUrl()); + } + if (StringUtils + .hasText(this.processorProperties.getDataSourceDriverClassName())) { + properties.put("spring_datasource_driverClassName", this.processorProperties + .getDataSourceDriverClassName()); + } + if (StringUtils.hasText(this.processorProperties.getDataSourceUserName())) { + properties.put("spring_datasource_username", this.processorProperties + .getDataSourceUserName()); + } + if (StringUtils.hasText(this.processorProperties.getDataSourcePassword())) { + properties.put("spring_datasource_password", this.processorProperties + .getDataSourcePassword()); + } + properties.put("payload", message); - TaskLaunchRequest request = new TaskLaunchRequest( - this.processorProperties.getUri(), null, properties, null, - this.processorProperties.getApplicationName()); + TaskLaunchRequest request = new TaskLaunchRequest( + this.processorProperties.getUri(), null, properties, null, + this.processorProperties.getApplicationName()); - return new GenericMessage<>(request); + return MessageBuilder.withPayload(request).build(); + }; } - } diff --git a/spring-cloud-task-samples/taskprocessor/src/main/resources/application.properties b/spring-cloud-task-samples/taskprocessor/src/main/resources/application.properties new file mode 100644 index 00000000..15a0cafd --- /dev/null +++ b/spring-cloud-task-samples/taskprocessor/src/main/resources/application.properties @@ -0,0 +1,3 @@ +maven.remoteRepositories.springRepo.url=https://repo.spring.io/libs-snapshot +spring.cloud.stream.function.bindings.processRequest-in-0=input +spring.cloud.stream.function.bindings.processRequest-out-0=output diff --git a/spring-cloud-task-samples/taskprocessor/src/test/java/io/spring/TaskProcessorApplicationTests.java b/spring-cloud-task-samples/taskprocessor/src/test/java/io/spring/TaskProcessorApplicationTests.java index 7ff7acbe..d6caaa5c 100644 --- a/spring-cloud-task-samples/taskprocessor/src/test/java/io/spring/TaskProcessorApplicationTests.java +++ b/spring-cloud-task-samples/taskprocessor/src/test/java/io/spring/TaskProcessorApplicationTests.java @@ -1,5 +1,5 @@ /* - * Copyright 2016-2019 the original author or authors. + * Copyright 2016-2021 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -17,57 +17,89 @@ package io.spring; import java.io.IOException; +import java.nio.charset.StandardCharsets; +import java.util.ArrayList; import java.util.HashMap; +import java.util.List; import java.util.Map; + +import com.fasterxml.jackson.databind.DeserializationFeature; import com.fasterxml.jackson.databind.ObjectMapper; - +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; -import org.junit.jupiter.api.extension.ExtendWith; -import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.boot.test.context.SpringBootTest; -import org.springframework.cloud.stream.messaging.Processor; -import org.springframework.cloud.stream.test.binder.MessageCollector; +import org.springframework.boot.WebApplicationType; +import org.springframework.boot.autoconfigure.SpringBootApplication; +import org.springframework.boot.builder.SpringApplicationBuilder; +import org.springframework.cloud.stream.binder.test.InputDestination; +import org.springframework.cloud.stream.binder.test.OutputDestination; +import org.springframework.cloud.stream.binder.test.TestChannelBinderConfiguration; import org.springframework.cloud.task.launcher.TaskLaunchRequest; +import org.springframework.context.ConfigurableApplicationContext; +import org.springframework.context.annotation.Import; import org.springframework.messaging.Message; import org.springframework.messaging.support.GenericMessage; -import org.springframework.test.context.junit.jupiter.SpringExtension; import static org.assertj.core.api.Assertions.assertThat; /** * @author Glenn Renfro */ -@ExtendWith(SpringExtension.class) -@SpringBootTest(classes = TaskProcessorApplication.class) + public class TaskProcessorApplicationTests { private static final String DEFAULT_PAYLOAD = "hello"; - @Autowired - protected Processor channels; + private final ObjectMapper objectMapper = new ObjectMapper(); + private ConfigurableApplicationContext applicationContext; - @Autowired - protected MessageCollector collector; + @BeforeEach + public void setup() { + objectMapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false); + } - private ObjectMapper mapper = new ObjectMapper(); + @AfterEach + public void tearDown() { + if (this.applicationContext != null && this.applicationContext.isActive()) { + this.applicationContext.close(); + } + } @Test - public void test() throws InterruptedException, IOException { - this.channels.input().send(new GenericMessage(DEFAULT_PAYLOAD)); + public void test() throws IOException { Map properties = new HashMap(); properties.put("payload", DEFAULT_PAYLOAD); TaskLaunchRequest expectedRequest = new TaskLaunchRequest( "maven://org.springframework.cloud.task.app:" + "timestamp-task:jar:1.0.1.RELEASE", null, properties, null, null); - Message result = (Message) this.collector - .forChannel(this.channels.output()) - .take(); - TaskLaunchRequest tlq = this.mapper - .readValue(result.getPayload(), TaskLaunchRequest.class); + List> result = testListener("output", 1); + + TaskLaunchRequest tlq = objectMapper.readValue(result.get(0).getPayload(), TaskLaunchRequest.class); assertThat(tlq).isEqualTo(expectedRequest); } + + private List> testListener(String bindingName, int numberToRead) { + List> results = new ArrayList<>(); + this.applicationContext = new SpringApplicationBuilder() + .sources(TestChannelBinderConfiguration + .getCompleteConfiguration(TaskProcessorTestApplication.class)).web(WebApplicationType.NONE) + .run(); + + InputDestination input = this.applicationContext.getBean(InputDestination.class); + OutputDestination target = this.applicationContext.getBean(OutputDestination.class); + input.send(new GenericMessage<>(DEFAULT_PAYLOAD.getBytes(StandardCharsets.UTF_8))); + for (int i = 0; i < numberToRead; i++) { + results.add(target.receive(10000, bindingName)); + } + return results; + } + + @SpringBootApplication + @Import({TaskProcessor.class}) + public static class TaskProcessorTestApplication { + } } diff --git a/spring-cloud-task-samples/tasksink/pom.xml b/spring-cloud-task-samples/tasksink/pom.xml index 8d4dd942..72416758 100644 --- a/spring-cloud-task-samples/tasksink/pom.xml +++ b/spring-cloud-task-samples/tasksink/pom.xml @@ -23,6 +23,7 @@ UTF-8 true 3.2.0-SNAPSHOT + 2.7.0 @@ -55,13 +56,21 @@ compile - org.springframework.cloud - spring-cloud-stream-test-support + org.springframework.boot + spring-boot-starter-test test - org.springframework.boot - spring-boot-starter-test + org.springframework.cloud + spring-cloud-deployer-local + ${spring-cloud-deployer-version} + + + org.springframework.cloud + spring-cloud-stream + ${spring-cloud-stream} + test-jar + test-binder test diff --git a/spring-cloud-task-samples/tasksink/src/main/resources/application.properties b/spring-cloud-task-samples/tasksink/src/main/resources/application.properties index a2e7d812..884f00e4 100644 --- a/spring-cloud-task-samples/tasksink/src/main/resources/application.properties +++ b/spring-cloud-task-samples/tasksink/src/main/resources/application.properties @@ -1 +1,4 @@ maven.remoteRepositories.springRepo.url=https://repo.spring.io/libs-snapshot +spring.cloud.stream.function.bindings.taskLauncherSink-in-0=input +spring.cloud.stream.bindings.input.destination=taskLauncherSinkExchange + diff --git a/spring-cloud-task-samples/tasksink/src/test/java/io/spring/TaskSinkApplicationTests.java b/spring-cloud-task-samples/tasksink/src/test/java/io/spring/TaskSinkApplicationTests.java index 4dc1f3e7..1b9b38a4 100644 --- a/spring-cloud-task-samples/tasksink/src/test/java/io/spring/TaskSinkApplicationTests.java +++ b/spring-cloud-task-samples/tasksink/src/test/java/io/spring/TaskSinkApplicationTests.java @@ -1,5 +1,5 @@ /* - * Copyright 2016-2019 the original author or authors. + * Copyright 2016-2021 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -16,7 +16,6 @@ package io.spring; -import java.io.IOException; import java.util.HashMap; import java.util.Map; @@ -29,7 +28,7 @@ import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.test.context.SpringBootTest; import org.springframework.cloud.deployer.spi.core.AppDeploymentRequest; import org.springframework.cloud.deployer.spi.task.TaskLauncher; -import org.springframework.cloud.stream.messaging.Sink; +import org.springframework.cloud.stream.function.StreamBridge; import org.springframework.cloud.task.launcher.TaskLaunchRequest; import org.springframework.context.ApplicationContext; import org.springframework.messaging.support.GenericMessage; @@ -49,11 +48,10 @@ public class TaskSinkApplicationTests { ApplicationContext context; @Autowired - private Sink sink; + private StreamBridge streamBridge; @Test - public void testLaunch() throws IOException { - assertThat(this.sink.input()).isNotNull(); + public void testLaunch() { TaskLauncher testTaskLauncher = this.context.getBean(TaskLauncher.class); @@ -65,8 +63,7 @@ public class TaskSinkApplicationTests { + "timestamp-task:jar:1.0.1.RELEASE", null, properties, null, null); GenericMessage message = new GenericMessage<>(request); - this.sink.input().send(message); - + this.streamBridge.send("taskLauncherSink-in-0", message); ArgumentCaptor deploymentRequest = ArgumentCaptor .forClass(AppDeploymentRequest.class); diff --git a/spring-cloud-task-stream/pom.xml b/spring-cloud-task-stream/pom.xml index 5aea6aca..4eb65bcb 100644 --- a/spring-cloud-task-stream/pom.xml +++ b/spring-cloud-task-stream/pom.xml @@ -58,21 +58,11 @@ spring-boot-starter test - - org.springframework.cloud - spring-cloud-stream-test-support-internal - test - org.springframework.cloud spring-cloud-starter-stream-rabbit test - - org.springframework.cloud - spring-cloud-stream-test-support - test - org.springframework.cloud spring-cloud-task-batch @@ -98,5 +88,18 @@ spring-boot-autoconfigure-processor true + + org.springframework.cloud + spring-cloud-stream + ${spring-cloud-stream.version} + test-jar + test-binder + test + + + org.assertj + assertj-core + test + diff --git a/spring-cloud-task-stream/src/main/java/org/springframework/cloud/task/batch/listener/BatchEventAutoConfiguration.java b/spring-cloud-task-stream/src/main/java/org/springframework/cloud/task/batch/listener/BatchEventAutoConfiguration.java index 5981bc62..7c499f8a 100644 --- a/spring-cloud-task-stream/src/main/java/org/springframework/cloud/task/batch/listener/BatchEventAutoConfiguration.java +++ b/spring-cloud-task-stream/src/main/java/org/springframework/cloud/task/batch/listener/BatchEventAutoConfiguration.java @@ -32,8 +32,8 @@ import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression; import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean; import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; import org.springframework.boot.context.properties.EnableConfigurationProperties; -import org.springframework.cloud.stream.annotation.EnableBinding; -import org.springframework.cloud.stream.annotation.Output; +import org.springframework.cloud.stream.function.StreamBridge; +import org.springframework.cloud.task.batch.listener.support.MessagePublisher; import org.springframework.cloud.task.batch.listener.support.TaskBatchEventListenerBeanPostProcessor; import org.springframework.cloud.task.batch.listener.support.TaskEventProperties; import org.springframework.cloud.task.configuration.SimpleTaskAutoConfiguration; @@ -41,7 +41,6 @@ import org.springframework.cloud.task.listener.TaskLifecycleListener; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.context.annotation.Lazy; -import org.springframework.messaging.MessageChannel; /** * Autoconfigures Spring Batch listeners designed to emit events on the following @@ -112,82 +111,16 @@ public class BatchEventAutoConfiguration { return new TaskBatchEventListenerBeanPostProcessor(); } - /** - * Name of Batch Events channels. - */ - public interface BatchEventsChannels { - - /** - * Name of the job execution events channel. - */ - String JOB_EXECUTION_EVENTS = "job-execution-events"; - - /** - * Name of the step execution events channel. - */ - String STEP_EXECUTION_EVENTS = "step-execution-events"; - - /** - * Name of the chunk execution events channel. - */ - String CHUNK_EXECUTION_EVENTS = "chunk-events"; - - /** - * Name of the item read events channel. - */ - String ITEM_READ_EVENTS = "item-read-events"; - - /** - * Name of the item process events channel. - */ - String ITEM_PROCESS_EVENTS = "item-process-events"; - - /** - * Name of the item write events channel. - */ - String ITEM_WRITE_EVENTS = "item-write-events"; - - /** - * Name of the skip events channel. - */ - String SKIP_EVENTS = "skip-events"; - - @Output(JOB_EXECUTION_EVENTS) - MessageChannel jobExecutionEvents(); - - @Output(STEP_EXECUTION_EVENTS) - MessageChannel stepExecutionEvents(); - - @Output(CHUNK_EXECUTION_EVENTS) - MessageChannel chunkEvents(); - - @Output(ITEM_READ_EVENTS) - MessageChannel itemReadEvents(); - - @Output(ITEM_WRITE_EVENTS) - MessageChannel itemWriteEvents(); - - @Output(ITEM_PROCESS_EVENTS) - MessageChannel itemProcessEvents(); - - @Output(SKIP_EVENTS) - MessageChannel skipEvents(); - - } - /** * Configuration for Job Execution Listener. */ @Configuration(proxyBeanMethods = false) - @ConditionalOnClass(EnableBinding.class) - @EnableBinding(BatchEventsChannels.class) + @ConditionalOnClass(StreamBridge.class) @EnableConfigurationProperties(TaskEventProperties.class) @ConditionalOnMissingBean(name = JOB_EXECUTION_EVENTS_LISTENER) @ConditionalOnExpression("T(org.springframework.util.StringUtils).isEmpty('${spring.batch.job.jobName:}')") public static class JobExecutionListenerConfiguration { - @Autowired - private BatchEventsChannels listenerChannels; @Autowired private TaskEventProperties taskEventProperties; @@ -198,10 +131,10 @@ public class BatchEventAutoConfiguration { @ConditionalOnProperty(prefix = "spring.cloud.task.batch.events.job-execution", name = "enabled", havingValue = "true", matchIfMissing = true) // @checkstyle:on - public JobExecutionListener jobExecutionEventsListener() { + public JobExecutionListener jobExecutionEventsListener(MessagePublisher messagePublisher, TaskEventProperties properties) { return new EventEmittingJobExecutionListener( - this.listenerChannels.jobExecutionEvents(), - this.taskEventProperties.getJobExecutionOrder()); + messagePublisher, + this.taskEventProperties.getJobExecutionOrder(), properties); } // @checkstyle:off @@ -209,10 +142,10 @@ public class BatchEventAutoConfiguration { @ConditionalOnProperty(prefix = "spring.cloud.task.batch.events.step-execution", name = "enabled", havingValue = "true", matchIfMissing = true) // @checkstyle:on - public StepExecutionListener stepExecutionEventsListener() { + public StepExecutionListener stepExecutionEventsListener(MessagePublisher messagePublisher, TaskEventProperties properties) { return new EventEmittingStepExecutionListener( - this.listenerChannels.stepExecutionEvents(), - this.taskEventProperties.getStepExecutionOrder()); + messagePublisher, + this.taskEventProperties.getStepExecutionOrder(), properties); } // @checkstyle:off @@ -221,9 +154,9 @@ public class BatchEventAutoConfiguration { @ConditionalOnProperty(prefix = "spring.cloud.task.batch.events.chunk", name = "enabled", havingValue = "true", matchIfMissing = true) // @checkstyle:on - public EventEmittingChunkListener chunkEventsListener() { - return new EventEmittingChunkListener(this.listenerChannels.chunkEvents(), - this.taskEventProperties.getChunkOrder()); + public EventEmittingChunkListener chunkEventsListener(MessagePublisher messagePublisher, TaskEventProperties properties) { + return new EventEmittingChunkListener(messagePublisher, + this.taskEventProperties.getChunkOrder(), properties); } // @checkstyle:off @@ -231,10 +164,10 @@ public class BatchEventAutoConfiguration { @ConditionalOnProperty(prefix = "spring.cloud.task.batch.events.item-read", name = "enabled", havingValue = "true", matchIfMissing = true) // @checkstyle:on - public ItemReadListener itemReadEventsListener() { + public ItemReadListener itemReadEventsListener(MessagePublisher messagePublisher, TaskEventProperties properties) { return new EventEmittingItemReadListener( - this.listenerChannels.itemReadEvents(), - this.taskEventProperties.getItemReadOrder()); + messagePublisher, + this.taskEventProperties.getItemReadOrder(), properties); } // @checkstyle:off @@ -242,10 +175,11 @@ public class BatchEventAutoConfiguration { @ConditionalOnProperty(prefix = "spring.cloud.task.batch.events.item-write", name = "enabled", havingValue = "true", matchIfMissing = true) // @checkstyle:on - public ItemWriteListener itemWriteEventsListener() { + public ItemWriteListener itemWriteEventsListener(MessagePublisher messagePublisher, + TaskEventProperties properties) { return new EventEmittingItemWriteListener( - this.listenerChannels.itemWriteEvents(), - this.taskEventProperties.getItemWriteOrder()); + messagePublisher, + this.taskEventProperties.getItemWriteOrder(), properties); } // @checkstyle:off @@ -253,10 +187,11 @@ public class BatchEventAutoConfiguration { @ConditionalOnProperty(prefix = "spring.cloud.task.batch.events.item-process", name = "enabled", havingValue = "true", matchIfMissing = true) // @checkstyle:on - public ItemProcessListener itemProcessEventsListener() { + public ItemProcessListener itemProcessEventsListener(MessagePublisher messagePublisher, + TaskEventProperties properties) { return new EventEmittingItemProcessListener( - this.listenerChannels.itemProcessEvents(), - this.taskEventProperties.getItemProcessOrder()); + messagePublisher, + this.taskEventProperties.getItemProcessOrder(), properties); } // @checkstyle:off @@ -264,9 +199,15 @@ public class BatchEventAutoConfiguration { @ConditionalOnProperty(prefix = "spring.cloud.task.batch.events.skip", name = "enabled", havingValue = "true", matchIfMissing = true) // @checkstyle:on - public SkipListener skipEventsListener() { - return new EventEmittingSkipListener(this.listenerChannels.skipEvents(), - this.taskEventProperties.getItemProcessOrder()); + public SkipListener skipEventsListener(MessagePublisher messagePublisher, + TaskEventProperties properties) { + return new EventEmittingSkipListener(messagePublisher, + this.taskEventProperties.getItemProcessOrder(), properties); + } + + @Bean + public MessagePublisher messagePublisher(StreamBridge streamBridge) { + return new MessagePublisher(streamBridge); } } diff --git a/spring-cloud-task-stream/src/main/java/org/springframework/cloud/task/batch/listener/EventEmittingChunkListener.java b/spring-cloud-task-stream/src/main/java/org/springframework/cloud/task/batch/listener/EventEmittingChunkListener.java index 7125d628..b794d97b 100644 --- a/spring-cloud-task-stream/src/main/java/org/springframework/cloud/task/batch/listener/EventEmittingChunkListener.java +++ b/spring-cloud-task-stream/src/main/java/org/springframework/cloud/task/batch/listener/EventEmittingChunkListener.java @@ -1,5 +1,5 @@ /* - * Copyright 2017-2019 the original author or authors. + * Copyright 2017-2021 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -16,14 +16,11 @@ package org.springframework.cloud.task.batch.listener; -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; - import org.springframework.batch.core.ChunkListener; import org.springframework.batch.core.scope.context.ChunkContext; import org.springframework.cloud.task.batch.listener.support.MessagePublisher; +import org.springframework.cloud.task.batch.listener.support.TaskEventProperties; import org.springframework.core.Ordered; -import org.springframework.messaging.MessageChannel; import org.springframework.util.Assert; /** @@ -38,30 +35,34 @@ import org.springframework.util.Assert; */ public class EventEmittingChunkListener implements ChunkListener, Ordered { - private static final Log logger = LogFactory.getLog(EventEmittingChunkListener.class); - - private MessagePublisher messagePublisher; - private int order = Ordered.LOWEST_PRECEDENCE; - public EventEmittingChunkListener(MessageChannel output) { - Assert.notNull(output, "An output channel is required"); - this.messagePublisher = new MessagePublisher(output); + private MessagePublisher messagePublisher; + + private TaskEventProperties properties; + + public EventEmittingChunkListener(MessagePublisher messagePublisher, TaskEventProperties properties) { + Assert.notNull(messagePublisher, "messagePublisher is required"); + Assert.notNull(properties, "properties is required"); + this.messagePublisher = messagePublisher; + this.properties = properties; } - public EventEmittingChunkListener(MessageChannel output, int order) { - this(output); + public EventEmittingChunkListener(MessagePublisher messagePublisher, int order, TaskEventProperties properties) { + this(messagePublisher, properties); this.order = order; } @Override public void beforeChunk(ChunkContext context) { - this.messagePublisher.publish("Before Chunk Processing"); + this.messagePublisher.publish(this.properties.getChunkEventBindingName(), + "Before Chunk Processing"); } @Override public void afterChunk(ChunkContext context) { - this.messagePublisher.publish("After Chunk Processing"); + this.messagePublisher.publish(this.properties.getChunkEventBindingName(), + "After Chunk Processing"); } @Override diff --git a/spring-cloud-task-stream/src/main/java/org/springframework/cloud/task/batch/listener/EventEmittingItemProcessListener.java b/spring-cloud-task-stream/src/main/java/org/springframework/cloud/task/batch/listener/EventEmittingItemProcessListener.java index aab7fb3d..f78719e0 100644 --- a/spring-cloud-task-stream/src/main/java/org/springframework/cloud/task/batch/listener/EventEmittingItemProcessListener.java +++ b/spring-cloud-task-stream/src/main/java/org/springframework/cloud/task/batch/listener/EventEmittingItemProcessListener.java @@ -1,5 +1,5 @@ /* - * Copyright 2016-2019 the original author or authors. + * Copyright 2016-2021 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -16,12 +16,15 @@ package org.springframework.cloud.task.batch.listener; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; + import org.springframework.batch.core.ItemProcessListener; import org.springframework.batch.item.ItemProcessor; import org.springframework.cloud.task.batch.listener.support.BatchJobHeaders; import org.springframework.cloud.task.batch.listener.support.MessagePublisher; +import org.springframework.cloud.task.batch.listener.support.TaskEventProperties; import org.springframework.core.Ordered; -import org.springframework.messaging.MessageChannel; import org.springframework.util.Assert; /** @@ -40,17 +43,24 @@ import org.springframework.util.Assert; */ public class EventEmittingItemProcessListener implements ItemProcessListener, Ordered { - private MessagePublisher messagePublisher; + private static final Log logger = LogFactory + .getLog(EventEmittingItemProcessListener.class); + + private MessagePublisher messagePublisher; private int order = Ordered.LOWEST_PRECEDENCE; - public EventEmittingItemProcessListener(MessageChannel output) { - Assert.notNull(output, "An output channel is required"); - this.messagePublisher = new MessagePublisher<>(output); + private TaskEventProperties properties; + + public EventEmittingItemProcessListener(MessagePublisher messagePublisher, TaskEventProperties properties) { + Assert.notNull(messagePublisher, "messagePublisher is required"); + Assert.notNull(properties, "properties is required"); + this.messagePublisher = messagePublisher; + this.properties = properties; } - public EventEmittingItemProcessListener(MessageChannel output, int order) { - this(output); + public EventEmittingItemProcessListener(MessagePublisher messagePublisher, int order, TaskEventProperties properties) { + this(messagePublisher, properties); this.order = order; } @@ -61,20 +71,24 @@ public class EventEmittingItemProcessListener implements ItemProcessListener, Or @Override public void afterProcess(Object item, Object result) { if (result == null) { - this.messagePublisher.publish("1 item was filtered"); + this.messagePublisher.publish(this.properties.getItemProcessEventBindingName(), "1 item was filtered"); } else if (item.equals(result)) { - this.messagePublisher.publish("item equaled result after processing"); + this.messagePublisher.publish(this.properties.getItemProcessEventBindingName(), "item equaled result after processing"); } else { - this.messagePublisher.publish("item did not equal result after processing"); + this.messagePublisher.publish(this.properties.getItemProcessEventBindingName(), "item did not equal result after processing"); } } @Override public void onProcessError(Object item, Exception e) { + if (logger.isDebugEnabled()) { + logger.debug("Executing onProcessError: " + e.getMessage(), e); + } this.messagePublisher.publishWithThrowableHeader( - "Exception while item was being processed", e.getMessage()); + this.properties.getItemProcessEventBindingName(), + "Exception while item was being processed", e.getMessage()); } @Override diff --git a/spring-cloud-task-stream/src/main/java/org/springframework/cloud/task/batch/listener/EventEmittingItemReadListener.java b/spring-cloud-task-stream/src/main/java/org/springframework/cloud/task/batch/listener/EventEmittingItemReadListener.java index 48ff29ee..ec51e21d 100644 --- a/spring-cloud-task-stream/src/main/java/org/springframework/cloud/task/batch/listener/EventEmittingItemReadListener.java +++ b/spring-cloud-task-stream/src/main/java/org/springframework/cloud/task/batch/listener/EventEmittingItemReadListener.java @@ -1,5 +1,5 @@ /* - * Copyright 2016-2019 the original author or authors. + * Copyright 2016-2021 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -23,8 +23,8 @@ import org.springframework.batch.core.ItemReadListener; import org.springframework.batch.item.ItemReader; import org.springframework.cloud.task.batch.listener.support.BatchJobHeaders; import org.springframework.cloud.task.batch.listener.support.MessagePublisher; +import org.springframework.cloud.task.batch.listener.support.TaskEventProperties; import org.springframework.core.Ordered; -import org.springframework.messaging.MessageChannel; import org.springframework.util.Assert; /** @@ -43,17 +43,22 @@ public class EventEmittingItemReadListener implements ItemReadListener, Ordered private static final Log logger = LogFactory .getLog(EventEmittingItemReadListener.class); - private MessagePublisher messagePublisher; - private int order = Ordered.LOWEST_PRECEDENCE; - public EventEmittingItemReadListener(MessageChannel output) { - Assert.notNull(output, "An output channel is required"); - this.messagePublisher = new MessagePublisher(output); + private final MessagePublisher messagePublisher; + + private TaskEventProperties properties; + + public EventEmittingItemReadListener(MessagePublisher messagePublisher, TaskEventProperties properties) { + Assert.notNull(messagePublisher, "messagePublisher is required"); + Assert.notNull(properties, "properties is required"); + this.properties = properties; + this.messagePublisher = messagePublisher; } - public EventEmittingItemReadListener(MessageChannel output, int order) { - this(output); + public EventEmittingItemReadListener(MessagePublisher messagePublisher, + int order, TaskEventProperties properties) { + this(messagePublisher, properties); this.order = order; } @@ -73,7 +78,7 @@ public class EventEmittingItemReadListener implements ItemReadListener, Ordered logger.debug("Executing onReadError: " + ex.getMessage(), ex); } - this.messagePublisher.publishWithThrowableHeader( + this.messagePublisher.publishWithThrowableHeader(this.properties.getItemReadEventBindingName(), "Exception while item was being read", ex.getMessage()); } diff --git a/spring-cloud-task-stream/src/main/java/org/springframework/cloud/task/batch/listener/EventEmittingItemWriteListener.java b/spring-cloud-task-stream/src/main/java/org/springframework/cloud/task/batch/listener/EventEmittingItemWriteListener.java index b9c97f23..d962a0f8 100644 --- a/spring-cloud-task-stream/src/main/java/org/springframework/cloud/task/batch/listener/EventEmittingItemWriteListener.java +++ b/spring-cloud-task-stream/src/main/java/org/springframework/cloud/task/batch/listener/EventEmittingItemWriteListener.java @@ -1,5 +1,5 @@ /* - * Copyright 2016-2019 the original author or authors. + * Copyright 2016-2021 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -24,8 +24,8 @@ import org.apache.commons.logging.LogFactory; import org.springframework.batch.core.ItemWriteListener; import org.springframework.cloud.task.batch.listener.support.BatchJobHeaders; import org.springframework.cloud.task.batch.listener.support.MessagePublisher; +import org.springframework.cloud.task.batch.listener.support.TaskEventProperties; import org.springframework.core.Ordered; -import org.springframework.messaging.MessageChannel; import org.springframework.util.Assert; /** @@ -44,23 +44,29 @@ public class EventEmittingItemWriteListener implements ItemWriteListener, Ordere private static final Log logger = LogFactory .getLog(EventEmittingItemWriteListener.class); - private MessagePublisher messagePublisher; - private int order = Ordered.LOWEST_PRECEDENCE; - public EventEmittingItemWriteListener(MessageChannel output) { - Assert.notNull(output, "An output channel is required"); - this.messagePublisher = new MessagePublisher<>(output); + private final MessagePublisher messagePublisher; + + private TaskEventProperties properties; + + public EventEmittingItemWriteListener(MessagePublisher messagePublisher, TaskEventProperties properties) { + Assert.notNull(messagePublisher, "messagePublisher is required"); + Assert.notNull(properties, "properties is required"); + + this.messagePublisher = messagePublisher; + this.properties = properties; } - public EventEmittingItemWriteListener(MessageChannel output, int order) { - this(output); + public EventEmittingItemWriteListener(MessagePublisher messagePublisher, int order, TaskEventProperties properties) { + this(messagePublisher, properties); this.order = order; } @Override public void beforeWrite(List items) { - this.messagePublisher.publish(items.size() + " items to be written."); + this.messagePublisher.publish(this.properties.getItemWriteEventBindingName(), + items.size() + " items to be written."); } @Override @@ -68,7 +74,8 @@ public class EventEmittingItemWriteListener implements ItemWriteListener, Ordere if (logger.isDebugEnabled()) { logger.debug("Executing afterWrite: " + items); } - this.messagePublisher.publish(items.size() + " items have been written."); + this.messagePublisher.publish(this.properties.getItemWriteEventBindingName(), + items.size() + " items have been written."); } @Override @@ -78,7 +85,8 @@ public class EventEmittingItemWriteListener implements ItemWriteListener, Ordere } String payload = "Exception while " + items.size() + " items are attempted to be written."; - this.messagePublisher.publishWithThrowableHeader(payload, exception.getMessage()); + this.messagePublisher.publishWithThrowableHeader( + this.properties.getItemWriteEventBindingName(), payload, exception.getMessage()); } @Override diff --git a/spring-cloud-task-stream/src/main/java/org/springframework/cloud/task/batch/listener/EventEmittingJobExecutionListener.java b/spring-cloud-task-stream/src/main/java/org/springframework/cloud/task/batch/listener/EventEmittingJobExecutionListener.java index 46515caa..26561885 100644 --- a/spring-cloud-task-stream/src/main/java/org/springframework/cloud/task/batch/listener/EventEmittingJobExecutionListener.java +++ b/spring-cloud-task-stream/src/main/java/org/springframework/cloud/task/batch/listener/EventEmittingJobExecutionListener.java @@ -1,5 +1,5 @@ /* - * Copyright 2016-2019 the original author or authors. + * Copyright 2016-2021 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -20,8 +20,8 @@ import org.springframework.batch.core.JobExecution; import org.springframework.batch.core.JobExecutionListener; import org.springframework.cloud.task.batch.listener.support.JobExecutionEvent; import org.springframework.cloud.task.batch.listener.support.MessagePublisher; +import org.springframework.cloud.task.batch.listener.support.TaskEventProperties; import org.springframework.core.Ordered; -import org.springframework.messaging.MessageChannel; import org.springframework.util.Assert; /** @@ -33,28 +33,33 @@ import org.springframework.util.Assert; */ public class EventEmittingJobExecutionListener implements JobExecutionListener, Ordered { - private MessagePublisher messagePublisher; - private int order = Ordered.LOWEST_PRECEDENCE; - public EventEmittingJobExecutionListener(MessageChannel output) { - Assert.notNull(output, "An output channel is required"); - this.messagePublisher = new MessagePublisher<>(output); + private final MessagePublisher messagePublisher; + + private TaskEventProperties properties; + + public EventEmittingJobExecutionListener(MessagePublisher messagePublisher, TaskEventProperties properties) { + Assert.notNull(messagePublisher, "messagePublisher is required"); + Assert.notNull(properties, "properties is required"); + + this.messagePublisher = messagePublisher; + this.properties = properties; } - public EventEmittingJobExecutionListener(MessageChannel output, int order) { - this(output); + public EventEmittingJobExecutionListener(MessagePublisher messagePublisher, int order, TaskEventProperties properties) { + this(messagePublisher, properties); this.order = order; } @Override public void beforeJob(JobExecution jobExecution) { - this.messagePublisher.publish(new JobExecutionEvent(jobExecution)); + this.messagePublisher.publish(properties.getJobExecutionEventBindingName(), new JobExecutionEvent(jobExecution)); } @Override public void afterJob(JobExecution jobExecution) { - this.messagePublisher.publish(new JobExecutionEvent(jobExecution)); + this.messagePublisher.publish(properties.getJobExecutionEventBindingName(), new JobExecutionEvent(jobExecution)); } @Override diff --git a/spring-cloud-task-stream/src/main/java/org/springframework/cloud/task/batch/listener/EventEmittingSkipListener.java b/spring-cloud-task-stream/src/main/java/org/springframework/cloud/task/batch/listener/EventEmittingSkipListener.java index d066bf8b..e05336e8 100644 --- a/spring-cloud-task-stream/src/main/java/org/springframework/cloud/task/batch/listener/EventEmittingSkipListener.java +++ b/spring-cloud-task-stream/src/main/java/org/springframework/cloud/task/batch/listener/EventEmittingSkipListener.java @@ -1,5 +1,5 @@ /* - * Copyright 2016-2019 the original author or authors. + * Copyright 2016-2021 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -22,8 +22,8 @@ import org.apache.commons.logging.LogFactory; import org.springframework.batch.core.SkipListener; import org.springframework.cloud.task.batch.listener.support.BatchJobHeaders; import org.springframework.cloud.task.batch.listener.support.MessagePublisher; +import org.springframework.cloud.task.batch.listener.support.TaskEventProperties; import org.springframework.core.Ordered; -import org.springframework.messaging.MessageChannel; import org.springframework.util.Assert; /** @@ -43,17 +43,22 @@ public class EventEmittingSkipListener implements SkipListener, Ordered { private static final Log logger = LogFactory.getLog(EventEmittingSkipListener.class); - private MessagePublisher messagePublisher; + private final MessagePublisher messagePublisher; private int order = Ordered.LOWEST_PRECEDENCE; - public EventEmittingSkipListener(MessageChannel output) { - Assert.notNull(output, "An output channel is required"); - this.messagePublisher = new MessagePublisher<>(output); + private TaskEventProperties properties; + + public EventEmittingSkipListener(MessagePublisher messagePublisher, TaskEventProperties properties) { + Assert.notNull(messagePublisher, "messagePublisher is required"); + Assert.notNull(properties, "properties is required"); + + this.messagePublisher = messagePublisher; + this.properties = properties; } - public EventEmittingSkipListener(MessageChannel output, int order) { - this(output); + public EventEmittingSkipListener(MessagePublisher messagePublisher, int order, TaskEventProperties properties) { + this(messagePublisher, properties); this.order = order; } @@ -62,8 +67,7 @@ public class EventEmittingSkipListener implements SkipListener, Ordered { if (logger.isDebugEnabled()) { logger.debug("Executing onSkipInRead: " + t.getMessage(), t); } - this.messagePublisher.publishWithThrowableHeader("Skipped when reading.", - t.getMessage()); + this.messagePublisher.publishWithThrowableHeader(this.properties.getSkipEventBindingName(), "Skipped when reading.", t.getMessage()); } @Override @@ -71,7 +75,7 @@ public class EventEmittingSkipListener implements SkipListener, Ordered { if (logger.isDebugEnabled()) { logger.debug("Executing onSkipInWrite: " + t.getMessage(), t); } - this.messagePublisher.publishWithThrowableHeader(item, t.getMessage()); + this.messagePublisher.publishWithThrowableHeader(this.properties.getSkipEventBindingName(), item, t.getMessage()); } @Override @@ -79,7 +83,7 @@ public class EventEmittingSkipListener implements SkipListener, Ordered { if (logger.isDebugEnabled()) { logger.debug("Executing onSkipInProcess: " + t.getMessage(), t); } - this.messagePublisher.publishWithThrowableHeader(item, t.getMessage()); + this.messagePublisher.publishWithThrowableHeader(this.properties.getSkipEventBindingName(), item, t.getMessage()); } @Override diff --git a/spring-cloud-task-stream/src/main/java/org/springframework/cloud/task/batch/listener/EventEmittingStepExecutionListener.java b/spring-cloud-task-stream/src/main/java/org/springframework/cloud/task/batch/listener/EventEmittingStepExecutionListener.java index 7059be6a..40611534 100644 --- a/spring-cloud-task-stream/src/main/java/org/springframework/cloud/task/batch/listener/EventEmittingStepExecutionListener.java +++ b/spring-cloud-task-stream/src/main/java/org/springframework/cloud/task/batch/listener/EventEmittingStepExecutionListener.java @@ -1,5 +1,5 @@ /* - * Copyright 2016-2019 the original author or authors. + * Copyright 2016-2021 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -21,8 +21,8 @@ import org.springframework.batch.core.StepExecution; import org.springframework.batch.core.StepExecutionListener; import org.springframework.cloud.task.batch.listener.support.MessagePublisher; import org.springframework.cloud.task.batch.listener.support.StepExecutionEvent; +import org.springframework.cloud.task.batch.listener.support.TaskEventProperties; import org.springframework.core.Ordered; -import org.springframework.messaging.MessageChannel; import org.springframework.util.Assert; /** @@ -37,28 +37,32 @@ import org.springframework.util.Assert; public class EventEmittingStepExecutionListener implements StepExecutionListener, Ordered { - private MessagePublisher messagePublisher; - + private final MessagePublisher messagePublisher; private int order = Ordered.LOWEST_PRECEDENCE; - public EventEmittingStepExecutionListener(MessageChannel output) { - Assert.notNull(output, "An output channel is required"); - this.messagePublisher = new MessagePublisher<>(output); + private TaskEventProperties properties; + + public EventEmittingStepExecutionListener(MessagePublisher messagePublisher, TaskEventProperties properties) { + Assert.notNull(messagePublisher, "messagePublisher is required"); + Assert.notNull(properties, "properties is required"); + + this.messagePublisher = messagePublisher; + this.properties = properties; } - public EventEmittingStepExecutionListener(MessageChannel output, int order) { - this(output); + public EventEmittingStepExecutionListener(MessagePublisher messagePublisher, int order, TaskEventProperties properties) { + this(messagePublisher, properties); this.order = order; } @Override public void beforeStep(StepExecution stepExecution) { - this.messagePublisher.publish(new StepExecutionEvent(stepExecution)); + this.messagePublisher.publish(this.properties.getStepExecutionEventBindingName(), new StepExecutionEvent(stepExecution)); } @Override public ExitStatus afterStep(StepExecution stepExecution) { - this.messagePublisher.publish(new StepExecutionEvent(stepExecution)); + this.messagePublisher.publish(this.properties.getStepExecutionEventBindingName(), new StepExecutionEvent(stepExecution)); return stepExecution.getExitStatus(); } diff --git a/spring-cloud-task-stream/src/main/java/org/springframework/cloud/task/batch/listener/support/MessagePublisher.java b/spring-cloud-task-stream/src/main/java/org/springframework/cloud/task/batch/listener/support/MessagePublisher.java index 2734642e..d500f785 100644 --- a/spring-cloud-task-stream/src/main/java/org/springframework/cloud/task/batch/listener/support/MessagePublisher.java +++ b/spring-cloud-task-stream/src/main/java/org/springframework/cloud/task/batch/listener/support/MessagePublisher.java @@ -16,8 +16,8 @@ package org.springframework.cloud.task.batch.listener.support; +import org.springframework.cloud.stream.function.StreamBridge; import org.springframework.messaging.Message; -import org.springframework.messaging.MessageChannel; import org.springframework.messaging.support.MessageBuilder; import org.springframework.util.Assert; @@ -29,31 +29,32 @@ import org.springframework.util.Assert; */ public class MessagePublisher

{ - private final MessageChannel listenerEventsChannel; - public MessagePublisher(MessageChannel listenerEventsChannel) { - Assert.notNull(listenerEventsChannel, "listenerEventsChannel must not be null"); - this.listenerEventsChannel = listenerEventsChannel; + private final StreamBridge streamBridge; + + public MessagePublisher(StreamBridge streamBridge) { + Assert.notNull(streamBridge, "streamBridge must not be null"); + this.streamBridge = streamBridge; } - public final void publish(P payload) { + public final void publish(String bindingName, P payload) { if (payload instanceof Message) { - this.publishMessage((Message) payload); + this.publishMessage(bindingName, (Message) payload); } else { Message

message = MessageBuilder.withPayload(payload).build(); - this.listenerEventsChannel.send(message); + this.streamBridge.send(bindingName, message); } } - private void publishMessage(Message message) { - this.listenerEventsChannel.send(message); + private void publishMessage(String bindingName, Message message) { + this.streamBridge.send(bindingName, message); } - public void publishWithThrowableHeader(P payload, String header) { + public void publishWithThrowableHeader(String bindingName, P payload, String header) { Message

message = MessageBuilder.withPayload(payload) .setHeader(BatchJobHeaders.BATCH_EXCEPTION, header).build(); - publishMessage(message); + publishMessage(bindingName, message); } } diff --git a/spring-cloud-task-stream/src/main/java/org/springframework/cloud/task/batch/listener/support/TaskEventProperties.java b/spring-cloud-task-stream/src/main/java/org/springframework/cloud/task/batch/listener/support/TaskEventProperties.java index 18d95aa8..ea77d0ad 100644 --- a/spring-cloud-task-stream/src/main/java/org/springframework/cloud/task/batch/listener/support/TaskEventProperties.java +++ b/spring-cloud-task-stream/src/main/java/org/springframework/cloud/task/batch/listener/support/TaskEventProperties.java @@ -1,5 +1,5 @@ /* - * Copyright 2017-2019 the original author or authors. + * Copyright 2017-2021 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -67,6 +67,15 @@ public class TaskEventProperties { */ private int skipOrder = Ordered.LOWEST_PRECEDENCE; + private String jobExecutionEventBindingName = "job-execution-events"; + private String skipEventBindingName = "skip-events"; + private String chunkEventBindingName = "chunk-events"; + private String itemProcessEventBindingName = "item-process-events"; + private String itemReadEventBindingName = "item-read-events"; + private String itemWriteEventBindingName = "item-write-events"; + private String stepExecutionEventBindingName = "step-execution-events"; + private String taskEventBindingName = "task-events"; + public int getJobExecutionOrder() { return this.jobExecutionOrder; } @@ -123,4 +132,68 @@ public class TaskEventProperties { this.skipOrder = skipOrder; } + + public String getJobExecutionEventBindingName() { + return jobExecutionEventBindingName; + } + + public void setJobExecutionEventBindingName(String jobExecutionEventBindingName) { + this.jobExecutionEventBindingName = jobExecutionEventBindingName; + } + + public String getSkipEventBindingName() { + return skipEventBindingName; + } + + public void setSkipEventBindingName(String skipEventBindingName) { + this.skipEventBindingName = skipEventBindingName; + } + + public String getChunkEventBindingName() { + return chunkEventBindingName; + } + + public void setChunkEventBindingName(String chunkEventBindingName) { + this.chunkEventBindingName = chunkEventBindingName; + } + + public String getItemProcessEventBindingName() { + return itemProcessEventBindingName; + } + + public void setItemProcessEventBindingName(String itemProcessEventBindingName) { + this.itemProcessEventBindingName = itemProcessEventBindingName; + } + + public String getItemReadEventBindingName() { + return itemReadEventBindingName; + } + + public void setItemReadEventBindingName(String itemReadEventBindingName) { + this.itemReadEventBindingName = itemReadEventBindingName; + } + + public String getItemWriteEventBindingName() { + return itemWriteEventBindingName; + } + + public void setItemWriteEventBindingName(String itemWriteEventBindingName) { + this.itemWriteEventBindingName = itemWriteEventBindingName; + } + + public String getStepExecutionEventBindingName() { + return stepExecutionEventBindingName; + } + + public void setStepExecutionEventBindingName(String stepExecutionEventBindingName) { + this.stepExecutionEventBindingName = stepExecutionEventBindingName; + } + + public String getTaskEventBindingName() { + return taskEventBindingName; + } + + public void setTaskEventBindingName(String taskEventBindingName) { + this.taskEventBindingName = taskEventBindingName; + } } diff --git a/spring-cloud-task-stream/src/main/java/org/springframework/cloud/task/launcher/TaskLauncherSink.java b/spring-cloud-task-stream/src/main/java/org/springframework/cloud/task/launcher/TaskLauncherSink.java index 3ce3469a..82f46881 100644 --- a/spring-cloud-task-stream/src/main/java/org/springframework/cloud/task/launcher/TaskLauncherSink.java +++ b/spring-cloud-task-stream/src/main/java/org/springframework/cloud/task/launcher/TaskLauncherSink.java @@ -1,5 +1,5 @@ /* - * Copyright 2016-2019 the original author or authors. + * Copyright 2016-2021 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -16,6 +16,8 @@ package org.springframework.cloud.task.launcher; +import java.util.function.Consumer; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -24,10 +26,9 @@ import org.springframework.cloud.deployer.resource.support.DelegatingResourceLoa import org.springframework.cloud.deployer.spi.core.AppDefinition; import org.springframework.cloud.deployer.spi.core.AppDeploymentRequest; import org.springframework.cloud.deployer.spi.task.TaskLauncher; -import org.springframework.cloud.stream.annotation.EnableBinding; -import org.springframework.cloud.stream.messaging.Sink; +import org.springframework.context.annotation.Bean; import org.springframework.core.io.Resource; -import org.springframework.integration.annotation.ServiceActivator; +import org.springframework.messaging.Message; import org.springframework.util.Assert; /** @@ -36,7 +37,6 @@ import org.springframework.util.Assert; * @author Glenn Renfro */ -@EnableBinding(Sink.class) public class TaskLauncherSink { private final static Logger logger = LoggerFactory.getLogger(TaskLauncherSink.class); @@ -52,13 +52,13 @@ public class TaskLauncherSink { /** * Launches a task upon the receipt of a valid TaskLaunchRequest. - * @param taskLaunchRequest is a TaskLaunchRequest containing the information required - * to launch a task. - * @throws Exception if error occurs during task launch. + * @return the {@link Consumer} that will retrieve messages from binder. */ - @ServiceActivator(inputChannel = Sink.INPUT) - public void taskLauncherSink(TaskLaunchRequest taskLaunchRequest) throws Exception { - launchTask(taskLaunchRequest); + @Bean + public Consumer> taskLauncherSink() { + return messagePayload -> { + launchTask(messagePayload.getPayload()); + }; } private void launchTask(TaskLaunchRequest taskLaunchRequest) { diff --git a/spring-cloud-task-stream/src/main/java/org/springframework/cloud/task/listener/TaskEventAutoConfiguration.java b/spring-cloud-task-stream/src/main/java/org/springframework/cloud/task/listener/TaskEventAutoConfiguration.java index f6d7526c..7a61dad8 100644 --- a/spring-cloud-task-stream/src/main/java/org/springframework/cloud/task/listener/TaskEventAutoConfiguration.java +++ b/spring-cloud-task-stream/src/main/java/org/springframework/cloud/task/listener/TaskEventAutoConfiguration.java @@ -22,22 +22,22 @@ import org.springframework.boot.autoconfigure.condition.ConditionalOnBean; import org.springframework.boot.autoconfigure.condition.ConditionalOnClass; import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression; import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; -import org.springframework.cloud.stream.annotation.EnableBinding; -import org.springframework.cloud.stream.annotation.Output; +import org.springframework.boot.context.properties.EnableConfigurationProperties; import org.springframework.cloud.stream.config.BindingServiceConfiguration; +import org.springframework.cloud.stream.function.StreamBridge; +import org.springframework.cloud.task.batch.listener.support.TaskEventProperties; import org.springframework.cloud.task.configuration.SimpleTaskAutoConfiguration; +import org.springframework.cloud.task.repository.TaskExecution; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.context.annotation.PropertySource; -import org.springframework.integration.gateway.GatewayProxyFactoryBean; -import org.springframework.messaging.MessageChannel; /** * @author Michael Minella * @author Glenn Renfro */ @Configuration(proxyBeanMethods = false) -@ConditionalOnClass(EnableBinding.class) +@ConditionalOnClass(StreamBridge.class) @ConditionalOnBean(TaskLifecycleListener.class) @ConditionalOnExpression("T(org.springframework.util.StringUtils).isEmpty('${spring.batch.job.jobName:}')") // @checkstyle:off @@ -47,38 +47,32 @@ import org.springframework.messaging.MessageChannel; @PropertySource("classpath:/org/springframework/cloud/task/application.properties") @AutoConfigureBefore(BindingServiceConfiguration.class) @AutoConfigureAfter(SimpleTaskAutoConfiguration.class) +@EnableConfigurationProperties(TaskEventProperties.class) public class TaskEventAutoConfiguration { - /** - * Task Event channels definition. - */ - public interface TaskEventChannels { - - /** - * Name of the default task events channel. - */ - String TASK_EVENTS = "task-events"; - - @Output(TASK_EVENTS) - MessageChannel taskEvents(); - - } - /** * Configuration for a {@link TaskExecutionListener}. */ @Configuration(proxyBeanMethods = false) - @EnableBinding(TaskEventChannels.class) public static class ListenerConfiguration { - @Bean - public GatewayProxyFactoryBean taskEventListener() { - GatewayProxyFactoryBean factoryBean = new GatewayProxyFactoryBean( - TaskExecutionListener.class); + public TaskExecutionListener taskEventEmitter(StreamBridge streamBridge, TaskEventProperties taskEventProperties) { + return new TaskExecutionListener() { + @Override + public void onTaskStartup(TaskExecution taskExecution) { + streamBridge.send(taskEventProperties.getTaskEventBindingName(), taskExecution); + } - factoryBean.setDefaultRequestChannelName(TaskEventChannels.TASK_EVENTS); + @Override + public void onTaskEnd(TaskExecution taskExecution) { + streamBridge.send(taskEventProperties.getTaskEventBindingName(), taskExecution); + } - return factoryBean; + @Override + public void onTaskFailed(TaskExecution taskExecution, Throwable throwable) { + streamBridge.send(taskEventProperties.getTaskEventBindingName(), taskExecution); + } + }; } } diff --git a/spring-cloud-task-stream/src/test/java/org/springframework/cloud/task/batch/listener/EventListenerTests.java b/spring-cloud-task-stream/src/test/java/org/springframework/cloud/task/batch/listener/EventListenerTests.java index e489fbfb..4ec7a9b4 100644 --- a/spring-cloud-task-stream/src/test/java/org/springframework/cloud/task/batch/listener/EventListenerTests.java +++ b/spring-cloud-task-stream/src/test/java/org/springframework/cloud/task/batch/listener/EventListenerTests.java @@ -1,5 +1,5 @@ /* - * Copyright 2016-2019 the original author or authors. + * Copyright 2016-2021 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -16,10 +16,14 @@ package org.springframework.cloud.task.batch.listener; +import java.io.IOException; import java.util.ArrayList; import java.util.List; import java.util.UUID; +import com.fasterxml.jackson.databind.DeserializationFeature; +import com.fasterxml.jackson.databind.ObjectMapper; +import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; @@ -29,10 +33,18 @@ import org.springframework.batch.core.JobParameters; import org.springframework.batch.core.StepExecution; import org.springframework.batch.core.scope.context.ChunkContext; import org.springframework.batch.core.scope.context.StepContext; +import org.springframework.boot.WebApplicationType; +import org.springframework.boot.autoconfigure.SpringBootApplication; +import org.springframework.boot.builder.SpringApplicationBuilder; +import org.springframework.cloud.stream.binder.test.OutputDestination; +import org.springframework.cloud.stream.binder.test.TestChannelBinderConfiguration; +import org.springframework.cloud.stream.function.StreamBridge; import org.springframework.cloud.task.batch.listener.support.JobExecutionEvent; +import org.springframework.cloud.task.batch.listener.support.MessagePublisher; import org.springframework.cloud.task.batch.listener.support.StepExecutionEvent; +import org.springframework.cloud.task.batch.listener.support.TaskEventProperties; +import org.springframework.context.ConfigurableApplicationContext; import org.springframework.core.Ordered; -import org.springframework.integration.channel.QueueChannel; import org.springframework.messaging.Message; import static org.assertj.core.api.Assertions.assertThat; @@ -43,8 +55,6 @@ import static org.assertj.core.api.Assertions.assertThat; */ public class EventListenerTests { - private QueueChannel queueChannel; - private EventEmittingSkipListener eventEmittingSkipListener; private EventEmittingItemProcessListener eventEmittingItemProcessListener; @@ -59,22 +69,43 @@ public class EventListenerTests { private EventEmittingChunkListener eventEmittingChunkListener; + private ConfigurableApplicationContext applicationContext; + + private final TaskEventProperties taskEventProperties = new TaskEventProperties(); + + private final ObjectMapper objectMapper = new ObjectMapper(); + @BeforeEach public void beforeTests() { - this.queueChannel = new QueueChannel(1); - this.eventEmittingSkipListener = new EventEmittingSkipListener(this.queueChannel); + this.applicationContext = new SpringApplicationBuilder() + .sources(TestChannelBinderConfiguration + .getCompleteConfiguration(BatchEventsApplication.class)).web(WebApplicationType.NONE).build() + .run(); + StreamBridge streamBridge = this.applicationContext.getBean(StreamBridge.class); + MessagePublisher messagePublisher = new MessagePublisher(streamBridge); + objectMapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false); + + this.eventEmittingSkipListener = new EventEmittingSkipListener( + messagePublisher, this.taskEventProperties); this.eventEmittingItemProcessListener = new EventEmittingItemProcessListener( - this.queueChannel); + messagePublisher, this.taskEventProperties); this.eventEmittingItemReadListener = new EventEmittingItemReadListener( - this.queueChannel); + messagePublisher, this.taskEventProperties); this.eventEmittingItemWriteListener = new EventEmittingItemWriteListener( - this.queueChannel); + messagePublisher, this.taskEventProperties); this.eventEmittingJobExecutionListener = new EventEmittingJobExecutionListener( - this.queueChannel); + messagePublisher, this.taskEventProperties); this.eventEmittingStepExecutionListener = new EventEmittingStepExecutionListener( - this.queueChannel); + messagePublisher, this.taskEventProperties); this.eventEmittingChunkListener = new EventEmittingChunkListener( - this.queueChannel, 0); + messagePublisher, 0, this.taskEventProperties); + } + + @AfterEach + public void tearDown() { + if (this.applicationContext != null && this.applicationContext.isActive()) { + this.applicationContext.close(); + } } @Test @@ -96,188 +127,171 @@ public class EventListenerTests { @Test public void testItemProcessListenerOnProcessorError() { - RuntimeException exeption = new RuntimeException("Test Exception"); - this.eventEmittingItemProcessListener.onProcessError("HELLO", exeption); - assertThat(this.queueChannel.getQueueSize()).isEqualTo(1); + this.eventEmittingItemProcessListener.onProcessError("HELLO", + new RuntimeException("Test Exception")); - Message msg = this.queueChannel.receive(); - assertThat(msg.getPayload()) - .isEqualTo("Exception while item was being processed"); + assertThat(getStringFromDestination(this.taskEventProperties.getItemProcessEventBindingName())) + .isEqualTo("\"Exception while item was being processed\""); } @Test public void testItemProcessListenerAfterProcess() { this.eventEmittingItemProcessListener.afterProcess("HELLO_AFTER_PROCESS_EQUAL", "HELLO_AFTER_PROCESS_EQUAL"); - assertThat(this.queueChannel.getQueueSize()).isEqualTo(1); - Message msg = this.queueChannel.receive(); - assertThat(msg.getPayload()).isEqualTo("item equaled result after processing"); + assertThat(getStringFromDestination(this.taskEventProperties.getItemProcessEventBindingName())) + .isEqualTo("\"item equaled result after processing\""); this.eventEmittingItemProcessListener.afterProcess("HELLO_NOT_EQUAL", "WORLD"); - assertThat(this.queueChannel.getQueueSize()).isEqualTo(1); - msg = this.queueChannel.receive(); - assertThat(msg.getPayload()) - .isEqualTo("item did not equal result after processing"); + assertThat(getStringFromDestination(this.taskEventProperties.getItemProcessEventBindingName())) + .isEqualTo("\"item did not equal result after processing\""); this.eventEmittingItemProcessListener.afterProcess("HELLO_AFTER_PROCESS", null); - assertThat(this.queueChannel.getQueueSize()).isEqualTo(1); - msg = this.queueChannel.receive(); - assertThat(msg.getPayload()).isEqualTo("1 item was filtered"); + assertThat(getStringFromDestination(this.taskEventProperties. + getItemProcessEventBindingName())).isEqualTo("\"1 item was filtered\""); } @Test public void testItemProcessBeforeProcessor() { this.eventEmittingItemProcessListener.beforeProcess("HELLO_BEFORE_PROCESS"); - assertThat(this.queueChannel.getQueueSize()).isEqualTo(0); + assertNoMessageFromDestination(this.taskEventProperties.getItemProcessEventBindingName()); } @Test public void EventEmittingSkipListenerSkipRead() { - RuntimeException exeption = new RuntimeException("Text Exception"); - this.eventEmittingSkipListener.onSkipInRead(exeption); - assertThat(this.queueChannel.getQueueSize()).isEqualTo(1); - Message msg = this.queueChannel.receive(); - assertThat(msg.getPayload()).isEqualTo("Skipped when reading."); + this.eventEmittingSkipListener.onSkipInRead(new RuntimeException("Text Exception")); + assertThat(getStringFromDestination(this.taskEventProperties. + getSkipEventBindingName())).isEqualTo("\"Skipped when reading.\""); } @Test public void EventEmittingSkipListenerSkipWrite() { - final String MESSAGE = "HELLO_SKIP_WRITE"; - RuntimeException exeption = new RuntimeException("Text Exception"); - this.eventEmittingSkipListener.onSkipInWrite(MESSAGE, exeption); - assertThat(this.queueChannel.getQueueSize()).isEqualTo(1); - Message msg = this.queueChannel.receive(); - assertThat(msg.getPayload()).isEqualTo(MESSAGE); + final String MESSAGE = "\"HELLO_SKIP_WRITE\""; + this.eventEmittingSkipListener.onSkipInWrite(MESSAGE, + new RuntimeException("Text Exception")); + assertThat(getStringFromDestination(this.taskEventProperties. + getSkipEventBindingName())).isEqualTo(MESSAGE); } @Test public void EventEmittingSkipListenerSkipProcess() { - final String MESSAGE = "HELLO_SKIP_PROCESS"; - RuntimeException exeption = new RuntimeException("Text Exception"); - this.eventEmittingSkipListener.onSkipInProcess(MESSAGE, exeption); - assertThat(this.queueChannel.getQueueSize()).isEqualTo(1); - Message msg = this.queueChannel.receive(); - assertThat(msg.getPayload()).isEqualTo(MESSAGE); + final String MESSAGE = "\"HELLO_SKIP_PROCESS\""; + this.eventEmittingSkipListener.onSkipInProcess(MESSAGE, + new RuntimeException("Text Exception")); + assertThat(getStringFromDestination(this.taskEventProperties. + getSkipEventBindingName())).isEqualTo(MESSAGE); } @Test public void EventEmittingItemReadListener() { - RuntimeException exeption = new RuntimeException("Text Exception"); - this.eventEmittingItemReadListener.onReadError(exeption); - assertThat(this.queueChannel.getQueueSize()).isEqualTo(1); - Message msg = this.queueChannel.receive(); - assertThat(msg.getPayload()).isEqualTo("Exception while item was being read"); + this.eventEmittingItemReadListener.onReadError(new RuntimeException("Text Exception")); + assertThat(getStringFromDestination(this.taskEventProperties. + getItemReadEventBindingName())).isEqualTo("\"Exception while item was being read\""); } @Test public void EventEmittingItemReadListenerBeforeRead() { this.eventEmittingItemReadListener.beforeRead(); - assertThat(this.queueChannel.getQueueSize()).isEqualTo(0); + assertNoMessageFromDestination(this.taskEventProperties.getItemReadEventBindingName()); } @Test public void EventEmittingItemReadListenerAfterRead() { this.eventEmittingItemReadListener.afterRead("HELLO_AFTER_READ"); - assertThat(this.queueChannel.getQueueSize()).isEqualTo(0); + assertNoMessageFromDestination(this.taskEventProperties.getItemReadEventBindingName()); } @Test public void EventEmittingItemWriteListenerBeforeWrite() { this.eventEmittingItemWriteListener.beforeWrite(getSampleList()); - assertThat(this.queueChannel.getQueueSize()).isEqualTo(1); - Message msg = this.queueChannel.receive(); - assertThat(msg.getPayload()).isEqualTo("3 items to be written."); + assertThat(getStringFromDestination(this.taskEventProperties.getItemWriteEventBindingName())) + .isEqualTo("\"3 items to be written.\""); } @Test public void EventEmittingItemWriteListenerAfterWrite() { this.eventEmittingItemWriteListener.afterWrite(getSampleList()); - assertThat(this.queueChannel.getQueueSize()).isEqualTo(1); - Message msg = this.queueChannel.receive(); - assertThat(msg.getPayload()).isEqualTo("3 items have been written."); + assertThat(getStringFromDestination(this.taskEventProperties.getItemWriteEventBindingName())) + .isEqualTo("\"3 items have been written.\""); } @Test public void EventEmittingItemWriteListenerWriteError() { - RuntimeException exeption = new RuntimeException("Text Exception"); - this.eventEmittingItemWriteListener.onWriteError(exeption, getSampleList()); - assertThat(this.queueChannel.getQueueSize()).isEqualTo(1); - Message msg = this.queueChannel.receive(); - assertThat(msg.getPayload()) - .isEqualTo("Exception while 3 items are attempted to be written."); + RuntimeException exception = new RuntimeException("Text Exception"); + this.eventEmittingItemWriteListener.onWriteError(exception, getSampleList()); + + assertThat(getStringFromDestination(this.taskEventProperties.getItemWriteEventBindingName())) + .isEqualTo("\"Exception while 3 items are attempted to be written.\""); } @Test - public void EventEmittingJobExecutionListenerBeforeJob() { + public void EventEmittingJobExecutionListenerBeforeJob() throws IOException { JobExecution jobExecution = getJobExecution(); this.eventEmittingJobExecutionListener.beforeJob(jobExecution); - assertThat(this.queueChannel.getQueueSize()).isEqualTo(1); - Message msg = this.queueChannel.receive(); - JobExecutionEvent jobEvent = (JobExecutionEvent) msg.getPayload(); + List> result = testListener(this.taskEventProperties.getJobExecutionEventBindingName(), 1); + assertThat(result.get(0)).isNotNull(); + + JobExecutionEvent jobEvent = this.objectMapper.readValue(result.get(0).getPayload(), JobExecutionEvent.class); assertThat(jobEvent.getJobInstance().getJobName()) .isEqualTo(jobExecution.getJobInstance().getJobName()); } @Test - public void EventEmittingJobExecutionListenerAfterJob() { + public void EventEmittingJobExecutionListenerAfterJob() throws IOException { JobExecution jobExecution = getJobExecution(); this.eventEmittingJobExecutionListener.afterJob(jobExecution); - assertThat(this.queueChannel.getQueueSize()).isEqualTo(1); - Message msg = this.queueChannel.receive(); - JobExecutionEvent jobEvent = (JobExecutionEvent) msg.getPayload(); + List> result = testListener(this.taskEventProperties.getJobExecutionEventBindingName(), 1); + assertThat(result.get(0)).isNotNull(); + + JobExecutionEvent jobEvent = this.objectMapper.readValue(result.get(0).getPayload(), JobExecutionEvent.class); assertThat(jobEvent.getJobInstance().getJobName()) .isEqualTo(jobExecution.getJobInstance().getJobName()); } @Test - public void EventEmittingStepExecutionListenerBeforeStep() { + public void EventEmittingStepExecutionListenerBeforeStep() throws IOException { final String STEP_MESSAGE = "BEFORE_STEP_MESSAGE"; - JobExecution jobExecution = getJobExecution(); - StepExecution stepExecution = new StepExecution(STEP_MESSAGE, jobExecution); + StepExecution stepExecution = new StepExecution(STEP_MESSAGE, getJobExecution()); this.eventEmittingStepExecutionListener.beforeStep(stepExecution); - assertThat(this.queueChannel.getQueueSize()).isEqualTo(1); - Message msg = this.queueChannel.receive(); - StepExecutionEvent stepExecutionEvent = (StepExecutionEvent) msg.getPayload(); + + List> result = testListener(this.taskEventProperties.getStepExecutionEventBindingName(), 1); + assertThat(result.get(0)).isNotNull(); + + StepExecutionEvent stepExecutionEvent = this.objectMapper.readValue(result.get(0).getPayload(), StepExecutionEvent.class); assertThat(stepExecutionEvent.getStepName()).isEqualTo(STEP_MESSAGE); } @Test - public void EventEmittingStepExecutionListenerAfterStep() { + public void EventEmittingStepExecutionListenerAfterStep() throws IOException { final String STEP_MESSAGE = "AFTER_STEP_MESSAGE"; - JobExecution jobExecution = getJobExecution(); - StepExecution stepExecution = new StepExecution(STEP_MESSAGE, jobExecution); + StepExecution stepExecution = new StepExecution(STEP_MESSAGE, getJobExecution()); this.eventEmittingStepExecutionListener.afterStep(stepExecution); - assertThat(this.queueChannel.getQueueSize()).isEqualTo(1); - Message msg = this.queueChannel.receive(); - StepExecutionEvent stepExecutionEvent = (StepExecutionEvent) msg.getPayload(); + List> result = testListener(this.taskEventProperties.getStepExecutionEventBindingName(), 1); + + assertThat(result.get(0)).isNotNull(); + StepExecutionEvent stepExecutionEvent = this.objectMapper.readValue(result.get(0).getPayload(), StepExecutionEvent.class); assertThat(stepExecutionEvent.getStepName()).isEqualTo(STEP_MESSAGE); } @Test public void EventEmittingChunkExecutionListenerBeforeChunk() { - final String CHUNK_MESSAGE = "Before Chunk Processing"; - ChunkContext chunkContext = getChunkContext(); - this.eventEmittingChunkListener.beforeChunk(chunkContext); - assertThat(this.queueChannel.getQueueSize()).isEqualTo(1); - Message msg = this.queueChannel.receive(); - assertThat(msg.getPayload()).isEqualTo(CHUNK_MESSAGE); + final String CHUNK_MESSAGE = "\"Before Chunk Processing\""; + this.eventEmittingChunkListener.beforeChunk(getChunkContext()); + assertThat(getStringFromDestination(this.taskEventProperties.getChunkEventBindingName())) + .isEqualTo(CHUNK_MESSAGE); } @Test public void EventEmittingChunkExecutionListenerAfterChunk() { - final String CHUNK_MESSAGE = "After Chunk Processing"; - ChunkContext chunkContext = getChunkContext(); - this.eventEmittingChunkListener.afterChunk(chunkContext); - assertThat(this.queueChannel.getQueueSize()).isEqualTo(1); - Message msg = this.queueChannel.receive(); - assertThat(msg.getPayload()).isEqualTo(CHUNK_MESSAGE); + final String CHUNK_MESSAGE = "\"After Chunk Processing\""; + this.eventEmittingChunkListener.afterChunk(getChunkContext()); + assertThat(getStringFromDestination(this.taskEventProperties.getChunkEventBindingName())) + .isEqualTo(CHUNK_MESSAGE); } @Test public void EventEmittingChunkExecutionListenerAfterChunkError() { - ChunkContext chunkContext = getChunkContext(); - this.eventEmittingChunkListener.afterChunkError(chunkContext); - assertThat(this.queueChannel.getQueueSize()).isEqualTo(0); + this.eventEmittingChunkListener.afterChunkError(getChunkContext()); + assertNoMessageFromDestination(this.taskEventProperties.getChunkEventBindingName()); } private JobExecution getJobExecution() { @@ -303,4 +317,29 @@ public class EventListenerTests { return chunkContext; } + private List> testListener(String bindingName, int numberToRead) { + List> results = new ArrayList<>(); + OutputDestination target = this.applicationContext.getBean(OutputDestination.class); + for (int i = 0; i < numberToRead; i++) { + results.add(target.receive(10000, bindingName)); + } + return results; + } + + private String getStringFromDestination(String bindingName) { + List> result = testListener(bindingName, 1); + assertThat(result.get(0)).isNotNull(); + + assertThat(new String(result.get(0).getPayload())); + return new String(result.get(0).getPayload()); + } + + private void assertNoMessageFromDestination(String bindingName) { + List> result = testListener(bindingName, 1); + assertThat(result.get(0)).isNull(); + } + + @SpringBootApplication + public static class BatchEventsApplication { + } } diff --git a/spring-cloud-task-stream/src/test/java/org/springframework/cloud/task/batch/listener/JobExecutionEventTests.java b/spring-cloud-task-stream/src/test/java/org/springframework/cloud/task/batch/listener/JobExecutionEventTests.java index 48933910..8a0d41a8 100644 --- a/spring-cloud-task-stream/src/test/java/org/springframework/cloud/task/batch/listener/JobExecutionEventTests.java +++ b/spring-cloud-task-stream/src/test/java/org/springframework/cloud/task/batch/listener/JobExecutionEventTests.java @@ -36,15 +36,14 @@ import org.springframework.batch.core.StepExecution; import org.springframework.batch.item.ExecutionContext; import org.springframework.beans.factory.NoSuchBeanDefinitionException; import org.springframework.boot.autoconfigure.AutoConfigurations; +import org.springframework.boot.autoconfigure.SpringBootApplication; import org.springframework.boot.autoconfigure.context.PropertyPlaceholderAutoConfiguration; import org.springframework.boot.test.context.runner.ApplicationContextRunner; -import org.springframework.cloud.stream.test.binder.TestSupportBinderAutoConfiguration; import org.springframework.cloud.task.batch.listener.support.JobExecutionEvent; import org.springframework.cloud.task.batch.listener.support.JobInstanceEvent; import org.springframework.cloud.task.batch.listener.support.StepExecutionEvent; import org.springframework.cloud.task.configuration.SimpleTaskAutoConfiguration; import org.springframework.cloud.task.configuration.SingleTaskConfiguration; -import org.springframework.context.annotation.Configuration; import org.springframework.core.Ordered; import static org.assertj.core.api.Assertions.assertThat; @@ -64,18 +63,19 @@ public class JobExecutionEventTests { private static final String JOB_CONFIGURATION_NAME = "FOO_JOB_CONFIG"; private static final String[] LISTENER_BEAN_NAMES = { - BatchEventAutoConfiguration.JOB_EXECUTION_EVENTS_LISTENER, - BatchEventAutoConfiguration.STEP_EXECUTION_EVENTS_LISTENER, - BatchEventAutoConfiguration.CHUNK_EVENTS_LISTENER, - BatchEventAutoConfiguration.ITEM_READ_EVENTS_LISTENER, - BatchEventAutoConfiguration.ITEM_WRITE_EVENTS_LISTENER, - BatchEventAutoConfiguration.ITEM_PROCESS_EVENTS_LISTENER, - BatchEventAutoConfiguration.SKIP_EVENTS_LISTENER }; + BatchEventAutoConfiguration.JOB_EXECUTION_EVENTS_LISTENER, + BatchEventAutoConfiguration.STEP_EXECUTION_EVENTS_LISTENER, + BatchEventAutoConfiguration.CHUNK_EVENTS_LISTENER, + BatchEventAutoConfiguration.ITEM_READ_EVENTS_LISTENER, + BatchEventAutoConfiguration.ITEM_WRITE_EVENTS_LISTENER, + BatchEventAutoConfiguration.ITEM_PROCESS_EVENTS_LISTENER, + BatchEventAutoConfiguration.SKIP_EVENTS_LISTENER}; private JobParameters jobParameters; private JobInstance jobInstance; + // @BeforeEach public void setup() { this.jobInstance = new JobInstance(JOB_INSTANCE_ID, JOB_NAME); @@ -86,33 +86,33 @@ public class JobExecutionEventTests { public void testBasic() { JobExecution jobExecution; jobExecution = new JobExecution(this.jobInstance, JOB_EXECUTION_ID, - this.jobParameters, JOB_CONFIGURATION_NAME); + this.jobParameters, JOB_CONFIGURATION_NAME); JobExecutionEvent jobExecutionEvent = new JobExecutionEvent(jobExecution); assertThat(jobExecutionEvent.getJobInstance()) - .as("jobInstance should not be null").isNotNull(); + .as("jobInstance should not be null").isNotNull(); assertThat(jobExecutionEvent.getJobParameters()) - .as("jobParameters should not be null").isNotNull(); + .as("jobParameters should not be null").isNotNull(); assertThat(jobExecutionEvent.getJobConfigurationName()) - .as("jobConfigurationName did not match expected") - .isEqualTo(JOB_CONFIGURATION_NAME); + .as("jobConfigurationName did not match expected") + .isEqualTo(JOB_CONFIGURATION_NAME); assertThat(jobExecutionEvent.getJobParameters().getParameters().size()) - .as("jobParameters size did not match").isEqualTo(0); + .as("jobParameters size did not match").isEqualTo(0); assertThat(jobExecutionEvent.getJobInstance().getJobName()) - .as("jobInstance name did not match").isEqualTo(JOB_NAME); + .as("jobInstance name did not match").isEqualTo(JOB_NAME); assertThat(jobExecutionEvent.getStepExecutions().size()) - .as("no step executions were expected").isEqualTo(0); + .as("no step executions were expected").isEqualTo(0); assertThat(jobExecutionEvent.getExitStatus().getExitCode()) - .as("exitStatus did not match expected").isEqualTo("UNKNOWN"); + .as("exitStatus did not match expected").isEqualTo("UNKNOWN"); } @Test public void testJobParameters() { - String[] JOB_PARAM_KEYS = { "A", "B", "C", "D" }; + String[] JOB_PARAM_KEYS = {"A", "B", "C", "D"}; Date testDate = new Date(); - JobParameter[] PARAMETERS = { new JobParameter("FOO", true), - new JobParameter(1L, true), new JobParameter(1D, true), - new JobParameter(testDate, false) }; + JobParameter[] PARAMETERS = {new JobParameter("FOO", true), + new JobParameter(1L, true), new JobParameter(1D, true), + new JobParameter(testDate, false)}; Map jobParamMap = new LinkedHashMap<>(); for (int paramCount = 0; paramCount < JOB_PARAM_KEYS.length; paramCount++) { @@ -121,33 +121,33 @@ public class JobExecutionEventTests { this.jobParameters = new JobParameters(jobParamMap); JobExecution jobExecution; jobExecution = new JobExecution(this.jobInstance, JOB_EXECUTION_ID, - this.jobParameters, JOB_CONFIGURATION_NAME); + this.jobParameters, JOB_CONFIGURATION_NAME); JobExecutionEvent jobExecutionEvent = new JobExecutionEvent(jobExecution); assertThat(jobExecutionEvent.getJobParameters().getString("A")) - .as("Job Parameter A was expected").isNotNull(); + .as("Job Parameter A was expected").isNotNull(); assertThat(jobExecutionEvent.getJobParameters().getLong("B")) - .as("Job Parameter B was expected").isNotNull(); + .as("Job Parameter B was expected").isNotNull(); assertThat(jobExecutionEvent.getJobParameters().getDouble("C")) - .as("Job Parameter C was expected").isNotNull(); + .as("Job Parameter C was expected").isNotNull(); assertThat(jobExecutionEvent.getJobParameters().getDate("D")) - .as("Job Parameter D was expected").isNotNull(); + .as("Job Parameter D was expected").isNotNull(); assertThat(jobExecutionEvent.getJobParameters().getString("A")) - .as("Job Parameter A value was not correct").isEqualTo("FOO"); + .as("Job Parameter A value was not correct").isEqualTo("FOO"); assertThat(jobExecutionEvent.getJobParameters().getLong("B")) - .as("Job Parameter B value was not correct").isEqualTo(new Long(1)); + .as("Job Parameter B value was not correct").isEqualTo(new Long(1)); assertThat(jobExecutionEvent.getJobParameters().getDouble("C")) - .as("Job Parameter C value was not correct").isEqualTo(new Double(1)); + .as("Job Parameter C value was not correct").isEqualTo(new Double(1)); assertThat(jobExecutionEvent.getJobParameters().getDate("D")) - .as("Job Parameter D value was not correct").isEqualTo(testDate); + .as("Job Parameter D value was not correct").isEqualTo(testDate); } @Test public void testStepExecutions() { JobExecution jobExecution; jobExecution = new JobExecution(this.jobInstance, JOB_EXECUTION_ID, - this.jobParameters, JOB_CONFIGURATION_NAME); + this.jobParameters, JOB_CONFIGURATION_NAME); List stepsExecutions = new ArrayList<>(); stepsExecutions.add(new StepExecution("foo", jobExecution)); stepsExecutions.add(new StepExecution("bar", jobExecution)); @@ -156,15 +156,15 @@ public class JobExecutionEventTests { JobExecutionEvent jobExecutionsEvent = new JobExecutionEvent(jobExecution); assertThat(jobExecutionsEvent.getStepExecutions().size()) - .as("stepExecutions count is incorrect").isEqualTo(3); + .as("stepExecutions count is incorrect").isEqualTo(3); Iterator iter = jobExecutionsEvent.getStepExecutions() - .iterator(); + .iterator(); assertThat(iter.next().getStepName()).as("foo stepExecution is not present") - .isEqualTo("foo"); + .isEqualTo("foo"); assertThat(iter.next().getStepName()).as("bar stepExecution is not present") - .isEqualTo("bar"); + .isEqualTo("bar"); assertThat(iter.next().getStepName()).as("baz stepExecution is not present") - .isEqualTo("baz"); + .isEqualTo("baz"); } @Test @@ -172,6 +172,7 @@ public class JobExecutionEventTests { testDisabledConfiguration(null, null); } + @Test public void testDisabledJobExecutionListener() { testDisabledConfiguration("spring.cloud.task.batch.events.job-execution.enabled", @@ -181,9 +182,10 @@ public class JobExecutionEventTests { @Test public void testDisabledStepExecutionListener() { testDisabledConfiguration("spring.cloud.task.batch.events.step-execution.enabled", - BatchEventAutoConfiguration.STEP_EXECUTION_EVENTS_LISTENER); + BatchEventAutoConfiguration.STEP_EXECUTION_EVENTS_LISTENER); } + @Test public void testDisabledChunkListener() { testDisabledConfiguration("spring.cloud.task.batch.events.chunk.enabled", @@ -315,12 +317,11 @@ public class JobExecutionEventTests { public void testOrderConfiguration() { ApplicationContextRunner applicationContextRunner = new ApplicationContextRunner() .withConfiguration(AutoConfigurations.of( - EventJobExecutionConfiguration.class, PropertyPlaceholderAutoConfiguration.class, - TestSupportBinderAutoConfiguration.class, SimpleTaskAutoConfiguration.class, SingleTaskConfiguration.class)) .withUserConfiguration( - BatchEventAutoConfiguration.JobExecutionListenerConfiguration.class) + BatchEventAutoConfiguration.JobExecutionListenerConfiguration.class, + BatchEventTestApplication.class) .withPropertyValues("--spring.cloud.task.closecontext_enabled=false", "--spring.main.web-environment=false", "--spring.cloud.task.batch.events.chunk-order=5", @@ -344,12 +345,11 @@ public class JobExecutionEventTests { public void singleStepBatchJobSkip() { ApplicationContextRunner applicationContextRunner = new ApplicationContextRunner() .withConfiguration(AutoConfigurations.of( - EventJobExecutionConfiguration.class, PropertyPlaceholderAutoConfiguration.class, - TestSupportBinderAutoConfiguration.class, SimpleTaskAutoConfiguration.class, SingleTaskConfiguration.class)) .withUserConfiguration( - BatchEventAutoConfiguration.JobExecutionListenerConfiguration.class) + BatchEventAutoConfiguration.JobExecutionListenerConfiguration.class, + BatchEventTestApplication.class) .withPropertyValues("--spring.cloud.task.closecontext_enabled=false", "--spring.main.web-environment=false", "spring.batch.job.jobName=FOO"); applicationContextRunner.run((context) -> { @@ -364,15 +364,14 @@ public class JobExecutionEventTests { private void testDisabledConfiguration(String property, String disabledListener) { String disabledPropertyArg = (property != null) ? "--" + property + "=false" : ""; ApplicationContextRunner applicationContextRunner = new ApplicationContextRunner() - .withConfiguration(AutoConfigurations.of( - EventJobExecutionConfiguration.class, - PropertyPlaceholderAutoConfiguration.class, - TestSupportBinderAutoConfiguration.class, - SimpleTaskAutoConfiguration.class, SingleTaskConfiguration.class)) - .withUserConfiguration( - BatchEventAutoConfiguration.JobExecutionListenerConfiguration.class) - .withPropertyValues("--spring.cloud.task.closecontext_enabled=false", - "--spring.main.web-environment=false", disabledPropertyArg); + .withConfiguration(AutoConfigurations.of( + PropertyPlaceholderAutoConfiguration.class, + SimpleTaskAutoConfiguration.class, SingleTaskConfiguration.class)) + .withUserConfiguration( + BatchEventAutoConfiguration.JobExecutionListenerConfiguration.class, + BatchEventTestApplication.class) + .withPropertyValues("--spring.cloud.task.closecontext_enabled=false", + "--spring.main.web-environment=false", disabledPropertyArg); applicationContextRunner.run((context) -> { boolean exceptionThrown = false; for (String beanName : LISTENER_BEAN_NAMES) { @@ -385,19 +384,16 @@ public class JobExecutionEventTests { } assertThat(exceptionThrown).as( String.format("Did not expect %s bean in context", beanName)) - .isTrue(); + .isTrue(); } else { context.getBean(beanName); } } }); - } - @Configuration(proxyBeanMethods = false) - public static class EventJobExecutionConfiguration { - + @SpringBootApplication + public static class BatchEventTestApplication { } - } diff --git a/spring-cloud-task-stream/src/test/java/org/springframework/cloud/task/batch/listener/JobParameterEventTests.java b/spring-cloud-task-stream/src/test/java/org/springframework/cloud/task/batch/listener/JobParameterEventTests.java index 8044b96a..2e3797fb 100644 --- a/spring-cloud-task-stream/src/test/java/org/springframework/cloud/task/batch/listener/JobParameterEventTests.java +++ b/spring-cloud-task-stream/src/test/java/org/springframework/cloud/task/batch/listener/JobParameterEventTests.java @@ -23,8 +23,6 @@ import org.junit.jupiter.api.Test; import org.springframework.batch.core.JobParameter; import org.springframework.cloud.task.batch.listener.support.JobParameterEvent; -import static junit.framework.TestCase.assertFalse; -import static junit.framework.TestCase.assertTrue; import static org.assertj.core.api.Assertions.assertThat; /** @@ -37,7 +35,7 @@ public class JobParameterEventTests { JobParameterEvent jobParameterEvent = new JobParameterEvent(); assertThat(jobParameterEvent.getValue()).isNull(); assertThat(jobParameterEvent.getType()).isNull(); - assertFalse(jobParameterEvent.isIdentifying()); + assertThat(jobParameterEvent.isIdentifying()).isFalse(); assertThat(jobParameterEvent).isEqualTo(new JobParameterEvent()); } @@ -50,15 +48,15 @@ public class JobParameterEventTests { assertThat(jobParameterEvent.getValue()).isEqualTo(EXPECTED_VALUE); assertThat(jobParameterEvent.getType()) .isEqualTo(JobParameterEvent.ParameterType.STRING); - assertTrue(jobParameterEvent.isIdentifying()); + assertThat(jobParameterEvent.isIdentifying()).isTrue(); jobParameter = new JobParameter(EXPECTED_DATE_VALUE, true); jobParameterEvent = new JobParameterEvent(jobParameter); assertThat(jobParameterEvent.getValue()).isEqualTo(EXPECTED_DATE_VALUE); assertThat(jobParameterEvent.getType()) .isEqualTo(JobParameterEvent.ParameterType.DATE); - assertTrue(jobParameterEvent.isIdentifying()); - assertTrue(new JobParameterEvent(jobParameter).equals(jobParameterEvent)); + assertThat(jobParameterEvent.isIdentifying()).isTrue(); + assertThat(new JobParameterEvent(jobParameter).equals(jobParameterEvent)).isTrue(); } @Test @@ -68,9 +66,9 @@ public class JobParameterEventTests { JobParameterEvent jobParameterEvent = new JobParameterEvent(jobParameter); JobParameterEvent anotherJobParameterEvent = new JobParameterEvent(jobParameter); - assertTrue(jobParameterEvent.equals(jobParameterEvent)); - assertFalse(jobParameterEvent.equals("nope")); - assertTrue(jobParameterEvent.equals(anotherJobParameterEvent)); + assertThat(jobParameterEvent.equals(jobParameterEvent)).isTrue(); + assertThat(jobParameterEvent.equals("nope")).isFalse(); + assertThat(jobParameterEvent.equals(anotherJobParameterEvent)).isTrue(); } @Test diff --git a/spring-cloud-task-stream/src/test/java/org/springframework/cloud/task/batch/listener/JobParametersEventTests.java b/spring-cloud-task-stream/src/test/java/org/springframework/cloud/task/batch/listener/JobParametersEventTests.java index df0f9f75..5b00fcac 100644 --- a/spring-cloud-task-stream/src/test/java/org/springframework/cloud/task/batch/listener/JobParametersEventTests.java +++ b/spring-cloud-task-stream/src/test/java/org/springframework/cloud/task/batch/listener/JobParametersEventTests.java @@ -26,8 +26,6 @@ import org.junit.jupiter.api.Test; import org.springframework.batch.core.JobParameter; import org.springframework.cloud.task.batch.listener.support.JobParametersEvent; -import static junit.framework.TestCase.assertFalse; -import static junit.framework.TestCase.assertTrue; import static org.assertj.core.api.Assertions.assertThat; /** @@ -55,7 +53,7 @@ public class JobParametersEventTests { public void testDefaultConstructor() { JobParametersEvent jobParametersEvent = new JobParametersEvent(); assertThat(jobParametersEvent.getParameters().size()).isEqualTo(0); - assertTrue(jobParametersEvent.isEmpty()); + assertThat(jobParametersEvent.isEmpty()).isTrue(); } @Test @@ -74,10 +72,10 @@ public class JobParametersEventTests { @Test public void testEquals() { - assertTrue(getPopulatedParametersEvent().equals(getPopulatedParametersEvent())); + assertThat(getPopulatedParametersEvent().equals(getPopulatedParametersEvent())).isTrue(); JobParametersEvent jobParametersEvent = getPopulatedParametersEvent(); - assertFalse(jobParametersEvent.equals("FOO")); - assertTrue(jobParametersEvent.equals(jobParametersEvent)); + assertThat(jobParametersEvent.equals("FOO")).isFalse(); + assertThat(jobParametersEvent.equals(jobParametersEvent)).isTrue(); } @Test diff --git a/spring-cloud-task-stream/src/test/java/org/springframework/cloud/task/launcher/TaskLaunchConfigurationExistingTests.java b/spring-cloud-task-stream/src/test/java/org/springframework/cloud/task/launcher/TaskLaunchConfigurationExistingTests.java index 4baa6e8a..210d709b 100644 --- a/spring-cloud-task-stream/src/test/java/org/springframework/cloud/task/launcher/TaskLaunchConfigurationExistingTests.java +++ b/spring-cloud-task-stream/src/test/java/org/springframework/cloud/task/launcher/TaskLaunchConfigurationExistingTests.java @@ -19,14 +19,13 @@ package org.springframework.cloud.task.launcher; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; -import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.boot.test.context.SpringBootTest; +import org.springframework.boot.WebApplicationType; +import org.springframework.boot.builder.SpringApplicationBuilder; import org.springframework.cloud.deployer.spi.local.LocalDeployerProperties; import org.springframework.cloud.deployer.spi.local.LocalTaskLauncher; import org.springframework.cloud.deployer.spi.task.TaskLauncher; -import org.springframework.context.ApplicationContext; +import org.springframework.context.ConfigurableApplicationContext; import org.springframework.context.annotation.Bean; -import org.springframework.context.annotation.Configuration; import org.springframework.test.context.junit.jupiter.SpringExtension; import static org.assertj.core.api.Assertions.assertThat; @@ -37,25 +36,23 @@ import static org.assertj.core.api.Assertions.assertThat; * @author Glenn Renfro */ @ExtendWith(SpringExtension.class) -@SpringBootTest(classes = { - TaskLaunchConfigurationExistingTests.TestTaskDeployerConfiguration.class }) public class TaskLaunchConfigurationExistingTests { private static TaskLauncher testTaskLauncher; - @Autowired - private ApplicationContext context; - @Test public void testTaskLauncher() { - LocalTaskLauncher taskLauncher = this.context.getBean(LocalTaskLauncher.class); - assertThat(testTaskLauncher).isNotNull(); - assertThat(taskLauncher).isNotNull(); - assertThat(taskLauncher).isEqualTo(testTaskLauncher); + try (ConfigurableApplicationContext context = new SpringApplicationBuilder( + TaskLaunchConfigurationExistingTests.TestTaskDeployerConfiguration.class).web(WebApplicationType.NONE).run( + "--spring.jmx.enabled=false")) { + LocalTaskLauncher taskLauncher = context.getBean(LocalTaskLauncher.class); + assertThat(testTaskLauncher).isNotNull(); + assertThat(taskLauncher).isNotNull(); + assertThat(taskLauncher).isEqualTo(testTaskLauncher); + } } - @Configuration(proxyBeanMethods = false) - protected static class TestTaskDeployerConfiguration { + private static class TestTaskDeployerConfiguration { @Bean public TaskLauncher taskLauncher() { diff --git a/spring-cloud-task-stream/src/test/java/org/springframework/cloud/task/launcher/TaskLauncherFunctionTests.java b/spring-cloud-task-stream/src/test/java/org/springframework/cloud/task/launcher/TaskLauncherFunctionTests.java new file mode 100644 index 00000000..8575ed94 --- /dev/null +++ b/spring-cloud-task-stream/src/test/java/org/springframework/cloud/task/launcher/TaskLauncherFunctionTests.java @@ -0,0 +1,213 @@ +/* + * Copyright 2021-2021 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.cloud.task.launcher; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Map; + +import com.fasterxml.jackson.databind.ObjectMapper; +import org.junit.jupiter.api.Test; + +import org.springframework.boot.WebApplicationType; +import org.springframework.boot.autoconfigure.SpringBootApplication; +import org.springframework.boot.builder.SpringApplicationBuilder; +import org.springframework.cloud.deployer.spi.task.LaunchState; +import org.springframework.cloud.stream.binder.test.InputDestination; +import org.springframework.cloud.stream.binder.test.TestChannelBinderConfiguration; +import org.springframework.cloud.task.launcher.configuration.TaskConfiguration; +import org.springframework.context.ConfigurableApplicationContext; +import org.springframework.context.annotation.Import; +import org.springframework.messaging.support.GenericMessage; + +import static org.assertj.core.api.Assertions.assertThat; + +public class TaskLauncherFunctionTests { + + private final static String TASK_NAME_PREFIX = "Task-"; + + private final static String APP_NAME = "MY_APP_NAME"; + + private final static String PARAM1 = "FOO"; + + private final static String PARAM2 = "BAR"; + + private final static String VALID_URL = "maven://org.springframework.cloud.task.app:" + + "timestamp-task:jar:1.0.1.RELEASE"; + + private final static String INVALID_URL = "maven://not.real.group:" + + "invalid:jar:1.0.0.BUILD-SNAPSHOT"; + + private final static String DEFAULT_STATUS = "test_status"; + + private Map properties; + + @Test + public void testProcessorFromFunction() { + try (ConfigurableApplicationContext context = new SpringApplicationBuilder( + TestChannelBinderConfiguration.getCompleteConfiguration( + TaskLauncherSinkTestApplication.class)).web(WebApplicationType.NONE).run( + "--spring.jmx.enabled=false")) { + + InputDestination source = context.getBean(InputDestination.class); + TaskLaunchRequest request = new TaskLaunchRequest(VALID_URL, Collections.emptyList(), + Collections.emptyMap(), null, "TESTAPP1"); + GenericMessage message = new GenericMessage<>(request); + source.send(message); + TaskConfiguration.TestTaskLauncher target = context.getBean(TaskConfiguration.TestTaskLauncher.class); + assertThat(target.status(DEFAULT_STATUS).getState()) + .isEqualTo(LaunchState.complete); + } + } + + @Test + public void testSuccessWithParams() throws Exception { + List commandLineArgs = new ArrayList<>(); + commandLineArgs.add(PARAM1); + commandLineArgs.add(PARAM2); + try (ConfigurableApplicationContext context = new SpringApplicationBuilder( + TestChannelBinderConfiguration.getCompleteConfiguration( + TaskLauncherSinkTestApplication.class)).web(WebApplicationType.NONE).run( + "--spring.jmx.enabled=false")) { + TaskConfiguration.TestTaskLauncher testTaskLauncher = launchTaskString(VALID_URL, + commandLineArgs, null, context); + verifySuccessWithParams(testTaskLauncher); + + testTaskLauncher = launchTaskByteArray(VALID_URL, commandLineArgs, null, context); + verifySuccessWithParams(testTaskLauncher); + + testTaskLauncher = launchTaskTaskLaunchRequest(VALID_URL, commandLineArgs, null, context); + verifySuccessWithParams(testTaskLauncher); + } + } + + @Test + public void testSuccessWithAppName() throws Exception { + try (ConfigurableApplicationContext context = new SpringApplicationBuilder( + TestChannelBinderConfiguration.getCompleteConfiguration( + TaskLauncherSinkTestApplication.class)).web(WebApplicationType.NONE).run( + "--spring.jmx.enabled=false")) { + TaskConfiguration.TestTaskLauncher testTaskLauncher = launchTaskString(VALID_URL, + null, APP_NAME, context); + verifySuccessWithAppName(testTaskLauncher); + + testTaskLauncher = launchTaskByteArray(VALID_URL, null, APP_NAME, context); + verifySuccessWithAppName(testTaskLauncher); + + testTaskLauncher = launchTaskTaskLaunchRequest(VALID_URL, null, APP_NAME, context); + verifySuccessWithAppName(testTaskLauncher); + } + } + + @Test + public void testInvalidJar() throws Exception { + try (ConfigurableApplicationContext context = new SpringApplicationBuilder( + TestChannelBinderConfiguration.getCompleteConfiguration( + TaskLauncherSinkTestApplication.class)).web(WebApplicationType.NONE).run( + "--spring.jmx.enabled=false")) { + TaskConfiguration.TestTaskLauncher testTaskLauncher = launchTaskTaskLaunchRequest( + INVALID_URL, null, APP_NAME, context); + verifySuccessWithAppName(testTaskLauncher); + } + } + + @Test + public void testNoRun() { + try (ConfigurableApplicationContext context = new SpringApplicationBuilder( + TestChannelBinderConfiguration.getCompleteConfiguration( + TaskLauncherSinkTestApplication.class)).web(WebApplicationType.NONE).run( + "--spring.jmx.enabled=false")) { + TaskConfiguration.TestTaskLauncher testTaskLauncher = context + .getBean(TaskConfiguration.TestTaskLauncher.class); + assertThat(testTaskLauncher.status(DEFAULT_STATUS).getState()) + .isEqualTo(LaunchState.unknown); + } + } + + private void verifySuccessWithAppName( + TaskConfiguration.TestTaskLauncher testTaskLauncher) { + assertThat(testTaskLauncher.status(DEFAULT_STATUS).getState()) + .isEqualTo(LaunchState.complete); + assertThat(testTaskLauncher.getCommandlineArguments().size()).isEqualTo(0); + assertThat(testTaskLauncher.getApplicationName()).isEqualTo(APP_NAME); + } + + private String getStringTaskLaunchRequest(String artifactURL, + List commandLineArgs, String applicationName) throws Exception { + TaskLaunchRequest request = new TaskLaunchRequest(artifactURL, commandLineArgs, + this.properties, null, applicationName); + ObjectMapper mapper = new ObjectMapper(); + return mapper.writeValueAsString(request); + } + + private void verifySuccessWithParams( + TaskConfiguration.TestTaskLauncher testTaskLauncher) { + assertThat(testTaskLauncher.status(DEFAULT_STATUS).getState()) + .isEqualTo(LaunchState.complete); + assertThat(testTaskLauncher.getCommandlineArguments().size()).isEqualTo(2); + assertThat(testTaskLauncher.getCommandlineArguments().get(0)).isEqualTo(PARAM1); + assertThat(testTaskLauncher.getCommandlineArguments().get(1)).isEqualTo(PARAM2); + assertThat(testTaskLauncher.getApplicationName().startsWith(TASK_NAME_PREFIX)) + .isTrue(); + } + + private TaskConfiguration.TestTaskLauncher launchTaskString(String artifactURL, + List commandLineArgs, String applicationName, + ConfigurableApplicationContext context) throws Exception { + TaskConfiguration.TestTaskLauncher testTaskLauncher = context + .getBean(TaskConfiguration.TestTaskLauncher.class); + String stringRequest = getStringTaskLaunchRequest(artifactURL, commandLineArgs, + applicationName); + GenericMessage message = new GenericMessage<>(stringRequest); + InputDestination source = context.getBean(InputDestination.class); + source.send(message); + return testTaskLauncher; + } + + private TaskConfiguration.TestTaskLauncher launchTaskByteArray(String artifactURL, + List commandLineArgs, String applicationName, + ConfigurableApplicationContext context) throws Exception { + TaskConfiguration.TestTaskLauncher testTaskLauncher = context + .getBean(TaskConfiguration.TestTaskLauncher.class); + String stringRequest = getStringTaskLaunchRequest(artifactURL, commandLineArgs, + applicationName); + GenericMessage message = new GenericMessage<>(stringRequest.getBytes()); + InputDestination source = context.getBean(InputDestination.class); + source.send(message); + return testTaskLauncher; + } + + private TaskConfiguration.TestTaskLauncher launchTaskTaskLaunchRequest( + String artifactURL, List commandLineArgs, String applicationName, + ConfigurableApplicationContext context) + throws Exception { + TaskConfiguration.TestTaskLauncher testTaskLauncher = context + .getBean(TaskConfiguration.TestTaskLauncher.class); + TaskLaunchRequest request = new TaskLaunchRequest(artifactURL, commandLineArgs, + this.properties, null, applicationName); + GenericMessage message = new GenericMessage<>(request); + InputDestination source = context.getBean(InputDestination.class); + source.send(message); + return testTaskLauncher; + } + + @SpringBootApplication + @Import({TaskLauncherSink.class}) + public static class TaskLauncherSinkTestApplication { + } +} diff --git a/spring-cloud-task-stream/src/test/java/org/springframework/cloud/task/launcher/TaskLauncherSinkTests.java b/spring-cloud-task-stream/src/test/java/org/springframework/cloud/task/launcher/TaskLauncherSinkTests.java deleted file mode 100644 index 3f2b28c5..00000000 --- a/spring-cloud-task-stream/src/test/java/org/springframework/cloud/task/launcher/TaskLauncherSinkTests.java +++ /dev/null @@ -1,206 +0,0 @@ -/* - * Copyright 2016-2019 the original author or authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * https://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.springframework.cloud.task.launcher; - -import java.util.ArrayList; -import java.util.HashMap; -import java.util.List; -import java.util.Map; - -import com.fasterxml.jackson.databind.ObjectMapper; -import org.junit.jupiter.api.BeforeEach; -import org.junit.jupiter.api.Test; -import org.junit.jupiter.api.extension.ExtendWith; - -import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.boot.test.context.SpringBootTest; -import org.springframework.cloud.deployer.spi.task.LaunchState; -import org.springframework.cloud.stream.messaging.Sink; -import org.springframework.cloud.task.launcher.app.TaskLauncherSinkApplication; -import org.springframework.cloud.task.launcher.configuration.TaskConfiguration; -import org.springframework.context.ApplicationContext; -import org.springframework.messaging.support.GenericMessage; -import org.springframework.test.context.junit.jupiter.SpringExtension; - -import static org.assertj.core.api.Assertions.assertThat; - -@ExtendWith(SpringExtension.class) -@SpringBootTest(classes = { TaskLauncherSinkApplication.class, TaskConfiguration.class }) -public class TaskLauncherSinkTests { - - private final static String TASK_NAME_PREFIX = "Task-"; - - private final static String APP_NAME = "MY_APP_NAME"; - - private final static String PARAM1 = "FOO"; - - private final static String PARAM2 = "BAR"; - - private final static String VALID_URL = "maven://org.springframework.cloud.task.app:" - + "timestamp-task:jar:1.0.1.RELEASE"; - - private final static String INVALID_URL = "maven://not.real.group:" - + "invalid:jar:1.0.0.BUILD-SNAPSHOT"; - - private final static String DEFAULT_STATUS = "test_status"; - - private Map properties; - - @Autowired - private ApplicationContext context; - - @Autowired - private Sink sink; - - @BeforeEach - public void setup() { - this.properties = new HashMap<>(); - this.properties.put("server.port", "0"); - } - - @Test - public void testSuccessWithParams() throws Exception { - List commandLineArgs = new ArrayList<>(); - commandLineArgs.add(PARAM1); - commandLineArgs.add(PARAM2); - - TaskConfiguration.TestTaskLauncher testTaskLauncher = launchTaskString(VALID_URL, - commandLineArgs, null); - verifySuccessWithParams(testTaskLauncher); - - testTaskLauncher = launchTaskByteArray(VALID_URL, commandLineArgs, null); - verifySuccessWithParams(testTaskLauncher); - - testTaskLauncher = launchTaskTaskLaunchRequest(VALID_URL, commandLineArgs, null); - verifySuccessWithParams(testTaskLauncher); - } - - private void verifySuccessWithParams( - TaskConfiguration.TestTaskLauncher testTaskLauncher) { - assertThat(testTaskLauncher.status(DEFAULT_STATUS).getState()) - .isEqualTo(LaunchState.complete); - assertThat(testTaskLauncher.getCommandlineArguments().size()).isEqualTo(2); - assertThat(testTaskLauncher.getCommandlineArguments().get(0)).isEqualTo(PARAM1); - assertThat(testTaskLauncher.getCommandlineArguments().get(1)).isEqualTo(PARAM2); - assertThat(testTaskLauncher.getApplicationName().startsWith(TASK_NAME_PREFIX)) - .isTrue(); - } - - @Test - public void testSuccessWithAppName() throws Exception { - TaskConfiguration.TestTaskLauncher testTaskLauncher = launchTaskString(VALID_URL, - null, APP_NAME); - verifySuccessWithAppName(testTaskLauncher); - - testTaskLauncher = launchTaskByteArray(VALID_URL, null, APP_NAME); - verifySuccessWithAppName(testTaskLauncher); - - testTaskLauncher = launchTaskTaskLaunchRequest(VALID_URL, null, APP_NAME); - verifySuccessWithAppName(testTaskLauncher); - } - - @Test - public void testInvalidJar() throws Exception { - TaskConfiguration.TestTaskLauncher testTaskLauncher = launchTaskTaskLaunchRequest( - INVALID_URL, null, APP_NAME); - verifySuccessWithAppName(testTaskLauncher); - } - - private void verifySuccessWithAppName( - TaskConfiguration.TestTaskLauncher testTaskLauncher) { - assertThat(testTaskLauncher.status(DEFAULT_STATUS).getState()) - .isEqualTo(LaunchState.complete); - assertThat(testTaskLauncher.getCommandlineArguments().size()).isEqualTo(0); - assertThat(testTaskLauncher.getApplicationName()).isEqualTo(APP_NAME); - } - - @Test - public void testSuccessNoParams() throws Exception { - TaskConfiguration.TestTaskLauncher testTaskLauncher = launchTaskString(VALID_URL, - null, null); - verifySuccessWithNoParams(testTaskLauncher); - - testTaskLauncher = launchTaskByteArray(VALID_URL, null, null); - verifySuccessWithNoParams(testTaskLauncher); - - testTaskLauncher = launchTaskTaskLaunchRequest(VALID_URL, null, null); - verifySuccessWithNoParams(testTaskLauncher); - } - - private void verifySuccessWithNoParams( - TaskConfiguration.TestTaskLauncher testTaskLauncher) { - assertThat(testTaskLauncher.status(DEFAULT_STATUS).getState()) - .isEqualTo(LaunchState.complete); - assertThat(testTaskLauncher.getCommandlineArguments().size()).isEqualTo(0); - assertThat(testTaskLauncher.getApplicationName().startsWith(TASK_NAME_PREFIX)) - .isTrue(); - } - - @Test - public void testNoRun() { - TaskConfiguration.TestTaskLauncher testTaskLauncher = this.context - .getBean(TaskConfiguration.TestTaskLauncher.class); - assertThat(testTaskLauncher.status(DEFAULT_STATUS).getState()) - .isEqualTo(LaunchState.unknown); - } - - private TaskConfiguration.TestTaskLauncher launchTaskString(String artifactURL, - List commandLineArgs, String applicationName) throws Exception { - TaskConfiguration.TestTaskLauncher testTaskLauncher = this.context - .getBean(TaskConfiguration.TestTaskLauncher.class); - String stringRequest = getStringTaskLaunchRequest(artifactURL, commandLineArgs, - applicationName); - GenericMessage message = new GenericMessage<>(stringRequest); - - this.sink.input().send(message); - return testTaskLauncher; - } - - private TaskConfiguration.TestTaskLauncher launchTaskByteArray(String artifactURL, - List commandLineArgs, String applicationName) throws Exception { - TaskConfiguration.TestTaskLauncher testTaskLauncher = this.context - .getBean(TaskConfiguration.TestTaskLauncher.class); - String stringRequest = getStringTaskLaunchRequest(artifactURL, commandLineArgs, - applicationName); - GenericMessage message = new GenericMessage<>(stringRequest.getBytes()); - - this.sink.input().send(message); - return testTaskLauncher; - } - - private String getStringTaskLaunchRequest(String artifactURL, - List commandLineArgs, String applicationName) throws Exception { - TaskLaunchRequest request = new TaskLaunchRequest(artifactURL, commandLineArgs, - this.properties, null, applicationName); - ObjectMapper mapper = new ObjectMapper(); - return mapper.writeValueAsString(request); - } - - private TaskConfiguration.TestTaskLauncher launchTaskTaskLaunchRequest( - String artifactURL, List commandLineArgs, String applicationName) - throws Exception { - TaskConfiguration.TestTaskLauncher testTaskLauncher = this.context - .getBean(TaskConfiguration.TestTaskLauncher.class); - TaskLaunchRequest request = new TaskLaunchRequest(artifactURL, commandLineArgs, - this.properties, null, applicationName); - GenericMessage message = new GenericMessage<>(request); - - this.sink.input().send(message); - return testTaskLauncher; - } - -} diff --git a/spring-cloud-task-stream/src/test/java/org/springframework/cloud/task/listener/TaskEventTests.java b/spring-cloud-task-stream/src/test/java/org/springframework/cloud/task/listener/TaskEventTests.java index 6133df9f..bcbd180e 100644 --- a/spring-cloud-task-stream/src/test/java/org/springframework/cloud/task/listener/TaskEventTests.java +++ b/spring-cloud-task-stream/src/test/java/org/springframework/cloud/task/listener/TaskEventTests.java @@ -18,19 +18,12 @@ package org.springframework.cloud.task.listener; import org.junit.jupiter.api.Test; -import org.springframework.boot.autoconfigure.AutoConfigurations; -import org.springframework.boot.autoconfigure.context.PropertyPlaceholderAutoConfiguration; -import org.springframework.boot.autoconfigure.jdbc.EmbeddedDataSourceConfiguration; -import org.springframework.boot.test.context.runner.ApplicationContextRunner; -import org.springframework.cloud.stream.config.BindingServiceConfiguration; -import org.springframework.cloud.stream.test.binder.TestSupportBinderAutoConfiguration; +import org.springframework.boot.WebApplicationType; +import org.springframework.boot.autoconfigure.SpringBootApplication; +import org.springframework.boot.builder.SpringApplicationBuilder; +import org.springframework.cloud.stream.binder.test.TestChannelBinderConfiguration; import org.springframework.cloud.task.configuration.EnableTask; -import org.springframework.cloud.task.configuration.SimpleTaskAutoConfiguration; -import org.springframework.cloud.task.configuration.SingleTaskConfiguration; -import org.springframework.context.annotation.Bean; -import org.springframework.context.annotation.Configuration; -import org.springframework.integration.annotation.BridgeFrom; -import org.springframework.integration.channel.NullChannel; +import org.springframework.context.ConfigurableApplicationContext; import static org.assertj.core.api.Assertions.assertThat; @@ -43,35 +36,16 @@ public class TaskEventTests { @Test public void testDefaultConfiguration() { - ApplicationContextRunner applicationContextRunner = new ApplicationContextRunner() - .withConfiguration(AutoConfigurations.of( - EmbeddedDataSourceConfiguration.class, - TaskEventAutoConfiguration.class, - PropertyPlaceholderAutoConfiguration.class, - TestSupportBinderAutoConfiguration.class, - SimpleTaskAutoConfiguration.class, SingleTaskConfiguration.class, - BindingServiceConfiguration.class)) - .withUserConfiguration(TaskEventsConfiguration.class) - .withPropertyValues("spring.cloud.task.closecontext_enabled=false", - "spring.main.web-environment=false"); - applicationContextRunner.run((context) -> { - assertThat(context.getBean("taskEventListener")).isNotNull(); - assertThat( - context.getBean(TaskEventAutoConfiguration.TaskEventChannels.class)) - .isNotNull(); - }); + ConfigurableApplicationContext applicationContext = new SpringApplicationBuilder() + .sources(TestChannelBinderConfiguration + .getCompleteConfiguration(TaskEventsApplication.class)).web(WebApplicationType.NONE).build() + .run(); + assertThat(applicationContext.getBean("taskEventEmitter")).isNotNull(); } @EnableTask - @Configuration - public static class TaskEventsConfiguration { - - @Bean - @BridgeFrom(TaskEventAutoConfiguration.TaskEventChannels.TASK_EVENTS) - public NullChannel testEmptyChannel() { - return new NullChannel(); - } - + @SpringBootApplication + public static class TaskEventsApplication { } } diff --git a/spring-cloud-task-stream/src/test/resources/application.properties b/spring-cloud-task-stream/src/test/resources/application.properties index fce9ea93..00470d5f 100644 --- a/spring-cloud-task-stream/src/test/resources/application.properties +++ b/spring-cloud-task-stream/src/test/resources/application.properties @@ -1,2 +1,6 @@ maven.remoteRepositories.springRepo.url=https://repo.spring.io/libs-snapshot logging.level.org.springframework.cloud.task.batch.listener=DEBUG +spring.cloud.stream.function.definition=taskLauncherSink +spring.cloud.stream.function.bindings.taskLauncherSink-in-0=input +spring.cloud.stream.bindings.input.group=taskLauncherSink +spring.cloud.stream.bindings.input.destination=taskLauncherSinkExchange