diff --git a/stream-applications-integration-tests/pom.xml b/stream-applications-integration-tests/pom.xml
index b0921df..e9b4d5c 100644
--- a/stream-applications-integration-tests/pom.xml
+++ b/stream-applications-integration-tests/pom.xml
@@ -42,7 +42,7 @@
2.27.1
1.11.415
8.0.16
- 1.6.2
+ 5.6.2
@@ -68,17 +68,18 @@
test
-
- org.junit.platform
- junit-platform-testkit
- ${junit-platform-runner.version}
- test
-
org.testcontainers
junit-jupiter
test
+
+
+ org.junit.jupiter
+ junit-jupiter-api
+ ${junit-jupiter-api.version}
+ test
+
org.testcontainers
kafka
@@ -154,12 +155,6 @@
spring-boot-starter-jdbc
test
-
- org.junit.platform
- junit-platform-runner
- ${junit-platform-runner.version}
- test
-
diff --git a/stream-applications-integration-tests/src/test/java/org/springframework/cloud/stream/apps/integration/test/processor/httprequest/KafkaHttpRequestProcessorTests.java b/stream-applications-integration-tests/src/test/java/org/springframework/cloud/stream/apps/integration/test/processor/httprequest/KafkaHttpRequestProcessorTests.java
index 63b8068..f2b06e2 100644
--- a/stream-applications-integration-tests/src/test/java/org/springframework/cloud/stream/apps/integration/test/processor/httprequest/KafkaHttpRequestProcessorTests.java
+++ b/stream-applications-integration-tests/src/test/java/org/springframework/cloud/stream/apps/integration/test/processor/httprequest/KafkaHttpRequestProcessorTests.java
@@ -19,8 +19,8 @@ package org.springframework.cloud.stream.apps.integration.test.processor.httpreq
import org.testcontainers.junit.jupiter.Container;
import org.springframework.cloud.stream.app.test.integration.StreamAppContainer;
+import org.springframework.cloud.stream.app.test.integration.junit.jupiter.KafkaStreamAppTest;
import org.springframework.cloud.stream.app.test.integration.kafka.KafkaConfig;
-import org.springframework.cloud.stream.app.test.integration.kafka.KafkaStreamAppTest;
import static org.springframework.cloud.stream.apps.integration.test.common.Configuration.VERSION;
diff --git a/stream-applications-integration-tests/src/test/java/org/springframework/cloud/stream/apps/integration/test/processor/httprequest/RabbitMQHttpRequestProcessorTests.java b/stream-applications-integration-tests/src/test/java/org/springframework/cloud/stream/apps/integration/test/processor/httprequest/RabbitMQHttpRequestProcessorTests.java
index fe394c2..3c32ed1 100644
--- a/stream-applications-integration-tests/src/test/java/org/springframework/cloud/stream/apps/integration/test/processor/httprequest/RabbitMQHttpRequestProcessorTests.java
+++ b/stream-applications-integration-tests/src/test/java/org/springframework/cloud/stream/apps/integration/test/processor/httprequest/RabbitMQHttpRequestProcessorTests.java
@@ -19,8 +19,8 @@ package org.springframework.cloud.stream.apps.integration.test.processor.httpreq
import org.testcontainers.junit.jupiter.Container;
import org.springframework.cloud.stream.app.test.integration.StreamAppContainer;
+import org.springframework.cloud.stream.app.test.integration.junit.jupiter.RabbitMQStreamAppTest;
import org.springframework.cloud.stream.app.test.integration.rabbitmq.RabbitMQConfig;
-import org.springframework.cloud.stream.app.test.integration.rabbitmq.RabbitMQStreamAppTest;
import static org.springframework.cloud.stream.apps.integration.test.common.Configuration.VERSION;
diff --git a/stream-applications-integration-tests/src/test/java/org/springframework/cloud/stream/apps/integration/test/sink/jdbc/JdbcSinkTests.java b/stream-applications-integration-tests/src/test/java/org/springframework/cloud/stream/apps/integration/test/sink/jdbc/JdbcSinkTests.java
index 2d96162..8737503 100644
--- a/stream-applications-integration-tests/src/test/java/org/springframework/cloud/stream/apps/integration/test/sink/jdbc/JdbcSinkTests.java
+++ b/stream-applications-integration-tests/src/test/java/org/springframework/cloud/stream/apps/integration/test/sink/jdbc/JdbcSinkTests.java
@@ -18,16 +18,19 @@ package org.springframework.cloud.stream.apps.integration.test.sink.jdbc;
import com.zaxxer.hikari.HikariDataSource;
import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
import org.testcontainers.containers.BindMode;
import org.testcontainers.containers.MySQLContainer;
+import org.testcontainers.containers.wait.strategy.Wait;
import org.testcontainers.junit.jupiter.Container;
import org.testcontainers.utility.DockerImageName;
import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.cloud.stream.app.test.integration.LogMatcher;
import org.springframework.cloud.stream.app.test.integration.StreamAppContainer;
import org.springframework.cloud.stream.app.test.integration.TestTopicSender;
+import org.springframework.cloud.stream.app.test.integration.junit.jupiter.BaseContainerExtension;
import org.springframework.cloud.stream.app.test.integration.kafka.KafkaConfig;
import org.springframework.jdbc.core.JdbcTemplate;
@@ -36,13 +39,14 @@ import static org.awaitility.Awaitility.await;
import static org.springframework.cloud.stream.app.test.integration.AppLog.appLog;
import static org.springframework.cloud.stream.apps.integration.test.common.Configuration.DEFAULT_DURATION;
+@ExtendWith(BaseContainerExtension.class)
public abstract class JdbcSinkTests {
private static JdbcTemplate jdbcTemplate;
private static StreamAppContainer sink;
- private static LogMatcher logMatcher = LogMatcher.contains("Started JdbcSink");
+ //private static LogMatcher startupLogEntry = LogMatcher.contains("Started JdbcSink");
@Autowired
private TestTopicSender testTopicSender;
@@ -58,21 +62,19 @@ public abstract class JdbcSinkTests {
.withLogConsumer(appLog("mysql-for-sink"))
.withCommand("--init-file", "/init.sql");
- protected static void configureSink(StreamAppContainer baseContainer) {
- sink = baseContainer
+ @BeforeAll
+ static void init() {
+ // BaseContainerExtension.logMatcher().map(lm -> startupLogEntry = lm);
+ sink = BaseContainerExtension.containerInstance()
.dependsOn(mySQL)
.withEnv("JDBC_CONSUMER_COLUMNS", "name,city:address.city,street:address.street")
.withEnv("JDBC_CONSUMER_TABLE_NAME", "People")
.withEnv("SPRING_DATASOURCE_USERNAME", "test")
.withEnv("SPRING_DATASOURCE_PASSWORD", "secret")
.withEnv("SPRING_DATASOURCE_DRIVER_CLASS_NAME", "org.mariadb.jdbc.Driver")
- // .withEnv("LOGGING_LEVEL_ORG_SPRINGFRAMEWORK_INTEGRATION", "DEBUG")
- // .withEnv("LOGGING_LEVEL_ORG_SPRINGFRAMEWORK_JDBC", "DEBUG")
- // .withEnv("LOGGING_LEVEL_ORG_MARIADB_JDBC", "DEBUG")
- // .log()
.withEnv("SPRING_DATASOURCE_URL",
"jdbc:mariadb://mysql-for-sink:3306/test")
- .withLogConsumer(logMatcher);
+ .waitingFor(Wait.forLogMessage(".*Started JdbcSink.*", 1));
startSink();
}
@@ -89,7 +91,11 @@ public abstract class JdbcSinkTests {
.until(() -> jdbcTemplate.queryForObject("SELECT COUNT(*) from People", Integer.class)
.intValue() == 0);
sink.start();
- await().atMost(DEFAULT_DURATION).until(logMatcher.matches());
+ // await().atMost(DEFAULT_DURATION).conditionEvaluationListener(evaluatedCondition -> {
+ // System.out.println("App started ....................................... in "
+ // + evaluatedCondition.getElapsedTimeInMS());
+ // }).until(startupLogEntry.matches());
+
}
@Test
diff --git a/stream-applications-integration-tests/src/test/java/org/springframework/cloud/stream/apps/integration/test/sink/jdbc/KafkaJdbcSinkTests.java b/stream-applications-integration-tests/src/test/java/org/springframework/cloud/stream/apps/integration/test/sink/jdbc/KafkaJdbcSinkTests.java
index 1f89f41..e955239 100644
--- a/stream-applications-integration-tests/src/test/java/org/springframework/cloud/stream/apps/integration/test/sink/jdbc/KafkaJdbcSinkTests.java
+++ b/stream-applications-integration-tests/src/test/java/org/springframework/cloud/stream/apps/integration/test/sink/jdbc/KafkaJdbcSinkTests.java
@@ -16,19 +16,16 @@
package org.springframework.cloud.stream.apps.integration.test.sink.jdbc;
-import org.junit.jupiter.api.BeforeAll;
-
+import org.springframework.cloud.stream.app.test.integration.StreamAppContainer;
+import org.springframework.cloud.stream.app.test.integration.junit.jupiter.BaseContainer;
+import org.springframework.cloud.stream.app.test.integration.junit.jupiter.KafkaStreamAppTest;
import org.springframework.cloud.stream.app.test.integration.kafka.KafkaConfig;
-import org.springframework.cloud.stream.app.test.integration.kafka.KafkaStreamAppTest;
import static org.springframework.cloud.stream.apps.integration.test.common.Configuration.VERSION;
@KafkaStreamAppTest
-class KafkaJdbcSinkTests extends JdbcSinkTests {
+public class KafkaJdbcSinkTests extends JdbcSinkTests {
- @BeforeAll
- static void init() {
- configureSink(KafkaConfig
- .prepackagedContainerFor("jdbc-sink", VERSION));
- }
+ @BaseContainer
+ public static StreamAppContainer sink = KafkaConfig.prepackagedContainerFor("jdbc-sink", VERSION);
}
diff --git a/stream-applications-integration-tests/src/test/java/org/springframework/cloud/stream/apps/integration/test/sink/jdbc/RabbitMQJdbcSinkTests.java b/stream-applications-integration-tests/src/test/java/org/springframework/cloud/stream/apps/integration/test/sink/jdbc/RabbitMQJdbcSinkTests.java
index d370f9f..be9ebb5 100644
--- a/stream-applications-integration-tests/src/test/java/org/springframework/cloud/stream/apps/integration/test/sink/jdbc/RabbitMQJdbcSinkTests.java
+++ b/stream-applications-integration-tests/src/test/java/org/springframework/cloud/stream/apps/integration/test/sink/jdbc/RabbitMQJdbcSinkTests.java
@@ -16,19 +16,17 @@
package org.springframework.cloud.stream.apps.integration.test.sink.jdbc;
-import org.junit.jupiter.api.BeforeAll;
-
+import org.springframework.cloud.stream.app.test.integration.StreamAppContainer;
+import org.springframework.cloud.stream.app.test.integration.junit.jupiter.BaseContainer;
+import org.springframework.cloud.stream.app.test.integration.junit.jupiter.RabbitMQStreamAppTest;
import org.springframework.cloud.stream.app.test.integration.rabbitmq.RabbitMQConfig;
-import org.springframework.cloud.stream.app.test.integration.rabbitmq.RabbitMQStreamAppTest;
import static org.springframework.cloud.stream.apps.integration.test.common.Configuration.VERSION;
@RabbitMQStreamAppTest
-class RabbitMQJdbcSinkTests extends JdbcSinkTests {
+public class RabbitMQJdbcSinkTests extends JdbcSinkTests {
+
+ @BaseContainer
+ public static StreamAppContainer sink = RabbitMQConfig.prepackagedContainerFor("jdbc-sink", VERSION);
- @BeforeAll
- static void init() {
- configureSink(RabbitMQConfig
- .prepackagedContainerFor("jdbc-sink", VERSION));
- }
}
diff --git a/stream-applications-integration-tests/src/test/java/org/springframework/cloud/stream/apps/integration/test/sink/mongodb/KafkaMongoDBSinkTests.java b/stream-applications-integration-tests/src/test/java/org/springframework/cloud/stream/apps/integration/test/sink/mongodb/KafkaMongoDBSinkTests.java
index 17c3c2b..508d8d1 100644
--- a/stream-applications-integration-tests/src/test/java/org/springframework/cloud/stream/apps/integration/test/sink/mongodb/KafkaMongoDBSinkTests.java
+++ b/stream-applications-integration-tests/src/test/java/org/springframework/cloud/stream/apps/integration/test/sink/mongodb/KafkaMongoDBSinkTests.java
@@ -16,19 +16,16 @@
package org.springframework.cloud.stream.apps.integration.test.sink.mongodb;
-import org.junit.jupiter.api.BeforeAll;
-
+import org.springframework.cloud.stream.app.test.integration.StreamAppContainer;
+import org.springframework.cloud.stream.app.test.integration.junit.jupiter.BaseContainer;
+import org.springframework.cloud.stream.app.test.integration.junit.jupiter.KafkaStreamAppTest;
import org.springframework.cloud.stream.app.test.integration.kafka.KafkaConfig;
-import org.springframework.cloud.stream.app.test.integration.kafka.KafkaStreamAppTest;
import static org.springframework.cloud.stream.apps.integration.test.common.Configuration.VERSION;
@KafkaStreamAppTest
-class KafkaMongoDBSinkTests extends MongoDBSinkTests {
- @BeforeAll
- static void init() {
- configureSink(KafkaConfig
- .prepackagedContainerFor("mongodb-sink", VERSION));
- }
+public class KafkaMongoDBSinkTests extends MongoDBSinkTests {
+ @BaseContainer
+ public static StreamAppContainer sink = KafkaConfig.prepackagedContainerFor("mongodb-sink", VERSION);
}
diff --git a/stream-applications-integration-tests/src/test/java/org/springframework/cloud/stream/apps/integration/test/sink/mongodb/MongoDBSinkTests.java b/stream-applications-integration-tests/src/test/java/org/springframework/cloud/stream/apps/integration/test/sink/mongodb/MongoDBSinkTests.java
index 8ab9776..c9c2661 100644
--- a/stream-applications-integration-tests/src/test/java/org/springframework/cloud/stream/apps/integration/test/sink/mongodb/MongoDBSinkTests.java
+++ b/stream-applications-integration-tests/src/test/java/org/springframework/cloud/stream/apps/integration/test/sink/mongodb/MongoDBSinkTests.java
@@ -21,14 +21,17 @@ import java.util.List;
import org.bson.Document;
import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
import org.testcontainers.containers.MongoDBContainer;
-import org.testcontainers.utility.DockerImageName;
+import org.testcontainers.containers.wait.strategy.Wait;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.cloud.stream.app.test.integration.StreamAppContainer;
import org.springframework.cloud.stream.app.test.integration.StreamAppContainerTestUtils;
import org.springframework.cloud.stream.app.test.integration.TestTopicSender;
+import org.springframework.cloud.stream.app.test.integration.junit.jupiter.BaseContainerExtension;
import org.springframework.data.mongodb.MongoDatabaseFactory;
import org.springframework.data.mongodb.core.MongoTemplate;
import org.springframework.data.mongodb.core.SimpleMongoClientDatabaseFactory;
@@ -37,6 +40,7 @@ import static org.assertj.core.api.Assertions.assertThat;
import static org.awaitility.Awaitility.await;
import static org.springframework.cloud.stream.apps.integration.test.common.Configuration.DEFAULT_DURATION;
+@ExtendWith(BaseContainerExtension.class)
abstract class MongoDBSinkTests {
private static MongoTemplate mongoTemplate;
@@ -44,7 +48,7 @@ abstract class MongoDBSinkTests {
@Autowired
private TestTopicSender testTopicSender;
- private static MongoDBContainer mongoDBContainer = new MongoDBContainer(DockerImageName.parse("mongo:4.0.10"))
+ private static final MongoDBContainer mongoDBContainer = new MongoDBContainer("mongo:4.0.10")
.withExposedPorts(27017)
.withStartupTimeout(Duration.ofMinutes(2));
@@ -55,11 +59,14 @@ abstract class MongoDBSinkTests {
private static StreamAppContainer sink;
- protected static void configureSink(StreamAppContainer baseContainer) {
+ @BeforeAll
+ protected static void configureSink() {
mongoDBContainer.start();
- sink = baseContainer
+ sink = BaseContainerExtension.containerInstance()
.withEnv("MONGODB_CONSUMER_COLLECTION", "test")
- .withEnv("SPRING_DATA_MONGODB_URL", mongoConnectionString());
+ .withEnv("SPRING_DATA_MONGODB_URL", mongoConnectionString())
+ .waitingFor(Wait.forLogMessage(".*Started MongodbSink.*", 1));
+
sink.start();
buildMongoTemplate();
}
diff --git a/stream-applications-integration-tests/src/test/java/org/springframework/cloud/stream/apps/integration/test/sink/mongodb/RabbitMQMongoDBSinkTests.java b/stream-applications-integration-tests/src/test/java/org/springframework/cloud/stream/apps/integration/test/sink/mongodb/RabbitMQMongoDBSinkTests.java
index 1a13b49..4342c98 100644
--- a/stream-applications-integration-tests/src/test/java/org/springframework/cloud/stream/apps/integration/test/sink/mongodb/RabbitMQMongoDBSinkTests.java
+++ b/stream-applications-integration-tests/src/test/java/org/springframework/cloud/stream/apps/integration/test/sink/mongodb/RabbitMQMongoDBSinkTests.java
@@ -16,20 +16,17 @@
package org.springframework.cloud.stream.apps.integration.test.sink.mongodb;
-import org.junit.jupiter.api.BeforeAll;
-
+import org.springframework.cloud.stream.app.test.integration.StreamAppContainer;
+import org.springframework.cloud.stream.app.test.integration.junit.jupiter.BaseContainer;
+import org.springframework.cloud.stream.app.test.integration.junit.jupiter.RabbitMQStreamAppTest;
import org.springframework.cloud.stream.app.test.integration.rabbitmq.RabbitMQConfig;
-import org.springframework.cloud.stream.app.test.integration.rabbitmq.RabbitMQStreamAppTest;
import static org.springframework.cloud.stream.apps.integration.test.common.Configuration.VERSION;
@RabbitMQStreamAppTest
-class RabbitMQMongoDBSinkTests extends MongoDBSinkTests {
+public class RabbitMQMongoDBSinkTests extends MongoDBSinkTests {
- @BeforeAll
- static void init() {
- configureSink(RabbitMQConfig
- .prepackagedContainerFor("mongodb-sink", VERSION));
- }
+ @BaseContainer
+ public static StreamAppContainer sink = RabbitMQConfig.prepackagedContainerFor("mongodb-sink", VERSION);
}
diff --git a/stream-applications-integration-tests/src/test/java/org/springframework/cloud/stream/apps/integration/test/sink/tcp/KafkaTcpSinkTests.java b/stream-applications-integration-tests/src/test/java/org/springframework/cloud/stream/apps/integration/test/sink/tcp/KafkaTcpSinkTests.java
index 5e9b7dc..c21f210 100644
--- a/stream-applications-integration-tests/src/test/java/org/springframework/cloud/stream/apps/integration/test/sink/tcp/KafkaTcpSinkTests.java
+++ b/stream-applications-integration-tests/src/test/java/org/springframework/cloud/stream/apps/integration/test/sink/tcp/KafkaTcpSinkTests.java
@@ -16,18 +16,17 @@
package org.springframework.cloud.stream.apps.integration.test.sink.tcp;
-import org.testcontainers.junit.jupiter.Container;
-
import org.springframework.cloud.stream.app.test.integration.StreamAppContainer;
+import org.springframework.cloud.stream.app.test.integration.junit.jupiter.BaseContainer;
+import org.springframework.cloud.stream.app.test.integration.junit.jupiter.KafkaStreamAppTest;
import org.springframework.cloud.stream.app.test.integration.kafka.KafkaConfig;
-import org.springframework.cloud.stream.app.test.integration.kafka.KafkaStreamAppTest;
import static org.springframework.cloud.stream.apps.integration.test.common.Configuration.VERSION;
@KafkaStreamAppTest
public class KafkaTcpSinkTests extends TcpSinkTests {
- @Container
- static StreamAppContainer container = configureSink(KafkaConfig.prepackagedContainerFor("tcp-sink", VERSION));
+ @BaseContainer
+ public static StreamAppContainer sink = KafkaConfig.prepackagedContainerFor("tcp-sink", VERSION);
}
diff --git a/stream-applications-integration-tests/src/test/java/org/springframework/cloud/stream/apps/integration/test/sink/tcp/RabbitMQTcpSinkTests.java b/stream-applications-integration-tests/src/test/java/org/springframework/cloud/stream/apps/integration/test/sink/tcp/RabbitMQTcpSinkTests.java
index 993e49d..a4d0a4c 100644
--- a/stream-applications-integration-tests/src/test/java/org/springframework/cloud/stream/apps/integration/test/sink/tcp/RabbitMQTcpSinkTests.java
+++ b/stream-applications-integration-tests/src/test/java/org/springframework/cloud/stream/apps/integration/test/sink/tcp/RabbitMQTcpSinkTests.java
@@ -16,17 +16,16 @@
package org.springframework.cloud.stream.apps.integration.test.sink.tcp;
-import org.testcontainers.junit.jupiter.Container;
-
import org.springframework.cloud.stream.app.test.integration.StreamAppContainer;
+import org.springframework.cloud.stream.app.test.integration.junit.jupiter.BaseContainer;
+import org.springframework.cloud.stream.app.test.integration.junit.jupiter.RabbitMQStreamAppTest;
import org.springframework.cloud.stream.app.test.integration.rabbitmq.RabbitMQConfig;
-import org.springframework.cloud.stream.app.test.integration.rabbitmq.RabbitMQStreamAppTest;
import static org.springframework.cloud.stream.apps.integration.test.common.Configuration.VERSION;
@RabbitMQStreamAppTest
public class RabbitMQTcpSinkTests extends TcpSinkTests {
- @Container
- static StreamAppContainer container = configureSink(RabbitMQConfig.prepackagedContainerFor("tcp-sink", VERSION));
+ @BaseContainer
+ public static StreamAppContainer sink = RabbitMQConfig.prepackagedContainerFor("tcp-sink", VERSION);
}
diff --git a/stream-applications-integration-tests/src/test/java/org/springframework/cloud/stream/apps/integration/test/sink/tcp/TcpSinkTests.java b/stream-applications-integration-tests/src/test/java/org/springframework/cloud/stream/apps/integration/test/sink/tcp/TcpSinkTests.java
index c927a9f..84fa884 100644
--- a/stream-applications-integration-tests/src/test/java/org/springframework/cloud/stream/apps/integration/test/sink/tcp/TcpSinkTests.java
+++ b/stream-applications-integration-tests/src/test/java/org/springframework/cloud/stream/apps/integration/test/sink/tcp/TcpSinkTests.java
@@ -28,15 +28,19 @@ import java.util.concurrent.atomic.AtomicBoolean;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.testcontainers.containers.wait.strategy.Wait;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.cloud.stream.app.test.integration.StreamAppContainer;
import org.springframework.cloud.stream.app.test.integration.StreamAppContainerTestUtils;
import org.springframework.cloud.stream.app.test.integration.TestTopicSender;
+import org.springframework.cloud.stream.app.test.integration.junit.jupiter.BaseContainerExtension;
import static org.awaitility.Awaitility.await;
import static org.springframework.cloud.stream.apps.integration.test.common.Configuration.DEFAULT_DURATION;
+@ExtendWith(BaseContainerExtension.class)
abstract class TcpSinkTests {
private static int tcpPort;
@@ -50,15 +54,18 @@ abstract class TcpSinkTests {
@Autowired
private TestTopicSender testTopicSender;
- protected static StreamAppContainer configureSink(StreamAppContainer baseContainer) {
+ @BeforeAll
+ static void configureSink() {
tcpPort = StreamAppContainerTestUtils.findAvailablePort();
- sink = baseContainer.withEnv("TCP_CONSUMER_HOST", StreamAppContainerTestUtils.localHostAddress())
+ startTcpServer();
+ sink = BaseContainerExtension.containerInstance()
+ .withEnv("TCP_CONSUMER_HOST", StreamAppContainerTestUtils.localHostAddress())
.withEnv("TCP_PORT", String.valueOf(tcpPort))
- .withEnv("TCP_CONSUMER_ENCODER", "CRLF");
- return sink;
+ .withEnv("TCP_CONSUMER_ENCODER", "CRLF")
+ .waitingFor(Wait.forLogMessage(".*Started TcpSink.*", 1));
+ sink.start();
}
- @BeforeAll
static void startTcpServer() {
socketReady.set(false);
new Thread(() -> {
@@ -85,6 +92,7 @@ abstract class TcpSinkTests {
@AfterAll
static void cleanUp() throws IOException {
+ sink.stop();
socket.close();
}
}
diff --git a/stream-applications-integration-tests/src/test/java/org/springframework/cloud/stream/apps/integration/test/source/geode/GeodeSourceTests.java b/stream-applications-integration-tests/src/test/java/org/springframework/cloud/stream/apps/integration/test/source/geode/GeodeSourceTests.java
index 4c29043..52d3087 100644
--- a/stream-applications-integration-tests/src/test/java/org/springframework/cloud/stream/apps/integration/test/source/geode/GeodeSourceTests.java
+++ b/stream-applications-integration-tests/src/test/java/org/springframework/cloud/stream/apps/integration/test/source/geode/GeodeSourceTests.java
@@ -30,22 +30,25 @@ import org.apache.geode.cache.client.ClientCache;
import org.apache.geode.cache.client.ClientCacheFactory;
import org.apache.geode.cache.client.ClientRegionShortcut;
import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.testcontainers.containers.wait.strategy.Wait;
import org.testcontainers.images.builder.ImageFromDockerfile;
import org.testcontainers.junit.jupiter.Container;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.cloud.fn.test.support.geode.GeodeContainer;
-import org.springframework.cloud.stream.app.test.integration.LogMatcher;
import org.springframework.cloud.stream.app.test.integration.OutputMatcher;
import org.springframework.cloud.stream.app.test.integration.StreamAppContainer;
import org.springframework.cloud.stream.app.test.integration.StreamAppContainerTestUtils;
-import org.springframework.cloud.stream.app.test.integration.rabbitmq.RabbitMQStreamAppContainer;
+import org.springframework.cloud.stream.app.test.integration.junit.jupiter.BaseContainerExtension;
import static org.awaitility.Awaitility.await;
import static org.springframework.cloud.stream.apps.integration.test.common.Configuration.DEFAULT_DURATION;
-public abstract class GeodeSourceTests {
+@ExtendWith(BaseContainerExtension.class)
+abstract class GeodeSourceTests {
private static int locatorPort = StreamAppContainerTestUtils.findAvailablePort();
private static int cacheServerPort = StreamAppContainerTestUtils.findAvailablePort();
@@ -54,8 +57,6 @@ public abstract class GeodeSourceTests {
private static ClientCache clientCache;
- protected static LogMatcher logMatcher = LogMatcher.contains("Started GeodeSource");
-
protected static StreamAppContainer source;
@Autowired
@@ -77,7 +78,8 @@ public abstract class GeodeSourceTests {
.withCommand("tail", "-f", "/dev/null")
.withStartupTimeout(DEFAULT_DURATION.multipliedBy(2));
- protected static void initializeGeodeCacheThenStartSource() {
+ @BeforeAll
+ static void initializeGeodeCacheThenStartSource() {
// Not using locator is faster.
System.out.println(geode.execGfsh(
"start server --name=Server1 " + "--hostname-for-clients=geode" + " --server-port="
@@ -92,23 +94,18 @@ public abstract class GeodeSourceTests {
.createClientRegionFactory(ClientRegionShortcut.PROXY)
.create("myRegion");
- source.withEnv("GEODE_POOL_CONNECT_TYPE", "server")
+ source = BaseContainerExtension.containerInstance().withEnv("GEODE_POOL_CONNECT_TYPE", "server")
.withEnv("GEODE_REGION_REGION_NAME", "myRegion")
.withEnv("GEODE_POOL_HOST_ADDRESSES",
StreamAppContainerTestUtils.localHostAddress() + ":" + cacheServerPort)
- .withLogConsumer(logMatcher).log().start();
+ .waitingFor(Wait.forLogMessage(".*Started GeodeSource.*", 1));
+ source.start();
}
@Test
void test() throws InterruptedException {
- await().atMost(Duration.ofMinutes(2)).until(logMatcher.matches());
- if (source instanceof RabbitMQStreamAppContainer) {
- // TODO: Some race condition. Need to investigate
- Thread.sleep(10000);
- }
String random = UUID.randomUUID().toString();
clientRegion.put(random, random);
-
await().atMost(Duration.ofSeconds(30))
.until(outputMatcher.payloadMatches((String s) -> s.contains(random)));
}
diff --git a/stream-applications-integration-tests/src/test/java/org/springframework/cloud/stream/apps/integration/test/source/geode/KafkaGeodeSourceTests.java b/stream-applications-integration-tests/src/test/java/org/springframework/cloud/stream/apps/integration/test/source/geode/KafkaGeodeSourceTests.java
index bc31452..be4ff06 100644
--- a/stream-applications-integration-tests/src/test/java/org/springframework/cloud/stream/apps/integration/test/source/geode/KafkaGeodeSourceTests.java
+++ b/stream-applications-integration-tests/src/test/java/org/springframework/cloud/stream/apps/integration/test/source/geode/KafkaGeodeSourceTests.java
@@ -16,21 +16,17 @@
package org.springframework.cloud.stream.apps.integration.test.source.geode;
-import org.junit.jupiter.api.BeforeAll;
-
+import org.springframework.cloud.stream.app.test.integration.StreamAppContainer;
+import org.springframework.cloud.stream.app.test.integration.junit.jupiter.BaseContainer;
+import org.springframework.cloud.stream.app.test.integration.junit.jupiter.KafkaStreamAppTest;
import org.springframework.cloud.stream.app.test.integration.kafka.KafkaConfig;
-import org.springframework.cloud.stream.app.test.integration.kafka.KafkaStreamAppTest;
import static org.springframework.cloud.stream.apps.integration.test.common.Configuration.VERSION;
@KafkaStreamAppTest
-class KafkaGeodeSourceTests extends GeodeSourceTests {
+public class KafkaGeodeSourceTests extends GeodeSourceTests {
- @BeforeAll
- static void init() {
- source = KafkaConfig
- .prepackagedContainerFor("geode-source", VERSION);
- initializeGeodeCacheThenStartSource();
- }
+ @BaseContainer
+ public static StreamAppContainer source = KafkaConfig.prepackagedContainerFor("geode-source", VERSION);
}
diff --git a/stream-applications-integration-tests/src/test/java/org/springframework/cloud/stream/apps/integration/test/source/geode/RabbitMQGeodeSourceTests.java b/stream-applications-integration-tests/src/test/java/org/springframework/cloud/stream/apps/integration/test/source/geode/RabbitMQGeodeSourceTests.java
index 1a788b3..82f9799 100644
--- a/stream-applications-integration-tests/src/test/java/org/springframework/cloud/stream/apps/integration/test/source/geode/RabbitMQGeodeSourceTests.java
+++ b/stream-applications-integration-tests/src/test/java/org/springframework/cloud/stream/apps/integration/test/source/geode/RabbitMQGeodeSourceTests.java
@@ -16,21 +16,17 @@
package org.springframework.cloud.stream.apps.integration.test.source.geode;
-import org.junit.jupiter.api.BeforeAll;
-
+import org.springframework.cloud.stream.app.test.integration.StreamAppContainer;
+import org.springframework.cloud.stream.app.test.integration.junit.jupiter.BaseContainer;
+import org.springframework.cloud.stream.app.test.integration.junit.jupiter.RabbitMQStreamAppTest;
import org.springframework.cloud.stream.app.test.integration.rabbitmq.RabbitMQConfig;
-import org.springframework.cloud.stream.app.test.integration.rabbitmq.RabbitMQStreamAppTest;
import static org.springframework.cloud.stream.apps.integration.test.common.Configuration.VERSION;
@RabbitMQStreamAppTest
-class RabbitMQGeodeSourceTests extends GeodeSourceTests {
+public class RabbitMQGeodeSourceTests extends GeodeSourceTests {
- @BeforeAll
- static void init() {
- source = RabbitMQConfig
- .prepackagedContainerFor("geode-source", VERSION);
- initializeGeodeCacheThenStartSource();
- }
+ @BaseContainer
+ public static StreamAppContainer source = RabbitMQConfig.prepackagedContainerFor("geode-source", VERSION);
}
diff --git a/stream-applications-integration-tests/src/test/java/org/springframework/cloud/stream/apps/integration/test/source/http/HttpSourceTests.java b/stream-applications-integration-tests/src/test/java/org/springframework/cloud/stream/apps/integration/test/source/http/HttpSourceTests.java
index cd1ca3a..a3137b9 100644
--- a/stream-applications-integration-tests/src/test/java/org/springframework/cloud/stream/apps/integration/test/source/http/HttpSourceTests.java
+++ b/stream-applications-integration-tests/src/test/java/org/springframework/cloud/stream/apps/integration/test/source/http/HttpSourceTests.java
@@ -22,7 +22,9 @@ import java.util.concurrent.atomic.AtomicReference;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
import org.testcontainers.containers.wait.strategy.Wait;
import reactor.core.publisher.Mono;
@@ -30,6 +32,7 @@ import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.cloud.stream.app.test.integration.OutputMatcher;
import org.springframework.cloud.stream.app.test.integration.StreamAppContainer;
import org.springframework.cloud.stream.app.test.integration.StreamAppContainerTestUtils;
+import org.springframework.cloud.stream.app.test.integration.junit.jupiter.BaseContainerExtension;
import org.springframework.http.HttpStatus;
import org.springframework.http.MediaType;
import org.springframework.web.reactive.function.client.WebClient;
@@ -38,6 +41,7 @@ import static org.assertj.core.api.Assertions.assertThat;
import static org.awaitility.Awaitility.await;
import static org.springframework.cloud.stream.apps.integration.test.common.Configuration.DEFAULT_DURATION;
+@ExtendWith(BaseContainerExtension.class)
public abstract class HttpSourceTests {
private static int serverPort = StreamAppContainerTestUtils.findAvailablePort();
@@ -46,11 +50,13 @@ public abstract class HttpSourceTests {
private static StreamAppContainer source;
- protected static StreamAppContainer configureSource(StreamAppContainer baseContainer) {
- source = baseContainer.withEnv("SERVER_PORT", String.valueOf(serverPort))
+ @BeforeAll
+ static void configureSource() {
+ source = BaseContainerExtension.containerInstance()
+ .withEnv("SERVER_PORT", String.valueOf(serverPort))
.withExposedPorts(serverPort)
.waitingFor(Wait.forListeningPort().withStartupTimeout(DEFAULT_DURATION));
- return source;
+ source.start();
}
@Autowired
diff --git a/stream-applications-integration-tests/src/test/java/org/springframework/cloud/stream/apps/integration/test/source/http/KafkaHttpSourceTests.java b/stream-applications-integration-tests/src/test/java/org/springframework/cloud/stream/apps/integration/test/source/http/KafkaHttpSourceTests.java
index ae10595..9d21886 100644
--- a/stream-applications-integration-tests/src/test/java/org/springframework/cloud/stream/apps/integration/test/source/http/KafkaHttpSourceTests.java
+++ b/stream-applications-integration-tests/src/test/java/org/springframework/cloud/stream/apps/integration/test/source/http/KafkaHttpSourceTests.java
@@ -16,17 +16,16 @@
package org.springframework.cloud.stream.apps.integration.test.source.http;
-import org.testcontainers.junit.jupiter.Container;
-
import org.springframework.cloud.stream.app.test.integration.StreamAppContainer;
+import org.springframework.cloud.stream.app.test.integration.junit.jupiter.BaseContainer;
+import org.springframework.cloud.stream.app.test.integration.junit.jupiter.KafkaStreamAppTest;
import org.springframework.cloud.stream.app.test.integration.kafka.KafkaConfig;
-import org.springframework.cloud.stream.app.test.integration.kafka.KafkaStreamAppTest;
import static org.springframework.cloud.stream.apps.integration.test.common.Configuration.VERSION;
@KafkaStreamAppTest
-class KafkaHttpSourceTests extends HttpSourceTests {
+public class KafkaHttpSourceTests extends HttpSourceTests {
- @Container
- static StreamAppContainer base = configureSource(KafkaConfig.prepackagedContainerFor("http-source", VERSION));
+ @BaseContainer
+ public static StreamAppContainer source = KafkaConfig.prepackagedContainerFor("http-source", VERSION);
}
diff --git a/stream-applications-integration-tests/src/test/java/org/springframework/cloud/stream/apps/integration/test/source/http/RabbitMQHttpSourceTests.java b/stream-applications-integration-tests/src/test/java/org/springframework/cloud/stream/apps/integration/test/source/http/RabbitMQHttpSourceTests.java
index 59c86d2..8123220 100644
--- a/stream-applications-integration-tests/src/test/java/org/springframework/cloud/stream/apps/integration/test/source/http/RabbitMQHttpSourceTests.java
+++ b/stream-applications-integration-tests/src/test/java/org/springframework/cloud/stream/apps/integration/test/source/http/RabbitMQHttpSourceTests.java
@@ -16,17 +16,16 @@
package org.springframework.cloud.stream.apps.integration.test.source.http;
-import org.testcontainers.junit.jupiter.Container;
-
import org.springframework.cloud.stream.app.test.integration.StreamAppContainer;
+import org.springframework.cloud.stream.app.test.integration.junit.jupiter.BaseContainer;
+import org.springframework.cloud.stream.app.test.integration.junit.jupiter.RabbitMQStreamAppTest;
import org.springframework.cloud.stream.app.test.integration.rabbitmq.RabbitMQConfig;
-import org.springframework.cloud.stream.app.test.integration.rabbitmq.RabbitMQStreamAppTest;
import static org.springframework.cloud.stream.apps.integration.test.common.Configuration.VERSION;
@RabbitMQStreamAppTest
-class RabbitMQHttpSourceTests extends HttpSourceTests {
+public class RabbitMQHttpSourceTests extends HttpSourceTests {
- @Container
- static StreamAppContainer base = configureSource(RabbitMQConfig.prepackagedContainerFor("http-source", VERSION));
+ @BaseContainer
+ public static StreamAppContainer source = RabbitMQConfig.prepackagedContainerFor("http-source", VERSION);
}
diff --git a/stream-applications-integration-tests/src/test/java/org/springframework/cloud/stream/apps/integration/test/source/jdbc/JdbcSourceTests.java b/stream-applications-integration-tests/src/test/java/org/springframework/cloud/stream/apps/integration/test/source/jdbc/JdbcSourceTests.java
index 8580ffc..1cfbea7 100644
--- a/stream-applications-integration-tests/src/test/java/org/springframework/cloud/stream/apps/integration/test/source/jdbc/JdbcSourceTests.java
+++ b/stream-applications-integration-tests/src/test/java/org/springframework/cloud/stream/apps/integration/test/source/jdbc/JdbcSourceTests.java
@@ -17,7 +17,9 @@
package org.springframework.cloud.stream.apps.integration.test.source.jdbc;
import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
import org.testcontainers.containers.BindMode;
import org.testcontainers.containers.MySQLContainer;
import org.testcontainers.junit.jupiter.Container;
@@ -26,12 +28,14 @@ import org.testcontainers.utility.DockerImageName;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.cloud.stream.app.test.integration.OutputMatcher;
import org.springframework.cloud.stream.app.test.integration.StreamAppContainer;
+import org.springframework.cloud.stream.app.test.integration.junit.jupiter.BaseContainerExtension;
import org.springframework.cloud.stream.app.test.integration.kafka.KafkaConfig;
import static org.awaitility.Awaitility.await;
import static org.springframework.cloud.stream.app.test.integration.AppLog.appLog;
import static org.springframework.cloud.stream.apps.integration.test.common.Configuration.DEFAULT_DURATION;
+@ExtendWith(BaseContainerExtension.class)
abstract class JdbcSourceTests {
@Autowired
@@ -50,8 +54,9 @@ abstract class JdbcSourceTests {
.withClasspathResourceMapping("init.sql", "/init.sql", BindMode.READ_ONLY)
.withCommand("--init-file", "/init.sql");
- protected static void configureSource(StreamAppContainer baseContainer) {
- source = baseContainer
+ @BeforeAll
+ protected static void configureSource() {
+ source = BaseContainerExtension.containerInstance()
.withEnv("JDBC_SUPPLIER_QUERY", "SELECT * FROM People WHERE deleted='N'")
.withEnv("JDBC_SUPPLIER_UPDATE", "UPDATE People SET deleted='Y' WHERE id=:id")
.withEnv("SPRING_DATASOURCE_USERNAME", "test")
diff --git a/stream-applications-integration-tests/src/test/java/org/springframework/cloud/stream/apps/integration/test/source/jdbc/KafkaJdbcSourceTests.java b/stream-applications-integration-tests/src/test/java/org/springframework/cloud/stream/apps/integration/test/source/jdbc/KafkaJdbcSourceTests.java
index 89021b3..017cd57 100644
--- a/stream-applications-integration-tests/src/test/java/org/springframework/cloud/stream/apps/integration/test/source/jdbc/KafkaJdbcSourceTests.java
+++ b/stream-applications-integration-tests/src/test/java/org/springframework/cloud/stream/apps/integration/test/source/jdbc/KafkaJdbcSourceTests.java
@@ -16,19 +16,16 @@
package org.springframework.cloud.stream.apps.integration.test.source.jdbc;
-import org.junit.jupiter.api.BeforeAll;
-
+import org.springframework.cloud.stream.app.test.integration.StreamAppContainer;
+import org.springframework.cloud.stream.app.test.integration.junit.jupiter.BaseContainer;
+import org.springframework.cloud.stream.app.test.integration.junit.jupiter.KafkaStreamAppTest;
import org.springframework.cloud.stream.app.test.integration.kafka.KafkaConfig;
-import org.springframework.cloud.stream.app.test.integration.kafka.KafkaStreamAppTest;
import static org.springframework.cloud.stream.apps.integration.test.common.Configuration.VERSION;
@KafkaStreamAppTest
-class KafkaJdbcSourceTests extends JdbcSourceTests {
+public class KafkaJdbcSourceTests extends JdbcSourceTests {
- @BeforeAll
- static void init() {
- configureSource(KafkaConfig
- .prepackagedContainerFor("jdbc-source", VERSION));
- }
+ @BaseContainer
+ public static StreamAppContainer source = KafkaConfig.prepackagedContainerFor("jdbc-source", VERSION);
}
diff --git a/stream-applications-integration-tests/src/test/java/org/springframework/cloud/stream/apps/integration/test/source/jdbc/RabbitMQJdbcSourceTests.java b/stream-applications-integration-tests/src/test/java/org/springframework/cloud/stream/apps/integration/test/source/jdbc/RabbitMQJdbcSourceTests.java
index 6912bf5..8bc32a2 100644
--- a/stream-applications-integration-tests/src/test/java/org/springframework/cloud/stream/apps/integration/test/source/jdbc/RabbitMQJdbcSourceTests.java
+++ b/stream-applications-integration-tests/src/test/java/org/springframework/cloud/stream/apps/integration/test/source/jdbc/RabbitMQJdbcSourceTests.java
@@ -16,19 +16,16 @@
package org.springframework.cloud.stream.apps.integration.test.source.jdbc;
-import org.junit.jupiter.api.BeforeAll;
-
+import org.springframework.cloud.stream.app.test.integration.StreamAppContainer;
+import org.springframework.cloud.stream.app.test.integration.junit.jupiter.BaseContainer;
+import org.springframework.cloud.stream.app.test.integration.junit.jupiter.RabbitMQStreamAppTest;
import org.springframework.cloud.stream.app.test.integration.rabbitmq.RabbitMQConfig;
-import org.springframework.cloud.stream.app.test.integration.rabbitmq.RabbitMQStreamAppTest;
import static org.springframework.cloud.stream.apps.integration.test.common.Configuration.VERSION;
@RabbitMQStreamAppTest
-class RabbitMQJdbcSourceTests extends JdbcSourceTests {
+public class RabbitMQJdbcSourceTests extends JdbcSourceTests {
- @BeforeAll
- static void init() {
- configureSource(RabbitMQConfig
- .prepackagedContainerFor("jdbc-source", VERSION));
- }
+ @BaseContainer
+ public static StreamAppContainer source = RabbitMQConfig.prepackagedContainerFor("jdbc-source", VERSION);
}
diff --git a/stream-applications-integration-tests/src/test/java/org/springframework/cloud/stream/apps/integration/test/source/s3/KafkaS3SourceTests.java b/stream-applications-integration-tests/src/test/java/org/springframework/cloud/stream/apps/integration/test/source/s3/KafkaS3SourceTests.java
index 82ce28c..3ce28e4 100644
--- a/stream-applications-integration-tests/src/test/java/org/springframework/cloud/stream/apps/integration/test/source/s3/KafkaS3SourceTests.java
+++ b/stream-applications-integration-tests/src/test/java/org/springframework/cloud/stream/apps/integration/test/source/s3/KafkaS3SourceTests.java
@@ -16,19 +16,16 @@
package org.springframework.cloud.stream.apps.integration.test.source.s3;
-import org.junit.jupiter.api.BeforeEach;
-
+import org.springframework.cloud.stream.app.test.integration.StreamAppContainer;
+import org.springframework.cloud.stream.app.test.integration.junit.jupiter.BaseContainer;
+import org.springframework.cloud.stream.app.test.integration.junit.jupiter.KafkaStreamAppTest;
import org.springframework.cloud.stream.app.test.integration.kafka.KafkaConfig;
-import org.springframework.cloud.stream.app.test.integration.kafka.KafkaStreamAppTest;
import static org.springframework.cloud.stream.apps.integration.test.common.Configuration.VERSION;
@KafkaStreamAppTest
-class KafkaS3SourceTests extends S3SourceTests {
+public class KafkaS3SourceTests extends S3SourceTests {
- @BeforeEach
- void init() {
- configureSource(KafkaConfig
- .prepackagedContainerFor("s3-source", VERSION));
- }
+ @BaseContainer
+ public static StreamAppContainer source = KafkaConfig.prepackagedContainerFor("s3-source", VERSION);
}
diff --git a/stream-applications-integration-tests/src/test/java/org/springframework/cloud/stream/apps/integration/test/source/s3/RabbitMQS3SourceTests.java b/stream-applications-integration-tests/src/test/java/org/springframework/cloud/stream/apps/integration/test/source/s3/RabbitMQS3SourceTests.java
index 3b74fd0..23a249d 100644
--- a/stream-applications-integration-tests/src/test/java/org/springframework/cloud/stream/apps/integration/test/source/s3/RabbitMQS3SourceTests.java
+++ b/stream-applications-integration-tests/src/test/java/org/springframework/cloud/stream/apps/integration/test/source/s3/RabbitMQS3SourceTests.java
@@ -16,19 +16,16 @@
package org.springframework.cloud.stream.apps.integration.test.source.s3;
-import org.junit.jupiter.api.BeforeEach;
-
+import org.springframework.cloud.stream.app.test.integration.StreamAppContainer;
+import org.springframework.cloud.stream.app.test.integration.junit.jupiter.BaseContainer;
+import org.springframework.cloud.stream.app.test.integration.junit.jupiter.RabbitMQStreamAppTest;
import org.springframework.cloud.stream.app.test.integration.rabbitmq.RabbitMQConfig;
-import org.springframework.cloud.stream.app.test.integration.rabbitmq.RabbitMQStreamAppTest;
import static org.springframework.cloud.stream.apps.integration.test.common.Configuration.VERSION;
@RabbitMQStreamAppTest
-class RabbitMQS3SourceTests extends S3SourceTests {
+public class RabbitMQS3SourceTests extends S3SourceTests {
- @BeforeEach
- void init() {
- configureSource(RabbitMQConfig
- .prepackagedContainerFor("s3-source", VERSION));
- }
+ @BaseContainer
+ public static StreamAppContainer source = RabbitMQConfig.prepackagedContainerFor("s3-source", VERSION);
}
diff --git a/stream-applications-integration-tests/src/test/java/org/springframework/cloud/stream/apps/integration/test/source/s3/S3SourceTests.java b/stream-applications-integration-tests/src/test/java/org/springframework/cloud/stream/apps/integration/test/source/s3/S3SourceTests.java
index 43f76a8..bc06c95 100644
--- a/stream-applications-integration-tests/src/test/java/org/springframework/cloud/stream/apps/integration/test/source/s3/S3SourceTests.java
+++ b/stream-applications-integration-tests/src/test/java/org/springframework/cloud/stream/apps/integration/test/source/s3/S3SourceTests.java
@@ -31,7 +31,9 @@ import com.amazonaws.services.s3.model.PutObjectRequest;
import com.github.dockerjava.api.command.CreateContainerCmd;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
import org.testcontainers.containers.GenericContainer;
import org.testcontainers.containers.wait.strategy.Wait;
import org.testcontainers.junit.jupiter.Container;
@@ -40,7 +42,7 @@ import org.testcontainers.utility.DockerImageName;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.cloud.stream.app.test.integration.OutputMatcher;
import org.springframework.cloud.stream.app.test.integration.StreamAppContainer;
-import org.springframework.cloud.stream.app.test.integration.TestTopicListener;
+import org.springframework.cloud.stream.app.test.integration.junit.jupiter.BaseContainerExtension;
import static org.awaitility.Awaitility.await;
import static org.springframework.cloud.stream.app.test.integration.AppLog.appLog;
@@ -49,15 +51,13 @@ import static org.springframework.cloud.stream.app.test.integration.StreamAppCon
import static org.springframework.cloud.stream.app.test.integration.StreamAppContainerTestUtils.resourceAsFile;
import static org.springframework.cloud.stream.apps.integration.test.common.Configuration.DEFAULT_DURATION;
+@ExtendWith(BaseContainerExtension.class)
abstract class S3SourceTests {
private static AmazonS3 s3Client;
private StreamAppContainer source;
- @Autowired
- private TestTopicListener testTopicListener;
-
@Autowired
private OutputMatcher outputMatcher;
@@ -90,9 +90,9 @@ abstract class S3SourceTests {
}
- protected void configureSource(StreamAppContainer baseContainer) {
- source = baseContainer
- .withEnv("S3_SUPPLIER_REMOTE_DIR", "bucket")
+ @BeforeEach
+ void configureSource() {
+ source = BaseContainerExtension.containerInstance()
.withEnv("S3_COMMON_ENDPOINT_URL", "http://" + localHostAddress() + ":" + minio.getMappedPort(9000))
.withEnv("S3_COMMON_PATH_STYLE_ACCESS", "true")
.withEnv("CLOUD_AWS_STACK_AUTO", "false")
@@ -100,13 +100,13 @@ abstract class S3SourceTests {
.withEnv("CLOUD_AWS_CREDENTIALS_SECRET_KEY", "minio123")
.withEnv("LOGGING_LEVEL_ORG_SPRINGFRAMEWORK_INTEGRATION", "DEBUG")
.withEnv("CLOUD_AWS_REGION_STATIC", "us-east-1").log();
+ s3Client.createBucket("bucket");
}
@Test
void testLines() {
startContainer(
fluentMap().withEntry("FILE_CONSUMER_MODE", "lines"));
- s3Client.createBucket("bucket");
s3Client.putObject(new PutObjectRequest("bucket", "test",
resourceAsFile("minio/data")));
@@ -123,7 +123,6 @@ abstract class S3SourceTests {
.withEntry("TASK_LAUNCH_REQUEST_TASK_NAME", "myTask")
.withEntry("FILE_CONSUMER_MODE", "ref"));
- s3Client.createBucket("bucket");
s3Client.putObject(new PutObjectRequest("bucket", "test",
resourceAsFile("minio/data")));
await().atMost(DEFAULT_DURATION).until(outputMatcher.payloadMatches(s -> s.equals(
@@ -132,13 +131,11 @@ abstract class S3SourceTests {
@Test
void testListOnly() {
-
startContainer(
fluentMap()
.withEntry("FILE_CONSUMER_MODE", "ref")
.withEntry("S3_SUPPLIER_LIST_ONLY", "true"));
- s3Client.createBucket("bucket");
s3Client.putObject(new PutObjectRequest("bucket", "test",
resourceAsFile("minio/data")));
await().atMost(DEFAULT_DURATION)
@@ -148,7 +145,8 @@ abstract class S3SourceTests {
private void startContainer(Map environment) {
source.withEnv(environment);
- source.start();
+ source.waitingFor(Wait.forLogMessage(".*Started S3Source.*", 1)).start();
+ environment.keySet().forEach(k -> source.getEnvMap().remove(k));
}
@AfterEach
@@ -158,7 +156,7 @@ abstract class S3SourceTests {
s3Client.deleteBucket("bucket");
}
source.stop();
- testTopicListener.clearMessageMatchers();
+ outputMatcher.clearMessageMatchers();
}
}
diff --git a/stream-applications-integration-tests/src/test/java/org/springframework/cloud/stream/apps/integration/test/source/sftp/KafkaSftpSourceTests.java b/stream-applications-integration-tests/src/test/java/org/springframework/cloud/stream/apps/integration/test/source/sftp/KafkaSftpSourceTests.java
index 3c5fa32..452336e 100644
--- a/stream-applications-integration-tests/src/test/java/org/springframework/cloud/stream/apps/integration/test/source/sftp/KafkaSftpSourceTests.java
+++ b/stream-applications-integration-tests/src/test/java/org/springframework/cloud/stream/apps/integration/test/source/sftp/KafkaSftpSourceTests.java
@@ -16,20 +16,17 @@
package org.springframework.cloud.stream.apps.integration.test.source.sftp;
-import org.junit.jupiter.api.BeforeEach;
-
+import org.springframework.cloud.stream.app.test.integration.StreamAppContainer;
+import org.springframework.cloud.stream.app.test.integration.junit.jupiter.BaseContainer;
+import org.springframework.cloud.stream.app.test.integration.junit.jupiter.KafkaStreamAppTest;
import org.springframework.cloud.stream.app.test.integration.kafka.KafkaConfig;
-import org.springframework.cloud.stream.app.test.integration.kafka.KafkaStreamAppTest;
import static org.springframework.cloud.stream.apps.integration.test.common.Configuration.VERSION;
@KafkaStreamAppTest
-class KafkaSftpSourceTests extends SftpSourceTests {
+public class KafkaSftpSourceTests extends SftpSourceTests {
- @BeforeEach
- private void initKafka() {
- configureSource(KafkaConfig
- .prepackagedContainerFor("sftp-source", VERSION));
- }
+ @BaseContainer
+ public static StreamAppContainer source = KafkaConfig.prepackagedContainerFor("sftp-source", VERSION);
}
diff --git a/stream-applications-integration-tests/src/test/java/org/springframework/cloud/stream/apps/integration/test/source/sftp/RabbitMQSftpSourceTests.java b/stream-applications-integration-tests/src/test/java/org/springframework/cloud/stream/apps/integration/test/source/sftp/RabbitMQSftpSourceTests.java
index fa1f59a..6dd2369 100644
--- a/stream-applications-integration-tests/src/test/java/org/springframework/cloud/stream/apps/integration/test/source/sftp/RabbitMQSftpSourceTests.java
+++ b/stream-applications-integration-tests/src/test/java/org/springframework/cloud/stream/apps/integration/test/source/sftp/RabbitMQSftpSourceTests.java
@@ -16,20 +16,17 @@
package org.springframework.cloud.stream.apps.integration.test.source.sftp;
-import org.junit.jupiter.api.BeforeEach;
-
+import org.springframework.cloud.stream.app.test.integration.StreamAppContainer;
+import org.springframework.cloud.stream.app.test.integration.junit.jupiter.BaseContainer;
+import org.springframework.cloud.stream.app.test.integration.junit.jupiter.RabbitMQStreamAppTest;
import org.springframework.cloud.stream.app.test.integration.rabbitmq.RabbitMQConfig;
-import org.springframework.cloud.stream.app.test.integration.rabbitmq.RabbitMQStreamAppTest;
import static org.springframework.cloud.stream.apps.integration.test.common.Configuration.VERSION;
@RabbitMQStreamAppTest
-class RabbitMQSftpSourceTests extends SftpSourceTests {
+public class RabbitMQSftpSourceTests extends SftpSourceTests {
- @BeforeEach
- private void initKafka() {
- configureSource(RabbitMQConfig
- .prepackagedContainerFor("sftp-source", VERSION));
- }
+ @BaseContainer
+ public static StreamAppContainer source = RabbitMQConfig.prepackagedContainerFor("sftp-source", VERSION);
}
diff --git a/stream-applications-integration-tests/src/test/java/org/springframework/cloud/stream/apps/integration/test/source/sftp/SftpSourceTests.java b/stream-applications-integration-tests/src/test/java/org/springframework/cloud/stream/apps/integration/test/source/sftp/SftpSourceTests.java
index 6fb244d..b1e22a3 100644
--- a/stream-applications-integration-tests/src/test/java/org/springframework/cloud/stream/apps/integration/test/source/sftp/SftpSourceTests.java
+++ b/stream-applications-integration-tests/src/test/java/org/springframework/cloud/stream/apps/integration/test/source/sftp/SftpSourceTests.java
@@ -20,7 +20,9 @@ import java.util.Collections;
import java.util.Map;
import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
import org.testcontainers.containers.BindMode;
import org.testcontainers.containers.GenericContainer;
import org.testcontainers.junit.jupiter.Container;
@@ -30,10 +32,12 @@ import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.cloud.stream.app.test.integration.OutputMatcher;
import org.springframework.cloud.stream.app.test.integration.StreamAppContainer;
import org.springframework.cloud.stream.app.test.integration.StreamAppContainerTestUtils;
+import org.springframework.cloud.stream.app.test.integration.junit.jupiter.BaseContainerExtension;
import static org.awaitility.Awaitility.await;
import static org.springframework.cloud.stream.apps.integration.test.common.Configuration.DEFAULT_DURATION;
+@ExtendWith(BaseContainerExtension.class)
abstract class SftpSourceTests {
private StreamAppContainer source;
@@ -45,9 +49,11 @@ abstract class SftpSourceTests {
.withClasspathResourceMapping("sftp", "/home/user/remote", BindMode.READ_ONLY)
.withStartupTimeout(DEFAULT_DURATION);
- protected void configureSource(StreamAppContainer baseContainer) {
+ @BeforeEach
+ void configureSource() {
await().atMost(DEFAULT_DURATION).until(() -> sftp.isRunning());
- source = baseContainer.withEnv("SFTP_SUPPLIER_FACTORY_ALLOW_UNKNOWN_KEYS", "true")
+ source = BaseContainerExtension.containerInstance()
+ .withEnv("SFTP_SUPPLIER_FACTORY_ALLOW_UNKNOWN_KEYS", "true")
.withEnv("SFTP_SUPPLIER_REMOTE_DIR", "/remote")
.withEnv("SFTP_SUPPLIER_FACTORY_USERNAME", "user")
.withEnv("SFTP_SUPPLIER_FACTORY_PASSWORD", "pass")
@@ -70,6 +76,7 @@ abstract class SftpSourceTests {
private void startContainer(Map environment) {
source.withEnv(environment);
source.start();
+ environment.keySet().forEach(k -> source.getEnvMap().remove(k));
}
@AfterEach
diff --git a/stream-applications-integration-tests/src/test/java/org/springframework/cloud/stream/apps/integration/test/source/time/KafkaTimeSourceTests.java b/stream-applications-integration-tests/src/test/java/org/springframework/cloud/stream/apps/integration/test/source/time/KafkaTimeSourceTests.java
index 851c064..9dab4e7 100644
--- a/stream-applications-integration-tests/src/test/java/org/springframework/cloud/stream/apps/integration/test/source/time/KafkaTimeSourceTests.java
+++ b/stream-applications-integration-tests/src/test/java/org/springframework/cloud/stream/apps/integration/test/source/time/KafkaTimeSourceTests.java
@@ -19,8 +19,9 @@ package org.springframework.cloud.stream.apps.integration.test.source.time;
import org.testcontainers.junit.jupiter.Container;
import org.springframework.cloud.stream.app.test.integration.StreamAppContainer;
+import org.springframework.cloud.stream.app.test.integration.junit.jupiter.KafkaStreamAppTest;
import org.springframework.cloud.stream.app.test.integration.kafka.KafkaConfig;
-import org.springframework.cloud.stream.app.test.integration.kafka.KafkaStreamAppTest;
+
import static org.springframework.cloud.stream.apps.integration.test.common.Configuration.VERSION;
@@ -28,8 +29,6 @@ import static org.springframework.cloud.stream.apps.integration.test.common.Conf
class KafkaTimeSourceTests extends TimeSourceTests {
@Container
- static StreamAppContainer source = KafkaConfig
- .prepackagedContainerFor("time-source", VERSION)
- .withLogConsumer(logMatcher);
+ static StreamAppContainer source = KafkaConfig.prepackagedContainerFor("time-source", VERSION);
}
diff --git a/stream-applications-integration-tests/src/test/java/org/springframework/cloud/stream/apps/integration/test/source/time/RabbitMQTimeSourceTests.java b/stream-applications-integration-tests/src/test/java/org/springframework/cloud/stream/apps/integration/test/source/time/RabbitMQTimeSourceTests.java
index b251b0c..fe06d29 100644
--- a/stream-applications-integration-tests/src/test/java/org/springframework/cloud/stream/apps/integration/test/source/time/RabbitMQTimeSourceTests.java
+++ b/stream-applications-integration-tests/src/test/java/org/springframework/cloud/stream/apps/integration/test/source/time/RabbitMQTimeSourceTests.java
@@ -19,17 +19,15 @@ package org.springframework.cloud.stream.apps.integration.test.source.time;
import org.testcontainers.junit.jupiter.Container;
import org.springframework.cloud.stream.app.test.integration.StreamAppContainer;
+import org.springframework.cloud.stream.app.test.integration.junit.jupiter.RabbitMQStreamAppTest;
import org.springframework.cloud.stream.app.test.integration.rabbitmq.RabbitMQConfig;
-import org.springframework.cloud.stream.app.test.integration.rabbitmq.RabbitMQStreamAppTest;
+
import static org.springframework.cloud.stream.apps.integration.test.common.Configuration.VERSION;
@RabbitMQStreamAppTest
class RabbitMQTimeSourceTests extends TimeSourceTests {
+
@Container
- static StreamAppContainer source = RabbitMQConfig
- .prepackagedContainerFor("time-source", VERSION)
- .withLogConsumer(logMatcher)
- .withEnv("LOGGING_LEVEL_ORG_SPRINGFRAMEWORK_INTEGRATION", "DEBUG")
- .withEnv("LOGGING_LEVEL_ORG_APACHE_GEODE", "DEBUG");
+ static StreamAppContainer source = RabbitMQConfig.prepackagedContainerFor("time-source", VERSION);
}
diff --git a/stream-applications-integration-tests/src/test/java/org/springframework/cloud/stream/apps/integration/test/source/time/TimeSourceTests.java b/stream-applications-integration-tests/src/test/java/org/springframework/cloud/stream/apps/integration/test/source/time/TimeSourceTests.java
index b8c5c72..e8c34f5 100644
--- a/stream-applications-integration-tests/src/test/java/org/springframework/cloud/stream/apps/integration/test/source/time/TimeSourceTests.java
+++ b/stream-applications-integration-tests/src/test/java/org/springframework/cloud/stream/apps/integration/test/source/time/TimeSourceTests.java
@@ -18,10 +18,10 @@ package org.springframework.cloud.stream.apps.integration.test.source.time;
import java.util.regex.Pattern;
+import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.cloud.stream.app.test.integration.LogMatcher;
import org.springframework.cloud.stream.app.test.integration.OutputMatcher;
import static org.awaitility.Awaitility.await;
@@ -32,15 +32,17 @@ abstract class TimeSourceTests {
// "MM/dd/yy HH:mm:ss";
private final static Pattern pattern = Pattern.compile(".*\\d{2}/\\d{2}/\\d{2}\\s+\\d{2}:\\d{2}:\\d{2}");
- static LogMatcher logMatcher = LogMatcher.contains("Started TimeSource");
-
@Autowired
private OutputMatcher outputMatcher;
@Test
void test() {
- await().atMost(DEFAULT_DURATION).until(logMatcher.matches());
await().atMost(DEFAULT_DURATION)
.until(outputMatcher.payloadMatches((String s) -> pattern.matcher(s).matches()));
}
+
+ @AfterEach
+ void cleanUp() {
+ outputMatcher.clearMessageMatchers();
+ }
}
diff --git a/stream-applications-integration-tests/src/test/java/org/springframework/cloud/stream/apps/integration/test/stream/jdbclog/KafkaJdbcLogStreamTests.java b/stream-applications-integration-tests/src/test/java/org/springframework/cloud/stream/apps/integration/test/stream/jdbclog/KafkaJdbcLogStreamTests.java
index 4674377..39130f3 100644
--- a/stream-applications-integration-tests/src/test/java/org/springframework/cloud/stream/apps/integration/test/stream/jdbclog/KafkaJdbcLogStreamTests.java
+++ b/stream-applications-integration-tests/src/test/java/org/springframework/cloud/stream/apps/integration/test/stream/jdbclog/KafkaJdbcLogStreamTests.java
@@ -24,8 +24,8 @@ import org.testcontainers.utility.DockerImageName;
import org.springframework.cloud.stream.app.test.integration.LogMatcher;
import org.springframework.cloud.stream.app.test.integration.StreamApps;
+import org.springframework.cloud.stream.app.test.integration.junit.jupiter.KafkaStreamAppTest;
import org.springframework.cloud.stream.app.test.integration.kafka.KafkaConfig;
-import org.springframework.cloud.stream.app.test.integration.kafka.KafkaStreamAppTest;
import static org.awaitility.Awaitility.await;
import static org.springframework.cloud.stream.app.test.integration.AppLog.appLog;
diff --git a/stream-applications-integration-tests/src/test/java/org/springframework/cloud/stream/apps/integration/test/stream/tiktok/KafkaTikTokTests.java b/stream-applications-integration-tests/src/test/java/org/springframework/cloud/stream/apps/integration/test/stream/tiktok/KafkaTikTokTests.java
index 16e97e9..04ea898 100644
--- a/stream-applications-integration-tests/src/test/java/org/springframework/cloud/stream/apps/integration/test/stream/tiktok/KafkaTikTokTests.java
+++ b/stream-applications-integration-tests/src/test/java/org/springframework/cloud/stream/apps/integration/test/stream/tiktok/KafkaTikTokTests.java
@@ -21,8 +21,8 @@ import org.testcontainers.junit.jupiter.Container;
import org.springframework.cloud.stream.app.test.integration.LogMatcher;
import org.springframework.cloud.stream.app.test.integration.StreamApps;
+import org.springframework.cloud.stream.app.test.integration.junit.jupiter.KafkaStreamAppTest;
import org.springframework.cloud.stream.app.test.integration.kafka.KafkaConfig;
-import org.springframework.cloud.stream.app.test.integration.kafka.KafkaStreamAppTest;
import static org.awaitility.Awaitility.await;
import static org.springframework.cloud.stream.app.test.integration.kafka.KafkaStreamApps.kafkaStreamApps;