Refactor and debug for latest test framework
This commit is contained in:
@@ -42,7 +42,7 @@
|
||||
<wiremock.version>2.27.1</wiremock.version>
|
||||
<aws.version>1.11.415</aws.version>
|
||||
<mysql-connector-java.version>8.0.16</mysql-connector-java.version>
|
||||
<junit-platform-runner.version>1.6.2</junit-platform-runner.version>
|
||||
<junit-jupiter-api.version>5.6.2</junit-jupiter-api.version>
|
||||
</properties>
|
||||
|
||||
<dependencies>
|
||||
@@ -68,17 +68,18 @@
|
||||
<scope>test</scope>
|
||||
</dependency>
|
||||
|
||||
<dependency>
|
||||
<groupId>org.junit.platform</groupId>
|
||||
<artifactId>junit-platform-testkit</artifactId>
|
||||
<version>${junit-platform-runner.version}</version>
|
||||
<scope>test</scope>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.testcontainers</groupId>
|
||||
<artifactId>junit-jupiter</artifactId>
|
||||
<scope>test</scope>
|
||||
</dependency>
|
||||
|
||||
<dependency>
|
||||
<groupId>org.junit.jupiter</groupId>
|
||||
<artifactId>junit-jupiter-api</artifactId>
|
||||
<version>${junit-jupiter-api.version}</version>
|
||||
<scope>test</scope>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.testcontainers</groupId>
|
||||
<artifactId>kafka</artifactId>
|
||||
@@ -154,12 +155,6 @@
|
||||
<artifactId>spring-boot-starter-jdbc</artifactId>
|
||||
<scope>test</scope>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.junit.platform</groupId>
|
||||
<artifactId>junit-platform-runner</artifactId>
|
||||
<version>${junit-platform-runner.version}</version>
|
||||
<scope>test</scope>
|
||||
</dependency>
|
||||
</dependencies>
|
||||
<build>
|
||||
<plugins>
|
||||
|
||||
@@ -19,8 +19,8 @@ package org.springframework.cloud.stream.apps.integration.test.processor.httpreq
|
||||
import org.testcontainers.junit.jupiter.Container;
|
||||
|
||||
import org.springframework.cloud.stream.app.test.integration.StreamAppContainer;
|
||||
import org.springframework.cloud.stream.app.test.integration.junit.jupiter.KafkaStreamAppTest;
|
||||
import org.springframework.cloud.stream.app.test.integration.kafka.KafkaConfig;
|
||||
import org.springframework.cloud.stream.app.test.integration.kafka.KafkaStreamAppTest;
|
||||
|
||||
import static org.springframework.cloud.stream.apps.integration.test.common.Configuration.VERSION;
|
||||
|
||||
|
||||
@@ -19,8 +19,8 @@ package org.springframework.cloud.stream.apps.integration.test.processor.httpreq
|
||||
import org.testcontainers.junit.jupiter.Container;
|
||||
|
||||
import org.springframework.cloud.stream.app.test.integration.StreamAppContainer;
|
||||
import org.springframework.cloud.stream.app.test.integration.junit.jupiter.RabbitMQStreamAppTest;
|
||||
import org.springframework.cloud.stream.app.test.integration.rabbitmq.RabbitMQConfig;
|
||||
import org.springframework.cloud.stream.app.test.integration.rabbitmq.RabbitMQStreamAppTest;
|
||||
|
||||
import static org.springframework.cloud.stream.apps.integration.test.common.Configuration.VERSION;
|
||||
|
||||
|
||||
@@ -18,16 +18,19 @@ package org.springframework.cloud.stream.apps.integration.test.sink.jdbc;
|
||||
|
||||
import com.zaxxer.hikari.HikariDataSource;
|
||||
import org.junit.jupiter.api.AfterAll;
|
||||
import org.junit.jupiter.api.BeforeAll;
|
||||
import org.junit.jupiter.api.Test;
|
||||
import org.junit.jupiter.api.extension.ExtendWith;
|
||||
import org.testcontainers.containers.BindMode;
|
||||
import org.testcontainers.containers.MySQLContainer;
|
||||
import org.testcontainers.containers.wait.strategy.Wait;
|
||||
import org.testcontainers.junit.jupiter.Container;
|
||||
import org.testcontainers.utility.DockerImageName;
|
||||
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.cloud.stream.app.test.integration.LogMatcher;
|
||||
import org.springframework.cloud.stream.app.test.integration.StreamAppContainer;
|
||||
import org.springframework.cloud.stream.app.test.integration.TestTopicSender;
|
||||
import org.springframework.cloud.stream.app.test.integration.junit.jupiter.BaseContainerExtension;
|
||||
import org.springframework.cloud.stream.app.test.integration.kafka.KafkaConfig;
|
||||
import org.springframework.jdbc.core.JdbcTemplate;
|
||||
|
||||
@@ -36,13 +39,14 @@ import static org.awaitility.Awaitility.await;
|
||||
import static org.springframework.cloud.stream.app.test.integration.AppLog.appLog;
|
||||
import static org.springframework.cloud.stream.apps.integration.test.common.Configuration.DEFAULT_DURATION;
|
||||
|
||||
@ExtendWith(BaseContainerExtension.class)
|
||||
public abstract class JdbcSinkTests {
|
||||
|
||||
private static JdbcTemplate jdbcTemplate;
|
||||
|
||||
private static StreamAppContainer sink;
|
||||
|
||||
private static LogMatcher logMatcher = LogMatcher.contains("Started JdbcSink");
|
||||
//private static LogMatcher startupLogEntry = LogMatcher.contains("Started JdbcSink");
|
||||
|
||||
@Autowired
|
||||
private TestTopicSender testTopicSender;
|
||||
@@ -58,21 +62,19 @@ public abstract class JdbcSinkTests {
|
||||
.withLogConsumer(appLog("mysql-for-sink"))
|
||||
.withCommand("--init-file", "/init.sql");
|
||||
|
||||
protected static void configureSink(StreamAppContainer baseContainer) {
|
||||
sink = baseContainer
|
||||
@BeforeAll
|
||||
static void init() {
|
||||
// BaseContainerExtension.logMatcher().map(lm -> startupLogEntry = lm);
|
||||
sink = BaseContainerExtension.containerInstance()
|
||||
.dependsOn(mySQL)
|
||||
.withEnv("JDBC_CONSUMER_COLUMNS", "name,city:address.city,street:address.street")
|
||||
.withEnv("JDBC_CONSUMER_TABLE_NAME", "People")
|
||||
.withEnv("SPRING_DATASOURCE_USERNAME", "test")
|
||||
.withEnv("SPRING_DATASOURCE_PASSWORD", "secret")
|
||||
.withEnv("SPRING_DATASOURCE_DRIVER_CLASS_NAME", "org.mariadb.jdbc.Driver")
|
||||
// .withEnv("LOGGING_LEVEL_ORG_SPRINGFRAMEWORK_INTEGRATION", "DEBUG")
|
||||
// .withEnv("LOGGING_LEVEL_ORG_SPRINGFRAMEWORK_JDBC", "DEBUG")
|
||||
// .withEnv("LOGGING_LEVEL_ORG_MARIADB_JDBC", "DEBUG")
|
||||
// .log()
|
||||
.withEnv("SPRING_DATASOURCE_URL",
|
||||
"jdbc:mariadb://mysql-for-sink:3306/test")
|
||||
.withLogConsumer(logMatcher);
|
||||
.waitingFor(Wait.forLogMessage(".*Started JdbcSink.*", 1));
|
||||
startSink();
|
||||
}
|
||||
|
||||
@@ -89,7 +91,11 @@ public abstract class JdbcSinkTests {
|
||||
.until(() -> jdbcTemplate.queryForObject("SELECT COUNT(*) from People", Integer.class)
|
||||
.intValue() == 0);
|
||||
sink.start();
|
||||
await().atMost(DEFAULT_DURATION).until(logMatcher.matches());
|
||||
// await().atMost(DEFAULT_DURATION).conditionEvaluationListener(evaluatedCondition -> {
|
||||
// System.out.println("App started ....................................... in "
|
||||
// + evaluatedCondition.getElapsedTimeInMS());
|
||||
// }).until(startupLogEntry.matches());
|
||||
|
||||
}
|
||||
|
||||
@Test
|
||||
|
||||
@@ -16,19 +16,16 @@
|
||||
|
||||
package org.springframework.cloud.stream.apps.integration.test.sink.jdbc;
|
||||
|
||||
import org.junit.jupiter.api.BeforeAll;
|
||||
|
||||
import org.springframework.cloud.stream.app.test.integration.StreamAppContainer;
|
||||
import org.springframework.cloud.stream.app.test.integration.junit.jupiter.BaseContainer;
|
||||
import org.springframework.cloud.stream.app.test.integration.junit.jupiter.KafkaStreamAppTest;
|
||||
import org.springframework.cloud.stream.app.test.integration.kafka.KafkaConfig;
|
||||
import org.springframework.cloud.stream.app.test.integration.kafka.KafkaStreamAppTest;
|
||||
|
||||
import static org.springframework.cloud.stream.apps.integration.test.common.Configuration.VERSION;
|
||||
|
||||
@KafkaStreamAppTest
|
||||
class KafkaJdbcSinkTests extends JdbcSinkTests {
|
||||
public class KafkaJdbcSinkTests extends JdbcSinkTests {
|
||||
|
||||
@BeforeAll
|
||||
static void init() {
|
||||
configureSink(KafkaConfig
|
||||
.prepackagedContainerFor("jdbc-sink", VERSION));
|
||||
}
|
||||
@BaseContainer
|
||||
public static StreamAppContainer sink = KafkaConfig.prepackagedContainerFor("jdbc-sink", VERSION);
|
||||
}
|
||||
|
||||
@@ -16,19 +16,17 @@
|
||||
|
||||
package org.springframework.cloud.stream.apps.integration.test.sink.jdbc;
|
||||
|
||||
import org.junit.jupiter.api.BeforeAll;
|
||||
|
||||
import org.springframework.cloud.stream.app.test.integration.StreamAppContainer;
|
||||
import org.springframework.cloud.stream.app.test.integration.junit.jupiter.BaseContainer;
|
||||
import org.springframework.cloud.stream.app.test.integration.junit.jupiter.RabbitMQStreamAppTest;
|
||||
import org.springframework.cloud.stream.app.test.integration.rabbitmq.RabbitMQConfig;
|
||||
import org.springframework.cloud.stream.app.test.integration.rabbitmq.RabbitMQStreamAppTest;
|
||||
|
||||
import static org.springframework.cloud.stream.apps.integration.test.common.Configuration.VERSION;
|
||||
|
||||
@RabbitMQStreamAppTest
|
||||
class RabbitMQJdbcSinkTests extends JdbcSinkTests {
|
||||
public class RabbitMQJdbcSinkTests extends JdbcSinkTests {
|
||||
|
||||
@BaseContainer
|
||||
public static StreamAppContainer sink = RabbitMQConfig.prepackagedContainerFor("jdbc-sink", VERSION);
|
||||
|
||||
@BeforeAll
|
||||
static void init() {
|
||||
configureSink(RabbitMQConfig
|
||||
.prepackagedContainerFor("jdbc-sink", VERSION));
|
||||
}
|
||||
}
|
||||
|
||||
@@ -16,19 +16,16 @@
|
||||
|
||||
package org.springframework.cloud.stream.apps.integration.test.sink.mongodb;
|
||||
|
||||
import org.junit.jupiter.api.BeforeAll;
|
||||
|
||||
import org.springframework.cloud.stream.app.test.integration.StreamAppContainer;
|
||||
import org.springframework.cloud.stream.app.test.integration.junit.jupiter.BaseContainer;
|
||||
import org.springframework.cloud.stream.app.test.integration.junit.jupiter.KafkaStreamAppTest;
|
||||
import org.springframework.cloud.stream.app.test.integration.kafka.KafkaConfig;
|
||||
import org.springframework.cloud.stream.app.test.integration.kafka.KafkaStreamAppTest;
|
||||
|
||||
import static org.springframework.cloud.stream.apps.integration.test.common.Configuration.VERSION;
|
||||
|
||||
@KafkaStreamAppTest
|
||||
class KafkaMongoDBSinkTests extends MongoDBSinkTests {
|
||||
@BeforeAll
|
||||
static void init() {
|
||||
configureSink(KafkaConfig
|
||||
.prepackagedContainerFor("mongodb-sink", VERSION));
|
||||
}
|
||||
public class KafkaMongoDBSinkTests extends MongoDBSinkTests {
|
||||
|
||||
@BaseContainer
|
||||
public static StreamAppContainer sink = KafkaConfig.prepackagedContainerFor("mongodb-sink", VERSION);
|
||||
}
|
||||
|
||||
@@ -21,14 +21,17 @@ import java.util.List;
|
||||
|
||||
import org.bson.Document;
|
||||
import org.junit.jupiter.api.AfterAll;
|
||||
import org.junit.jupiter.api.BeforeAll;
|
||||
import org.junit.jupiter.api.Test;
|
||||
import org.junit.jupiter.api.extension.ExtendWith;
|
||||
import org.testcontainers.containers.MongoDBContainer;
|
||||
import org.testcontainers.utility.DockerImageName;
|
||||
import org.testcontainers.containers.wait.strategy.Wait;
|
||||
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.cloud.stream.app.test.integration.StreamAppContainer;
|
||||
import org.springframework.cloud.stream.app.test.integration.StreamAppContainerTestUtils;
|
||||
import org.springframework.cloud.stream.app.test.integration.TestTopicSender;
|
||||
import org.springframework.cloud.stream.app.test.integration.junit.jupiter.BaseContainerExtension;
|
||||
import org.springframework.data.mongodb.MongoDatabaseFactory;
|
||||
import org.springframework.data.mongodb.core.MongoTemplate;
|
||||
import org.springframework.data.mongodb.core.SimpleMongoClientDatabaseFactory;
|
||||
@@ -37,6 +40,7 @@ import static org.assertj.core.api.Assertions.assertThat;
|
||||
import static org.awaitility.Awaitility.await;
|
||||
import static org.springframework.cloud.stream.apps.integration.test.common.Configuration.DEFAULT_DURATION;
|
||||
|
||||
@ExtendWith(BaseContainerExtension.class)
|
||||
abstract class MongoDBSinkTests {
|
||||
|
||||
private static MongoTemplate mongoTemplate;
|
||||
@@ -44,7 +48,7 @@ abstract class MongoDBSinkTests {
|
||||
@Autowired
|
||||
private TestTopicSender testTopicSender;
|
||||
|
||||
private static MongoDBContainer mongoDBContainer = new MongoDBContainer(DockerImageName.parse("mongo:4.0.10"))
|
||||
private static final MongoDBContainer mongoDBContainer = new MongoDBContainer("mongo:4.0.10")
|
||||
.withExposedPorts(27017)
|
||||
.withStartupTimeout(Duration.ofMinutes(2));
|
||||
|
||||
@@ -55,11 +59,14 @@ abstract class MongoDBSinkTests {
|
||||
|
||||
private static StreamAppContainer sink;
|
||||
|
||||
protected static void configureSink(StreamAppContainer baseContainer) {
|
||||
@BeforeAll
|
||||
protected static void configureSink() {
|
||||
mongoDBContainer.start();
|
||||
sink = baseContainer
|
||||
sink = BaseContainerExtension.containerInstance()
|
||||
.withEnv("MONGODB_CONSUMER_COLLECTION", "test")
|
||||
.withEnv("SPRING_DATA_MONGODB_URL", mongoConnectionString());
|
||||
.withEnv("SPRING_DATA_MONGODB_URL", mongoConnectionString())
|
||||
.waitingFor(Wait.forLogMessage(".*Started MongodbSink.*", 1));
|
||||
|
||||
sink.start();
|
||||
buildMongoTemplate();
|
||||
}
|
||||
|
||||
@@ -16,20 +16,17 @@
|
||||
|
||||
package org.springframework.cloud.stream.apps.integration.test.sink.mongodb;
|
||||
|
||||
import org.junit.jupiter.api.BeforeAll;
|
||||
|
||||
import org.springframework.cloud.stream.app.test.integration.StreamAppContainer;
|
||||
import org.springframework.cloud.stream.app.test.integration.junit.jupiter.BaseContainer;
|
||||
import org.springframework.cloud.stream.app.test.integration.junit.jupiter.RabbitMQStreamAppTest;
|
||||
import org.springframework.cloud.stream.app.test.integration.rabbitmq.RabbitMQConfig;
|
||||
import org.springframework.cloud.stream.app.test.integration.rabbitmq.RabbitMQStreamAppTest;
|
||||
|
||||
import static org.springframework.cloud.stream.apps.integration.test.common.Configuration.VERSION;
|
||||
|
||||
@RabbitMQStreamAppTest
|
||||
class RabbitMQMongoDBSinkTests extends MongoDBSinkTests {
|
||||
public class RabbitMQMongoDBSinkTests extends MongoDBSinkTests {
|
||||
|
||||
@BeforeAll
|
||||
static void init() {
|
||||
configureSink(RabbitMQConfig
|
||||
.prepackagedContainerFor("mongodb-sink", VERSION));
|
||||
}
|
||||
@BaseContainer
|
||||
public static StreamAppContainer sink = RabbitMQConfig.prepackagedContainerFor("mongodb-sink", VERSION);
|
||||
|
||||
}
|
||||
|
||||
@@ -16,18 +16,17 @@
|
||||
|
||||
package org.springframework.cloud.stream.apps.integration.test.sink.tcp;
|
||||
|
||||
import org.testcontainers.junit.jupiter.Container;
|
||||
|
||||
import org.springframework.cloud.stream.app.test.integration.StreamAppContainer;
|
||||
import org.springframework.cloud.stream.app.test.integration.junit.jupiter.BaseContainer;
|
||||
import org.springframework.cloud.stream.app.test.integration.junit.jupiter.KafkaStreamAppTest;
|
||||
import org.springframework.cloud.stream.app.test.integration.kafka.KafkaConfig;
|
||||
import org.springframework.cloud.stream.app.test.integration.kafka.KafkaStreamAppTest;
|
||||
|
||||
import static org.springframework.cloud.stream.apps.integration.test.common.Configuration.VERSION;
|
||||
|
||||
@KafkaStreamAppTest
|
||||
public class KafkaTcpSinkTests extends TcpSinkTests {
|
||||
|
||||
@Container
|
||||
static StreamAppContainer container = configureSink(KafkaConfig.prepackagedContainerFor("tcp-sink", VERSION));
|
||||
@BaseContainer
|
||||
public static StreamAppContainer sink = KafkaConfig.prepackagedContainerFor("tcp-sink", VERSION);
|
||||
|
||||
}
|
||||
|
||||
@@ -16,17 +16,16 @@
|
||||
|
||||
package org.springframework.cloud.stream.apps.integration.test.sink.tcp;
|
||||
|
||||
import org.testcontainers.junit.jupiter.Container;
|
||||
|
||||
import org.springframework.cloud.stream.app.test.integration.StreamAppContainer;
|
||||
import org.springframework.cloud.stream.app.test.integration.junit.jupiter.BaseContainer;
|
||||
import org.springframework.cloud.stream.app.test.integration.junit.jupiter.RabbitMQStreamAppTest;
|
||||
import org.springframework.cloud.stream.app.test.integration.rabbitmq.RabbitMQConfig;
|
||||
import org.springframework.cloud.stream.app.test.integration.rabbitmq.RabbitMQStreamAppTest;
|
||||
|
||||
import static org.springframework.cloud.stream.apps.integration.test.common.Configuration.VERSION;
|
||||
|
||||
@RabbitMQStreamAppTest
|
||||
public class RabbitMQTcpSinkTests extends TcpSinkTests {
|
||||
|
||||
@Container
|
||||
static StreamAppContainer container = configureSink(RabbitMQConfig.prepackagedContainerFor("tcp-sink", VERSION));
|
||||
@BaseContainer
|
||||
public static StreamAppContainer sink = RabbitMQConfig.prepackagedContainerFor("tcp-sink", VERSION);
|
||||
}
|
||||
|
||||
@@ -28,15 +28,19 @@ import java.util.concurrent.atomic.AtomicBoolean;
|
||||
import org.junit.jupiter.api.AfterAll;
|
||||
import org.junit.jupiter.api.BeforeAll;
|
||||
import org.junit.jupiter.api.Test;
|
||||
import org.junit.jupiter.api.extension.ExtendWith;
|
||||
import org.testcontainers.containers.wait.strategy.Wait;
|
||||
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.cloud.stream.app.test.integration.StreamAppContainer;
|
||||
import org.springframework.cloud.stream.app.test.integration.StreamAppContainerTestUtils;
|
||||
import org.springframework.cloud.stream.app.test.integration.TestTopicSender;
|
||||
import org.springframework.cloud.stream.app.test.integration.junit.jupiter.BaseContainerExtension;
|
||||
|
||||
import static org.awaitility.Awaitility.await;
|
||||
import static org.springframework.cloud.stream.apps.integration.test.common.Configuration.DEFAULT_DURATION;
|
||||
|
||||
@ExtendWith(BaseContainerExtension.class)
|
||||
abstract class TcpSinkTests {
|
||||
|
||||
private static int tcpPort;
|
||||
@@ -50,15 +54,18 @@ abstract class TcpSinkTests {
|
||||
@Autowired
|
||||
private TestTopicSender testTopicSender;
|
||||
|
||||
protected static StreamAppContainer configureSink(StreamAppContainer baseContainer) {
|
||||
@BeforeAll
|
||||
static void configureSink() {
|
||||
tcpPort = StreamAppContainerTestUtils.findAvailablePort();
|
||||
sink = baseContainer.withEnv("TCP_CONSUMER_HOST", StreamAppContainerTestUtils.localHostAddress())
|
||||
startTcpServer();
|
||||
sink = BaseContainerExtension.containerInstance()
|
||||
.withEnv("TCP_CONSUMER_HOST", StreamAppContainerTestUtils.localHostAddress())
|
||||
.withEnv("TCP_PORT", String.valueOf(tcpPort))
|
||||
.withEnv("TCP_CONSUMER_ENCODER", "CRLF");
|
||||
return sink;
|
||||
.withEnv("TCP_CONSUMER_ENCODER", "CRLF")
|
||||
.waitingFor(Wait.forLogMessage(".*Started TcpSink.*", 1));
|
||||
sink.start();
|
||||
}
|
||||
|
||||
@BeforeAll
|
||||
static void startTcpServer() {
|
||||
socketReady.set(false);
|
||||
new Thread(() -> {
|
||||
@@ -85,6 +92,7 @@ abstract class TcpSinkTests {
|
||||
|
||||
@AfterAll
|
||||
static void cleanUp() throws IOException {
|
||||
sink.stop();
|
||||
socket.close();
|
||||
}
|
||||
}
|
||||
|
||||
@@ -30,22 +30,25 @@ import org.apache.geode.cache.client.ClientCache;
|
||||
import org.apache.geode.cache.client.ClientCacheFactory;
|
||||
import org.apache.geode.cache.client.ClientRegionShortcut;
|
||||
import org.junit.jupiter.api.AfterAll;
|
||||
import org.junit.jupiter.api.BeforeAll;
|
||||
import org.junit.jupiter.api.Test;
|
||||
import org.junit.jupiter.api.extension.ExtendWith;
|
||||
import org.testcontainers.containers.wait.strategy.Wait;
|
||||
import org.testcontainers.images.builder.ImageFromDockerfile;
|
||||
import org.testcontainers.junit.jupiter.Container;
|
||||
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.cloud.fn.test.support.geode.GeodeContainer;
|
||||
import org.springframework.cloud.stream.app.test.integration.LogMatcher;
|
||||
import org.springframework.cloud.stream.app.test.integration.OutputMatcher;
|
||||
import org.springframework.cloud.stream.app.test.integration.StreamAppContainer;
|
||||
import org.springframework.cloud.stream.app.test.integration.StreamAppContainerTestUtils;
|
||||
import org.springframework.cloud.stream.app.test.integration.rabbitmq.RabbitMQStreamAppContainer;
|
||||
import org.springframework.cloud.stream.app.test.integration.junit.jupiter.BaseContainerExtension;
|
||||
|
||||
import static org.awaitility.Awaitility.await;
|
||||
import static org.springframework.cloud.stream.apps.integration.test.common.Configuration.DEFAULT_DURATION;
|
||||
|
||||
public abstract class GeodeSourceTests {
|
||||
@ExtendWith(BaseContainerExtension.class)
|
||||
abstract class GeodeSourceTests {
|
||||
private static int locatorPort = StreamAppContainerTestUtils.findAvailablePort();
|
||||
|
||||
private static int cacheServerPort = StreamAppContainerTestUtils.findAvailablePort();
|
||||
@@ -54,8 +57,6 @@ public abstract class GeodeSourceTests {
|
||||
|
||||
private static ClientCache clientCache;
|
||||
|
||||
protected static LogMatcher logMatcher = LogMatcher.contains("Started GeodeSource");
|
||||
|
||||
protected static StreamAppContainer source;
|
||||
|
||||
@Autowired
|
||||
@@ -77,7 +78,8 @@ public abstract class GeodeSourceTests {
|
||||
.withCommand("tail", "-f", "/dev/null")
|
||||
.withStartupTimeout(DEFAULT_DURATION.multipliedBy(2));
|
||||
|
||||
protected static void initializeGeodeCacheThenStartSource() {
|
||||
@BeforeAll
|
||||
static void initializeGeodeCacheThenStartSource() {
|
||||
// Not using locator is faster.
|
||||
System.out.println(geode.execGfsh(
|
||||
"start server --name=Server1 " + "--hostname-for-clients=geode" + " --server-port="
|
||||
@@ -92,23 +94,18 @@ public abstract class GeodeSourceTests {
|
||||
.createClientRegionFactory(ClientRegionShortcut.PROXY)
|
||||
.create("myRegion");
|
||||
|
||||
source.withEnv("GEODE_POOL_CONNECT_TYPE", "server")
|
||||
source = BaseContainerExtension.containerInstance().withEnv("GEODE_POOL_CONNECT_TYPE", "server")
|
||||
.withEnv("GEODE_REGION_REGION_NAME", "myRegion")
|
||||
.withEnv("GEODE_POOL_HOST_ADDRESSES",
|
||||
StreamAppContainerTestUtils.localHostAddress() + ":" + cacheServerPort)
|
||||
.withLogConsumer(logMatcher).log().start();
|
||||
.waitingFor(Wait.forLogMessage(".*Started GeodeSource.*", 1));
|
||||
source.start();
|
||||
}
|
||||
|
||||
@Test
|
||||
void test() throws InterruptedException {
|
||||
await().atMost(Duration.ofMinutes(2)).until(logMatcher.matches());
|
||||
if (source instanceof RabbitMQStreamAppContainer) {
|
||||
// TODO: Some race condition. Need to investigate
|
||||
Thread.sleep(10000);
|
||||
}
|
||||
String random = UUID.randomUUID().toString();
|
||||
clientRegion.put(random, random);
|
||||
|
||||
await().atMost(Duration.ofSeconds(30))
|
||||
.until(outputMatcher.payloadMatches((String s) -> s.contains(random)));
|
||||
}
|
||||
|
||||
@@ -16,21 +16,17 @@
|
||||
|
||||
package org.springframework.cloud.stream.apps.integration.test.source.geode;
|
||||
|
||||
import org.junit.jupiter.api.BeforeAll;
|
||||
|
||||
import org.springframework.cloud.stream.app.test.integration.StreamAppContainer;
|
||||
import org.springframework.cloud.stream.app.test.integration.junit.jupiter.BaseContainer;
|
||||
import org.springframework.cloud.stream.app.test.integration.junit.jupiter.KafkaStreamAppTest;
|
||||
import org.springframework.cloud.stream.app.test.integration.kafka.KafkaConfig;
|
||||
import org.springframework.cloud.stream.app.test.integration.kafka.KafkaStreamAppTest;
|
||||
|
||||
import static org.springframework.cloud.stream.apps.integration.test.common.Configuration.VERSION;
|
||||
|
||||
@KafkaStreamAppTest
|
||||
class KafkaGeodeSourceTests extends GeodeSourceTests {
|
||||
public class KafkaGeodeSourceTests extends GeodeSourceTests {
|
||||
|
||||
@BeforeAll
|
||||
static void init() {
|
||||
source = KafkaConfig
|
||||
.prepackagedContainerFor("geode-source", VERSION);
|
||||
initializeGeodeCacheThenStartSource();
|
||||
}
|
||||
@BaseContainer
|
||||
public static StreamAppContainer source = KafkaConfig.prepackagedContainerFor("geode-source", VERSION);
|
||||
|
||||
}
|
||||
|
||||
@@ -16,21 +16,17 @@
|
||||
|
||||
package org.springframework.cloud.stream.apps.integration.test.source.geode;
|
||||
|
||||
import org.junit.jupiter.api.BeforeAll;
|
||||
|
||||
import org.springframework.cloud.stream.app.test.integration.StreamAppContainer;
|
||||
import org.springframework.cloud.stream.app.test.integration.junit.jupiter.BaseContainer;
|
||||
import org.springframework.cloud.stream.app.test.integration.junit.jupiter.RabbitMQStreamAppTest;
|
||||
import org.springframework.cloud.stream.app.test.integration.rabbitmq.RabbitMQConfig;
|
||||
import org.springframework.cloud.stream.app.test.integration.rabbitmq.RabbitMQStreamAppTest;
|
||||
|
||||
import static org.springframework.cloud.stream.apps.integration.test.common.Configuration.VERSION;
|
||||
|
||||
@RabbitMQStreamAppTest
|
||||
class RabbitMQGeodeSourceTests extends GeodeSourceTests {
|
||||
public class RabbitMQGeodeSourceTests extends GeodeSourceTests {
|
||||
|
||||
@BeforeAll
|
||||
static void init() {
|
||||
source = RabbitMQConfig
|
||||
.prepackagedContainerFor("geode-source", VERSION);
|
||||
initializeGeodeCacheThenStartSource();
|
||||
}
|
||||
@BaseContainer
|
||||
public static StreamAppContainer source = RabbitMQConfig.prepackagedContainerFor("geode-source", VERSION);
|
||||
|
||||
}
|
||||
|
||||
@@ -22,7 +22,9 @@ import java.util.concurrent.atomic.AtomicReference;
|
||||
|
||||
import org.junit.jupiter.api.AfterAll;
|
||||
import org.junit.jupiter.api.AfterEach;
|
||||
import org.junit.jupiter.api.BeforeAll;
|
||||
import org.junit.jupiter.api.Test;
|
||||
import org.junit.jupiter.api.extension.ExtendWith;
|
||||
import org.testcontainers.containers.wait.strategy.Wait;
|
||||
import reactor.core.publisher.Mono;
|
||||
|
||||
@@ -30,6 +32,7 @@ import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.cloud.stream.app.test.integration.OutputMatcher;
|
||||
import org.springframework.cloud.stream.app.test.integration.StreamAppContainer;
|
||||
import org.springframework.cloud.stream.app.test.integration.StreamAppContainerTestUtils;
|
||||
import org.springframework.cloud.stream.app.test.integration.junit.jupiter.BaseContainerExtension;
|
||||
import org.springframework.http.HttpStatus;
|
||||
import org.springframework.http.MediaType;
|
||||
import org.springframework.web.reactive.function.client.WebClient;
|
||||
@@ -38,6 +41,7 @@ import static org.assertj.core.api.Assertions.assertThat;
|
||||
import static org.awaitility.Awaitility.await;
|
||||
import static org.springframework.cloud.stream.apps.integration.test.common.Configuration.DEFAULT_DURATION;
|
||||
|
||||
@ExtendWith(BaseContainerExtension.class)
|
||||
public abstract class HttpSourceTests {
|
||||
|
||||
private static int serverPort = StreamAppContainerTestUtils.findAvailablePort();
|
||||
@@ -46,11 +50,13 @@ public abstract class HttpSourceTests {
|
||||
|
||||
private static StreamAppContainer source;
|
||||
|
||||
protected static StreamAppContainer configureSource(StreamAppContainer baseContainer) {
|
||||
source = baseContainer.withEnv("SERVER_PORT", String.valueOf(serverPort))
|
||||
@BeforeAll
|
||||
static void configureSource() {
|
||||
source = BaseContainerExtension.containerInstance()
|
||||
.withEnv("SERVER_PORT", String.valueOf(serverPort))
|
||||
.withExposedPorts(serverPort)
|
||||
.waitingFor(Wait.forListeningPort().withStartupTimeout(DEFAULT_DURATION));
|
||||
return source;
|
||||
source.start();
|
||||
}
|
||||
|
||||
@Autowired
|
||||
|
||||
@@ -16,17 +16,16 @@
|
||||
|
||||
package org.springframework.cloud.stream.apps.integration.test.source.http;
|
||||
|
||||
import org.testcontainers.junit.jupiter.Container;
|
||||
|
||||
import org.springframework.cloud.stream.app.test.integration.StreamAppContainer;
|
||||
import org.springframework.cloud.stream.app.test.integration.junit.jupiter.BaseContainer;
|
||||
import org.springframework.cloud.stream.app.test.integration.junit.jupiter.KafkaStreamAppTest;
|
||||
import org.springframework.cloud.stream.app.test.integration.kafka.KafkaConfig;
|
||||
import org.springframework.cloud.stream.app.test.integration.kafka.KafkaStreamAppTest;
|
||||
|
||||
import static org.springframework.cloud.stream.apps.integration.test.common.Configuration.VERSION;
|
||||
|
||||
@KafkaStreamAppTest
|
||||
class KafkaHttpSourceTests extends HttpSourceTests {
|
||||
public class KafkaHttpSourceTests extends HttpSourceTests {
|
||||
|
||||
@Container
|
||||
static StreamAppContainer base = configureSource(KafkaConfig.prepackagedContainerFor("http-source", VERSION));
|
||||
@BaseContainer
|
||||
public static StreamAppContainer source = KafkaConfig.prepackagedContainerFor("http-source", VERSION);
|
||||
}
|
||||
|
||||
@@ -16,17 +16,16 @@
|
||||
|
||||
package org.springframework.cloud.stream.apps.integration.test.source.http;
|
||||
|
||||
import org.testcontainers.junit.jupiter.Container;
|
||||
|
||||
import org.springframework.cloud.stream.app.test.integration.StreamAppContainer;
|
||||
import org.springframework.cloud.stream.app.test.integration.junit.jupiter.BaseContainer;
|
||||
import org.springframework.cloud.stream.app.test.integration.junit.jupiter.RabbitMQStreamAppTest;
|
||||
import org.springframework.cloud.stream.app.test.integration.rabbitmq.RabbitMQConfig;
|
||||
import org.springframework.cloud.stream.app.test.integration.rabbitmq.RabbitMQStreamAppTest;
|
||||
|
||||
import static org.springframework.cloud.stream.apps.integration.test.common.Configuration.VERSION;
|
||||
|
||||
@RabbitMQStreamAppTest
|
||||
class RabbitMQHttpSourceTests extends HttpSourceTests {
|
||||
public class RabbitMQHttpSourceTests extends HttpSourceTests {
|
||||
|
||||
@Container
|
||||
static StreamAppContainer base = configureSource(RabbitMQConfig.prepackagedContainerFor("http-source", VERSION));
|
||||
@BaseContainer
|
||||
public static StreamAppContainer source = RabbitMQConfig.prepackagedContainerFor("http-source", VERSION);
|
||||
}
|
||||
|
||||
@@ -17,7 +17,9 @@
|
||||
package org.springframework.cloud.stream.apps.integration.test.source.jdbc;
|
||||
|
||||
import org.junit.jupiter.api.AfterAll;
|
||||
import org.junit.jupiter.api.BeforeAll;
|
||||
import org.junit.jupiter.api.Test;
|
||||
import org.junit.jupiter.api.extension.ExtendWith;
|
||||
import org.testcontainers.containers.BindMode;
|
||||
import org.testcontainers.containers.MySQLContainer;
|
||||
import org.testcontainers.junit.jupiter.Container;
|
||||
@@ -26,12 +28,14 @@ import org.testcontainers.utility.DockerImageName;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.cloud.stream.app.test.integration.OutputMatcher;
|
||||
import org.springframework.cloud.stream.app.test.integration.StreamAppContainer;
|
||||
import org.springframework.cloud.stream.app.test.integration.junit.jupiter.BaseContainerExtension;
|
||||
import org.springframework.cloud.stream.app.test.integration.kafka.KafkaConfig;
|
||||
|
||||
import static org.awaitility.Awaitility.await;
|
||||
import static org.springframework.cloud.stream.app.test.integration.AppLog.appLog;
|
||||
import static org.springframework.cloud.stream.apps.integration.test.common.Configuration.DEFAULT_DURATION;
|
||||
|
||||
@ExtendWith(BaseContainerExtension.class)
|
||||
abstract class JdbcSourceTests {
|
||||
|
||||
@Autowired
|
||||
@@ -50,8 +54,9 @@ abstract class JdbcSourceTests {
|
||||
.withClasspathResourceMapping("init.sql", "/init.sql", BindMode.READ_ONLY)
|
||||
.withCommand("--init-file", "/init.sql");
|
||||
|
||||
protected static void configureSource(StreamAppContainer baseContainer) {
|
||||
source = baseContainer
|
||||
@BeforeAll
|
||||
protected static void configureSource() {
|
||||
source = BaseContainerExtension.containerInstance()
|
||||
.withEnv("JDBC_SUPPLIER_QUERY", "SELECT * FROM People WHERE deleted='N'")
|
||||
.withEnv("JDBC_SUPPLIER_UPDATE", "UPDATE People SET deleted='Y' WHERE id=:id")
|
||||
.withEnv("SPRING_DATASOURCE_USERNAME", "test")
|
||||
|
||||
@@ -16,19 +16,16 @@
|
||||
|
||||
package org.springframework.cloud.stream.apps.integration.test.source.jdbc;
|
||||
|
||||
import org.junit.jupiter.api.BeforeAll;
|
||||
|
||||
import org.springframework.cloud.stream.app.test.integration.StreamAppContainer;
|
||||
import org.springframework.cloud.stream.app.test.integration.junit.jupiter.BaseContainer;
|
||||
import org.springframework.cloud.stream.app.test.integration.junit.jupiter.KafkaStreamAppTest;
|
||||
import org.springframework.cloud.stream.app.test.integration.kafka.KafkaConfig;
|
||||
import org.springframework.cloud.stream.app.test.integration.kafka.KafkaStreamAppTest;
|
||||
|
||||
import static org.springframework.cloud.stream.apps.integration.test.common.Configuration.VERSION;
|
||||
|
||||
@KafkaStreamAppTest
|
||||
class KafkaJdbcSourceTests extends JdbcSourceTests {
|
||||
public class KafkaJdbcSourceTests extends JdbcSourceTests {
|
||||
|
||||
@BeforeAll
|
||||
static void init() {
|
||||
configureSource(KafkaConfig
|
||||
.prepackagedContainerFor("jdbc-source", VERSION));
|
||||
}
|
||||
@BaseContainer
|
||||
public static StreamAppContainer source = KafkaConfig.prepackagedContainerFor("jdbc-source", VERSION);
|
||||
}
|
||||
|
||||
@@ -16,19 +16,16 @@
|
||||
|
||||
package org.springframework.cloud.stream.apps.integration.test.source.jdbc;
|
||||
|
||||
import org.junit.jupiter.api.BeforeAll;
|
||||
|
||||
import org.springframework.cloud.stream.app.test.integration.StreamAppContainer;
|
||||
import org.springframework.cloud.stream.app.test.integration.junit.jupiter.BaseContainer;
|
||||
import org.springframework.cloud.stream.app.test.integration.junit.jupiter.RabbitMQStreamAppTest;
|
||||
import org.springframework.cloud.stream.app.test.integration.rabbitmq.RabbitMQConfig;
|
||||
import org.springframework.cloud.stream.app.test.integration.rabbitmq.RabbitMQStreamAppTest;
|
||||
|
||||
import static org.springframework.cloud.stream.apps.integration.test.common.Configuration.VERSION;
|
||||
|
||||
@RabbitMQStreamAppTest
|
||||
class RabbitMQJdbcSourceTests extends JdbcSourceTests {
|
||||
public class RabbitMQJdbcSourceTests extends JdbcSourceTests {
|
||||
|
||||
@BeforeAll
|
||||
static void init() {
|
||||
configureSource(RabbitMQConfig
|
||||
.prepackagedContainerFor("jdbc-source", VERSION));
|
||||
}
|
||||
@BaseContainer
|
||||
public static StreamAppContainer source = RabbitMQConfig.prepackagedContainerFor("jdbc-source", VERSION);
|
||||
}
|
||||
|
||||
@@ -16,19 +16,16 @@
|
||||
|
||||
package org.springframework.cloud.stream.apps.integration.test.source.s3;
|
||||
|
||||
import org.junit.jupiter.api.BeforeEach;
|
||||
|
||||
import org.springframework.cloud.stream.app.test.integration.StreamAppContainer;
|
||||
import org.springframework.cloud.stream.app.test.integration.junit.jupiter.BaseContainer;
|
||||
import org.springframework.cloud.stream.app.test.integration.junit.jupiter.KafkaStreamAppTest;
|
||||
import org.springframework.cloud.stream.app.test.integration.kafka.KafkaConfig;
|
||||
import org.springframework.cloud.stream.app.test.integration.kafka.KafkaStreamAppTest;
|
||||
|
||||
import static org.springframework.cloud.stream.apps.integration.test.common.Configuration.VERSION;
|
||||
|
||||
@KafkaStreamAppTest
|
||||
class KafkaS3SourceTests extends S3SourceTests {
|
||||
public class KafkaS3SourceTests extends S3SourceTests {
|
||||
|
||||
@BeforeEach
|
||||
void init() {
|
||||
configureSource(KafkaConfig
|
||||
.prepackagedContainerFor("s3-source", VERSION));
|
||||
}
|
||||
@BaseContainer
|
||||
public static StreamAppContainer source = KafkaConfig.prepackagedContainerFor("s3-source", VERSION);
|
||||
}
|
||||
|
||||
@@ -16,19 +16,16 @@
|
||||
|
||||
package org.springframework.cloud.stream.apps.integration.test.source.s3;
|
||||
|
||||
import org.junit.jupiter.api.BeforeEach;
|
||||
|
||||
import org.springframework.cloud.stream.app.test.integration.StreamAppContainer;
|
||||
import org.springframework.cloud.stream.app.test.integration.junit.jupiter.BaseContainer;
|
||||
import org.springframework.cloud.stream.app.test.integration.junit.jupiter.RabbitMQStreamAppTest;
|
||||
import org.springframework.cloud.stream.app.test.integration.rabbitmq.RabbitMQConfig;
|
||||
import org.springframework.cloud.stream.app.test.integration.rabbitmq.RabbitMQStreamAppTest;
|
||||
|
||||
import static org.springframework.cloud.stream.apps.integration.test.common.Configuration.VERSION;
|
||||
|
||||
@RabbitMQStreamAppTest
|
||||
class RabbitMQS3SourceTests extends S3SourceTests {
|
||||
public class RabbitMQS3SourceTests extends S3SourceTests {
|
||||
|
||||
@BeforeEach
|
||||
void init() {
|
||||
configureSource(RabbitMQConfig
|
||||
.prepackagedContainerFor("s3-source", VERSION));
|
||||
}
|
||||
@BaseContainer
|
||||
public static StreamAppContainer source = RabbitMQConfig.prepackagedContainerFor("s3-source", VERSION);
|
||||
}
|
||||
|
||||
@@ -31,7 +31,9 @@ import com.amazonaws.services.s3.model.PutObjectRequest;
|
||||
import com.github.dockerjava.api.command.CreateContainerCmd;
|
||||
import org.junit.jupiter.api.AfterEach;
|
||||
import org.junit.jupiter.api.BeforeAll;
|
||||
import org.junit.jupiter.api.BeforeEach;
|
||||
import org.junit.jupiter.api.Test;
|
||||
import org.junit.jupiter.api.extension.ExtendWith;
|
||||
import org.testcontainers.containers.GenericContainer;
|
||||
import org.testcontainers.containers.wait.strategy.Wait;
|
||||
import org.testcontainers.junit.jupiter.Container;
|
||||
@@ -40,7 +42,7 @@ import org.testcontainers.utility.DockerImageName;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.cloud.stream.app.test.integration.OutputMatcher;
|
||||
import org.springframework.cloud.stream.app.test.integration.StreamAppContainer;
|
||||
import org.springframework.cloud.stream.app.test.integration.TestTopicListener;
|
||||
import org.springframework.cloud.stream.app.test.integration.junit.jupiter.BaseContainerExtension;
|
||||
|
||||
import static org.awaitility.Awaitility.await;
|
||||
import static org.springframework.cloud.stream.app.test.integration.AppLog.appLog;
|
||||
@@ -49,15 +51,13 @@ import static org.springframework.cloud.stream.app.test.integration.StreamAppCon
|
||||
import static org.springframework.cloud.stream.app.test.integration.StreamAppContainerTestUtils.resourceAsFile;
|
||||
import static org.springframework.cloud.stream.apps.integration.test.common.Configuration.DEFAULT_DURATION;
|
||||
|
||||
@ExtendWith(BaseContainerExtension.class)
|
||||
abstract class S3SourceTests {
|
||||
|
||||
private static AmazonS3 s3Client;
|
||||
|
||||
private StreamAppContainer source;
|
||||
|
||||
@Autowired
|
||||
private TestTopicListener testTopicListener;
|
||||
|
||||
@Autowired
|
||||
private OutputMatcher outputMatcher;
|
||||
|
||||
@@ -90,9 +90,9 @@ abstract class S3SourceTests {
|
||||
|
||||
}
|
||||
|
||||
protected void configureSource(StreamAppContainer baseContainer) {
|
||||
source = baseContainer
|
||||
.withEnv("S3_SUPPLIER_REMOTE_DIR", "bucket")
|
||||
@BeforeEach
|
||||
void configureSource() {
|
||||
source = BaseContainerExtension.containerInstance()
|
||||
.withEnv("S3_COMMON_ENDPOINT_URL", "http://" + localHostAddress() + ":" + minio.getMappedPort(9000))
|
||||
.withEnv("S3_COMMON_PATH_STYLE_ACCESS", "true")
|
||||
.withEnv("CLOUD_AWS_STACK_AUTO", "false")
|
||||
@@ -100,13 +100,13 @@ abstract class S3SourceTests {
|
||||
.withEnv("CLOUD_AWS_CREDENTIALS_SECRET_KEY", "minio123")
|
||||
.withEnv("LOGGING_LEVEL_ORG_SPRINGFRAMEWORK_INTEGRATION", "DEBUG")
|
||||
.withEnv("CLOUD_AWS_REGION_STATIC", "us-east-1").log();
|
||||
s3Client.createBucket("bucket");
|
||||
}
|
||||
|
||||
@Test
|
||||
void testLines() {
|
||||
startContainer(
|
||||
fluentMap().withEntry("FILE_CONSUMER_MODE", "lines"));
|
||||
s3Client.createBucket("bucket");
|
||||
s3Client.putObject(new PutObjectRequest("bucket", "test",
|
||||
resourceAsFile("minio/data")));
|
||||
|
||||
@@ -123,7 +123,6 @@ abstract class S3SourceTests {
|
||||
.withEntry("TASK_LAUNCH_REQUEST_TASK_NAME", "myTask")
|
||||
.withEntry("FILE_CONSUMER_MODE", "ref"));
|
||||
|
||||
s3Client.createBucket("bucket");
|
||||
s3Client.putObject(new PutObjectRequest("bucket", "test",
|
||||
resourceAsFile("minio/data")));
|
||||
await().atMost(DEFAULT_DURATION).until(outputMatcher.payloadMatches(s -> s.equals(
|
||||
@@ -132,13 +131,11 @@ abstract class S3SourceTests {
|
||||
|
||||
@Test
|
||||
void testListOnly() {
|
||||
|
||||
startContainer(
|
||||
fluentMap()
|
||||
.withEntry("FILE_CONSUMER_MODE", "ref")
|
||||
.withEntry("S3_SUPPLIER_LIST_ONLY", "true"));
|
||||
|
||||
s3Client.createBucket("bucket");
|
||||
s3Client.putObject(new PutObjectRequest("bucket", "test",
|
||||
resourceAsFile("minio/data")));
|
||||
await().atMost(DEFAULT_DURATION)
|
||||
@@ -148,7 +145,8 @@ abstract class S3SourceTests {
|
||||
|
||||
private void startContainer(Map<String, String> environment) {
|
||||
source.withEnv(environment);
|
||||
source.start();
|
||||
source.waitingFor(Wait.forLogMessage(".*Started S3Source.*", 1)).start();
|
||||
environment.keySet().forEach(k -> source.getEnvMap().remove(k));
|
||||
}
|
||||
|
||||
@AfterEach
|
||||
@@ -158,7 +156,7 @@ abstract class S3SourceTests {
|
||||
s3Client.deleteBucket("bucket");
|
||||
}
|
||||
source.stop();
|
||||
testTopicListener.clearMessageMatchers();
|
||||
outputMatcher.clearMessageMatchers();
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@@ -16,20 +16,17 @@
|
||||
|
||||
package org.springframework.cloud.stream.apps.integration.test.source.sftp;
|
||||
|
||||
import org.junit.jupiter.api.BeforeEach;
|
||||
|
||||
import org.springframework.cloud.stream.app.test.integration.StreamAppContainer;
|
||||
import org.springframework.cloud.stream.app.test.integration.junit.jupiter.BaseContainer;
|
||||
import org.springframework.cloud.stream.app.test.integration.junit.jupiter.KafkaStreamAppTest;
|
||||
import org.springframework.cloud.stream.app.test.integration.kafka.KafkaConfig;
|
||||
import org.springframework.cloud.stream.app.test.integration.kafka.KafkaStreamAppTest;
|
||||
|
||||
import static org.springframework.cloud.stream.apps.integration.test.common.Configuration.VERSION;
|
||||
|
||||
@KafkaStreamAppTest
|
||||
class KafkaSftpSourceTests extends SftpSourceTests {
|
||||
public class KafkaSftpSourceTests extends SftpSourceTests {
|
||||
|
||||
@BeforeEach
|
||||
private void initKafka() {
|
||||
configureSource(KafkaConfig
|
||||
.prepackagedContainerFor("sftp-source", VERSION));
|
||||
}
|
||||
@BaseContainer
|
||||
public static StreamAppContainer source = KafkaConfig.prepackagedContainerFor("sftp-source", VERSION);
|
||||
|
||||
}
|
||||
|
||||
@@ -16,20 +16,17 @@
|
||||
|
||||
package org.springframework.cloud.stream.apps.integration.test.source.sftp;
|
||||
|
||||
import org.junit.jupiter.api.BeforeEach;
|
||||
|
||||
import org.springframework.cloud.stream.app.test.integration.StreamAppContainer;
|
||||
import org.springframework.cloud.stream.app.test.integration.junit.jupiter.BaseContainer;
|
||||
import org.springframework.cloud.stream.app.test.integration.junit.jupiter.RabbitMQStreamAppTest;
|
||||
import org.springframework.cloud.stream.app.test.integration.rabbitmq.RabbitMQConfig;
|
||||
import org.springframework.cloud.stream.app.test.integration.rabbitmq.RabbitMQStreamAppTest;
|
||||
|
||||
import static org.springframework.cloud.stream.apps.integration.test.common.Configuration.VERSION;
|
||||
|
||||
@RabbitMQStreamAppTest
|
||||
class RabbitMQSftpSourceTests extends SftpSourceTests {
|
||||
public class RabbitMQSftpSourceTests extends SftpSourceTests {
|
||||
|
||||
@BeforeEach
|
||||
private void initKafka() {
|
||||
configureSource(RabbitMQConfig
|
||||
.prepackagedContainerFor("sftp-source", VERSION));
|
||||
}
|
||||
@BaseContainer
|
||||
public static StreamAppContainer source = RabbitMQConfig.prepackagedContainerFor("sftp-source", VERSION);
|
||||
|
||||
}
|
||||
|
||||
@@ -20,7 +20,9 @@ import java.util.Collections;
|
||||
import java.util.Map;
|
||||
|
||||
import org.junit.jupiter.api.AfterEach;
|
||||
import org.junit.jupiter.api.BeforeEach;
|
||||
import org.junit.jupiter.api.Test;
|
||||
import org.junit.jupiter.api.extension.ExtendWith;
|
||||
import org.testcontainers.containers.BindMode;
|
||||
import org.testcontainers.containers.GenericContainer;
|
||||
import org.testcontainers.junit.jupiter.Container;
|
||||
@@ -30,10 +32,12 @@ import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.cloud.stream.app.test.integration.OutputMatcher;
|
||||
import org.springframework.cloud.stream.app.test.integration.StreamAppContainer;
|
||||
import org.springframework.cloud.stream.app.test.integration.StreamAppContainerTestUtils;
|
||||
import org.springframework.cloud.stream.app.test.integration.junit.jupiter.BaseContainerExtension;
|
||||
|
||||
import static org.awaitility.Awaitility.await;
|
||||
import static org.springframework.cloud.stream.apps.integration.test.common.Configuration.DEFAULT_DURATION;
|
||||
|
||||
@ExtendWith(BaseContainerExtension.class)
|
||||
abstract class SftpSourceTests {
|
||||
|
||||
private StreamAppContainer source;
|
||||
@@ -45,9 +49,11 @@ abstract class SftpSourceTests {
|
||||
.withClasspathResourceMapping("sftp", "/home/user/remote", BindMode.READ_ONLY)
|
||||
.withStartupTimeout(DEFAULT_DURATION);
|
||||
|
||||
protected void configureSource(StreamAppContainer baseContainer) {
|
||||
@BeforeEach
|
||||
void configureSource() {
|
||||
await().atMost(DEFAULT_DURATION).until(() -> sftp.isRunning());
|
||||
source = baseContainer.withEnv("SFTP_SUPPLIER_FACTORY_ALLOW_UNKNOWN_KEYS", "true")
|
||||
source = BaseContainerExtension.containerInstance()
|
||||
.withEnv("SFTP_SUPPLIER_FACTORY_ALLOW_UNKNOWN_KEYS", "true")
|
||||
.withEnv("SFTP_SUPPLIER_REMOTE_DIR", "/remote")
|
||||
.withEnv("SFTP_SUPPLIER_FACTORY_USERNAME", "user")
|
||||
.withEnv("SFTP_SUPPLIER_FACTORY_PASSWORD", "pass")
|
||||
@@ -70,6 +76,7 @@ abstract class SftpSourceTests {
|
||||
private void startContainer(Map<String, String> environment) {
|
||||
source.withEnv(environment);
|
||||
source.start();
|
||||
environment.keySet().forEach(k -> source.getEnvMap().remove(k));
|
||||
}
|
||||
|
||||
@AfterEach
|
||||
|
||||
@@ -19,8 +19,9 @@ package org.springframework.cloud.stream.apps.integration.test.source.time;
|
||||
import org.testcontainers.junit.jupiter.Container;
|
||||
|
||||
import org.springframework.cloud.stream.app.test.integration.StreamAppContainer;
|
||||
import org.springframework.cloud.stream.app.test.integration.junit.jupiter.KafkaStreamAppTest;
|
||||
import org.springframework.cloud.stream.app.test.integration.kafka.KafkaConfig;
|
||||
import org.springframework.cloud.stream.app.test.integration.kafka.KafkaStreamAppTest;
|
||||
|
||||
|
||||
import static org.springframework.cloud.stream.apps.integration.test.common.Configuration.VERSION;
|
||||
|
||||
@@ -28,8 +29,6 @@ import static org.springframework.cloud.stream.apps.integration.test.common.Conf
|
||||
class KafkaTimeSourceTests extends TimeSourceTests {
|
||||
|
||||
@Container
|
||||
static StreamAppContainer source = KafkaConfig
|
||||
.prepackagedContainerFor("time-source", VERSION)
|
||||
.withLogConsumer(logMatcher);
|
||||
static StreamAppContainer source = KafkaConfig.prepackagedContainerFor("time-source", VERSION);
|
||||
|
||||
}
|
||||
|
||||
@@ -19,17 +19,15 @@ package org.springframework.cloud.stream.apps.integration.test.source.time;
|
||||
import org.testcontainers.junit.jupiter.Container;
|
||||
|
||||
import org.springframework.cloud.stream.app.test.integration.StreamAppContainer;
|
||||
import org.springframework.cloud.stream.app.test.integration.junit.jupiter.RabbitMQStreamAppTest;
|
||||
import org.springframework.cloud.stream.app.test.integration.rabbitmq.RabbitMQConfig;
|
||||
import org.springframework.cloud.stream.app.test.integration.rabbitmq.RabbitMQStreamAppTest;
|
||||
|
||||
|
||||
import static org.springframework.cloud.stream.apps.integration.test.common.Configuration.VERSION;
|
||||
|
||||
@RabbitMQStreamAppTest
|
||||
class RabbitMQTimeSourceTests extends TimeSourceTests {
|
||||
|
||||
@Container
|
||||
static StreamAppContainer source = RabbitMQConfig
|
||||
.prepackagedContainerFor("time-source", VERSION)
|
||||
.withLogConsumer(logMatcher)
|
||||
.withEnv("LOGGING_LEVEL_ORG_SPRINGFRAMEWORK_INTEGRATION", "DEBUG")
|
||||
.withEnv("LOGGING_LEVEL_ORG_APACHE_GEODE", "DEBUG");
|
||||
static StreamAppContainer source = RabbitMQConfig.prepackagedContainerFor("time-source", VERSION);
|
||||
}
|
||||
|
||||
@@ -18,10 +18,10 @@ package org.springframework.cloud.stream.apps.integration.test.source.time;
|
||||
|
||||
import java.util.regex.Pattern;
|
||||
|
||||
import org.junit.jupiter.api.AfterEach;
|
||||
import org.junit.jupiter.api.Test;
|
||||
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.cloud.stream.app.test.integration.LogMatcher;
|
||||
import org.springframework.cloud.stream.app.test.integration.OutputMatcher;
|
||||
|
||||
import static org.awaitility.Awaitility.await;
|
||||
@@ -32,15 +32,17 @@ abstract class TimeSourceTests {
|
||||
// "MM/dd/yy HH:mm:ss";
|
||||
private final static Pattern pattern = Pattern.compile(".*\\d{2}/\\d{2}/\\d{2}\\s+\\d{2}:\\d{2}:\\d{2}");
|
||||
|
||||
static LogMatcher logMatcher = LogMatcher.contains("Started TimeSource");
|
||||
|
||||
@Autowired
|
||||
private OutputMatcher outputMatcher;
|
||||
|
||||
@Test
|
||||
void test() {
|
||||
await().atMost(DEFAULT_DURATION).until(logMatcher.matches());
|
||||
await().atMost(DEFAULT_DURATION)
|
||||
.until(outputMatcher.payloadMatches((String s) -> pattern.matcher(s).matches()));
|
||||
}
|
||||
|
||||
@AfterEach
|
||||
void cleanUp() {
|
||||
outputMatcher.clearMessageMatchers();
|
||||
}
|
||||
}
|
||||
|
||||
@@ -24,8 +24,8 @@ import org.testcontainers.utility.DockerImageName;
|
||||
|
||||
import org.springframework.cloud.stream.app.test.integration.LogMatcher;
|
||||
import org.springframework.cloud.stream.app.test.integration.StreamApps;
|
||||
import org.springframework.cloud.stream.app.test.integration.junit.jupiter.KafkaStreamAppTest;
|
||||
import org.springframework.cloud.stream.app.test.integration.kafka.KafkaConfig;
|
||||
import org.springframework.cloud.stream.app.test.integration.kafka.KafkaStreamAppTest;
|
||||
|
||||
import static org.awaitility.Awaitility.await;
|
||||
import static org.springframework.cloud.stream.app.test.integration.AppLog.appLog;
|
||||
|
||||
@@ -21,8 +21,8 @@ import org.testcontainers.junit.jupiter.Container;
|
||||
|
||||
import org.springframework.cloud.stream.app.test.integration.LogMatcher;
|
||||
import org.springframework.cloud.stream.app.test.integration.StreamApps;
|
||||
import org.springframework.cloud.stream.app.test.integration.junit.jupiter.KafkaStreamAppTest;
|
||||
import org.springframework.cloud.stream.app.test.integration.kafka.KafkaConfig;
|
||||
import org.springframework.cloud.stream.app.test.integration.kafka.KafkaStreamAppTest;
|
||||
|
||||
import static org.awaitility.Awaitility.await;
|
||||
import static org.springframework.cloud.stream.app.test.integration.kafka.KafkaStreamApps.kafkaStreamApps;
|
||||
|
||||
Reference in New Issue
Block a user