Polishing
This commit is contained in:
@@ -20,7 +20,6 @@ import java.time.Duration;
|
||||
import java.util.regex.Pattern;
|
||||
|
||||
import org.junit.jupiter.api.Test;
|
||||
import org.testcontainers.containers.GenericContainer;
|
||||
import org.testcontainers.junit.jupiter.Container;
|
||||
|
||||
import org.springframework.cloud.stream.app.test.integration.LogMatcher;
|
||||
@@ -38,8 +37,8 @@ public class TikTokTests extends KafkaStreamIntegrationTestSupport {
|
||||
|
||||
@Container
|
||||
static StreamApps streamApps = kafkaStreamApps("tikTok", kafka)
|
||||
.withSourceContainer(new GenericContainer(defaultKafkaImageFor("time-source")))
|
||||
.withSinkContainer(new GenericContainer(defaultKafkaImageFor("log-sink"))
|
||||
.withSourceContainer(defaultKafkaContainerFor("time-source"))
|
||||
.withSinkContainer(defaultKafkaContainerFor("log-sink")
|
||||
.withLogConsumer(logMatcher))
|
||||
.build();
|
||||
|
||||
|
||||
@@ -25,7 +25,6 @@ import okhttp3.mockwebserver.MockWebServer;
|
||||
import okhttp3.mockwebserver.RecordedRequest;
|
||||
import org.junit.jupiter.api.BeforeAll;
|
||||
import org.junit.jupiter.api.Test;
|
||||
import org.testcontainers.containers.GenericContainer;
|
||||
import org.testcontainers.junit.jupiter.Container;
|
||||
import reactor.core.publisher.Mono;
|
||||
|
||||
@@ -57,12 +56,12 @@ public class HttpRequestProcessorTests extends KafkaStreamIntegrationTestSupport
|
||||
private static final StreamApps streamApps = kafkaStreamApps(
|
||||
HttpRequestProcessorTests.class.getSimpleName(), kafka)
|
||||
.withSourceContainer(httpSource(sourcePort))
|
||||
.withProcessorContainer(new GenericContainer(defaultKafkaImageFor("http-request-processor"))
|
||||
.withProcessorContainer(defaultKafkaContainerFor("http-request-processor")
|
||||
.withEnv("HTTP_REQUEST_URL_EXPRESSION",
|
||||
"'http://" + localHostAddress() + ":" + serverPort + "'")
|
||||
.withEnv("HTTP_REQUEST_HTTP_METHOD_EXPRESSION", "'POST'"))
|
||||
.withSinkContainer(
|
||||
new GenericContainer(defaultKafkaImageFor("log-sink")).withLogConsumer(logMatcher))
|
||||
defaultKafkaContainerFor("log-sink").withLogConsumer(logMatcher))
|
||||
.build();
|
||||
|
||||
@BeforeAll
|
||||
|
||||
@@ -22,7 +22,6 @@ import com.zaxxer.hikari.HikariDataSource;
|
||||
import org.junit.jupiter.api.BeforeAll;
|
||||
import org.junit.jupiter.api.Test;
|
||||
import org.testcontainers.containers.BindMode;
|
||||
import org.testcontainers.containers.GenericContainer;
|
||||
import org.testcontainers.containers.MySQLContainer;
|
||||
import org.testcontainers.containers.wait.strategy.Wait;
|
||||
import org.testcontainers.junit.jupiter.Container;
|
||||
@@ -59,11 +58,11 @@ public class JdbcSinkTests extends KafkaStreamIntegrationTestSupport {
|
||||
|
||||
@Container
|
||||
private static StreamApps streamApps = kafkaStreamApps(JdbcSinkTests.class.getSimpleName(), kafka)
|
||||
.withSourceContainer(new GenericContainer(defaultKafkaImageFor("http-source"))
|
||||
.withSourceContainer(defaultKafkaContainerFor("http-source")
|
||||
.withEnv("SERVER_PORT", String.valueOf(serverPort))
|
||||
.withExposedPorts(serverPort)
|
||||
.waitingFor(Wait.forListeningPort().withStartupTimeout(Duration.ofMinutes(2))))
|
||||
.withSinkContainer(new GenericContainer(defaultKafkaImageFor("jdbc-sink"))
|
||||
.withSinkContainer(defaultKafkaContainerFor("jdbc-sink")
|
||||
.withEnv("JDBC_CONSUMER_COLUMNS", "name,city:address.city,street:address.street")
|
||||
.withEnv("JDBC_CONSUMER_TABLE_NAME", "People")
|
||||
.withEnv("SPRING_DATASOURCE_USERNAME", "test")
|
||||
|
||||
@@ -22,7 +22,6 @@ import java.util.List;
|
||||
import org.bson.Document;
|
||||
import org.junit.jupiter.api.BeforeAll;
|
||||
import org.junit.jupiter.api.Test;
|
||||
import org.testcontainers.containers.GenericContainer;
|
||||
import org.testcontainers.containers.MongoDBContainer;
|
||||
import org.testcontainers.junit.jupiter.Container;
|
||||
import org.testcontainers.utility.DockerImageName;
|
||||
@@ -60,7 +59,7 @@ public class MongoDBSinkTests extends KafkaStreamIntegrationTestSupport {
|
||||
@Container
|
||||
private StreamApps streamApps = kafkaStreamApps(MongoDBSinkTests.class.getSimpleName(), kafka)
|
||||
.withSourceContainer(httpSource(serverPort))
|
||||
.withSinkContainer(new GenericContainer(defaultKafkaImageFor("mongodb-sink"))
|
||||
.withSinkContainer(defaultKafkaContainerFor("mongodb-sink")
|
||||
.withEnv("MONGO_DB_CONSUMER_COLLECTION", "test")
|
||||
.withEnv("SPRING_DATA_MONGODB_URL", mongoConnectionString()))
|
||||
.build();
|
||||
|
||||
@@ -27,7 +27,6 @@ import java.util.concurrent.atomic.AtomicBoolean;
|
||||
|
||||
import org.junit.jupiter.api.BeforeAll;
|
||||
import org.junit.jupiter.api.Test;
|
||||
import org.testcontainers.containers.GenericContainer;
|
||||
import org.testcontainers.junit.jupiter.Container;
|
||||
import reactor.core.publisher.Mono;
|
||||
|
||||
@@ -56,7 +55,7 @@ public class TcpSinkTests extends KafkaStreamIntegrationTestSupport {
|
||||
@Container
|
||||
private static StreamApps streamApps = kafkaStreamApps(TcpSinkTests.class.getSimpleName(), kafka)
|
||||
.withSourceContainer(httpSource(port))
|
||||
.withSinkContainer(new GenericContainer(defaultKafkaImageFor("tcp-sink"))
|
||||
.withSinkContainer(defaultKafkaContainerFor("tcp-sink")
|
||||
.withEnv("TCP_CONSUMER_HOST", localHostAddress())
|
||||
.withEnv("TCP_PORT", String.valueOf(tcpPort))
|
||||
.withEnv("TCP_CONSUMER_ENCODER", "CRLF"))
|
||||
|
||||
@@ -31,7 +31,6 @@ import org.apache.geode.cache.client.ClientRegionShortcut;
|
||||
import org.junit.jupiter.api.AfterAll;
|
||||
import org.junit.jupiter.api.BeforeAll;
|
||||
import org.junit.jupiter.api.Test;
|
||||
import org.testcontainers.containers.GenericContainer;
|
||||
import org.testcontainers.images.builder.ImageFromDockerfile;
|
||||
import org.testcontainers.junit.jupiter.Container;
|
||||
|
||||
@@ -73,6 +72,17 @@ public class GeodeSourceTests extends KafkaStreamIntegrationTestSupport {
|
||||
.withCommand("tail", "-f", "/dev/null")
|
||||
.withStartupTimeout(Duration.ofMinutes(2));
|
||||
|
||||
@Container
|
||||
private final StreamApps streamApps = kafkaStreamApps(this.getClass().getSimpleName(), kafka)
|
||||
.withSourceContainer(defaultKafkaContainerFor("geode-source")
|
||||
.withEnv("GEODE_POOL_CONNECT_TYPE", "server")
|
||||
.withEnv("GEODE_REGION_REGION_NAME", "myRegion")
|
||||
.withEnv("GEODE_POOL_HOST_ADDRESSES", localHostAddress() + ":" + cacheServerPort)
|
||||
.withLogConsumer(geodeLogMatcher))
|
||||
.withSinkContainer(defaultKafkaContainerFor("log-sink")
|
||||
.withLogConsumer(logMatcher))
|
||||
.build();
|
||||
|
||||
@BeforeAll
|
||||
static void init() {
|
||||
// Not using locator is faster.
|
||||
@@ -90,17 +100,6 @@ public class GeodeSourceTests extends KafkaStreamIntegrationTestSupport {
|
||||
.create("myRegion");
|
||||
}
|
||||
|
||||
@Container
|
||||
private final StreamApps streamApps = kafkaStreamApps(this.getClass().getSimpleName(), kafka)
|
||||
.withSourceContainer(new GenericContainer(defaultKafkaImageFor("geode-source"))
|
||||
.withEnv("GEODE_POOL_CONNECT_TYPE", "server")
|
||||
.withEnv("GEODE_REGION_REGION_NAME", "myRegion")
|
||||
.withEnv("GEODE_POOL_HOST_ADDRESSES", localHostAddress() + ":" + cacheServerPort)
|
||||
.withLogConsumer(geodeLogMatcher))
|
||||
.withSinkContainer(new GenericContainer(defaultKafkaImageFor("log-sink"))
|
||||
.withLogConsumer(logMatcher))
|
||||
.build();
|
||||
|
||||
@Test
|
||||
void test() {
|
||||
await().atMost(Duration.ofMinutes(2))
|
||||
|
||||
@@ -19,7 +19,6 @@ package org.springframework.cloud.stream.apps.integration.test.source;
|
||||
import java.time.Duration;
|
||||
|
||||
import org.junit.jupiter.api.Test;
|
||||
import org.testcontainers.containers.GenericContainer;
|
||||
import org.testcontainers.junit.jupiter.Container;
|
||||
import reactor.core.publisher.Mono;
|
||||
|
||||
@@ -45,7 +44,7 @@ public class HttpSourceTests extends KafkaStreamIntegrationTestSupport {
|
||||
@Container
|
||||
private static final StreamApps streamApps = kafkaStreamApps(HttpSourceTests.class.getSimpleName(), kafka)
|
||||
.withSourceContainer(httpSource(serverPort))
|
||||
.withSinkContainer(new GenericContainer(defaultKafkaImageFor("log-sink")).withLogConsumer(logMatcher))
|
||||
.withSinkContainer(defaultKafkaContainerFor("log-sink").withLogConsumer(logMatcher))
|
||||
.build();
|
||||
|
||||
@Test
|
||||
|
||||
@@ -20,7 +20,6 @@ import java.time.Duration;
|
||||
|
||||
import org.junit.jupiter.api.Test;
|
||||
import org.testcontainers.containers.BindMode;
|
||||
import org.testcontainers.containers.GenericContainer;
|
||||
import org.testcontainers.containers.MySQLContainer;
|
||||
import org.testcontainers.junit.jupiter.Container;
|
||||
import org.testcontainers.utility.DockerImageName;
|
||||
@@ -47,7 +46,7 @@ public class JdbcSourceTests extends KafkaStreamIntegrationTestSupport {
|
||||
|
||||
@Container
|
||||
private static final StreamApps streamApps = kafkaStreamApps(JdbcSourceTests.class.getSimpleName(), kafka)
|
||||
.withSourceContainer(new GenericContainer(defaultKafkaImageFor("jdbc-source"))
|
||||
.withSourceContainer(defaultKafkaContainerFor("jdbc-source")
|
||||
.withEnv("JDBC_SUPPLIER_QUERY", "SELECT * FROM People WHERE deleted='N'")
|
||||
.withEnv("JDBC_SUPPLIER_UPDATE", "UPDATE People SET deleted='Y' WHERE id=:id")
|
||||
.withEnv("SPRING_DATASOURCE_USERNAME", "test")
|
||||
@@ -55,7 +54,7 @@ public class JdbcSourceTests extends KafkaStreamIntegrationTestSupport {
|
||||
.withEnv("SPRING_DATASOURCE_DRIVER_CLASS_NAME", "org.mariadb.jdbc.Driver")
|
||||
.withEnv("SPRING_DATASOURCE_URL",
|
||||
"jdbc:mysql://" + mySQL.getNetworkAliases().get(0) + ":3306/test"))
|
||||
.withSinkContainer(new GenericContainer(defaultKafkaImageFor("log-sink"))
|
||||
.withSinkContainer(defaultKafkaContainerFor("log-sink")
|
||||
.withLogConsumer(logMatcher))
|
||||
.build();
|
||||
|
||||
|
||||
@@ -83,7 +83,7 @@ public class S3SourceTests extends KafkaStreamIntegrationTestSupport {
|
||||
}
|
||||
|
||||
private StreamApps streamApps = kafkaStreamApps(S3SourceTests.class.getSimpleName(), kafka)
|
||||
.withSourceContainer(new GenericContainer(defaultKafkaImageFor("s3-source"))
|
||||
.withSourceContainer(defaultKafkaContainerFor("s3-source")
|
||||
.withEnv("S3_SUPPLIER_REMOTE_DIR", "bucket")
|
||||
.withEnv("S3_COMMON_ENDPOINT_URL", "http://" + localHostAddress() + ":" + minio.getMappedPort(9000))
|
||||
.withEnv("S3_COMMON_PATH_STYLE_ACCESS", "true")
|
||||
@@ -92,7 +92,7 @@ public class S3SourceTests extends KafkaStreamIntegrationTestSupport {
|
||||
.withEnv("CLOUD_AWS_CREDENTIALS_SECRET_KEY", "minio123")
|
||||
.withEnv("CLOUD_AWS_REGION_STATIC", "us-east-1")
|
||||
.withLogConsumer(logMatcher))
|
||||
.withSinkContainer(new GenericContainer(defaultKafkaImageFor("log-sink")).withLogConsumer(logMatcher))
|
||||
.withSinkContainer(defaultKafkaContainerFor("log-sink").withLogConsumer(logMatcher))
|
||||
.build();
|
||||
|
||||
@Test
|
||||
|
||||
@@ -46,14 +46,14 @@ public class SftpSourceTests extends KafkaStreamIntegrationTestSupport {
|
||||
.withStartupTimeout(Duration.ofMinutes(1));
|
||||
|
||||
private StreamApps streamApps = kafkaStreamApps(SftpSourceTests.class.getSimpleName(), kafka)
|
||||
.withSourceContainer(new GenericContainer(defaultKafkaImageFor("sftp-source"))
|
||||
.withSourceContainer(defaultKafkaContainerFor("sftp-source")
|
||||
.withEnv("SFTP_SUPPLIER_FACTORY_ALLOW_UNKNOWN_KEYS", "true")
|
||||
.withEnv("SFTP_SUPPLIER_REMOTE_DIR", "/remote")
|
||||
.withEnv("SFTP_SUPPLIER_FACTORY_USERNAME", "user")
|
||||
.withEnv("SFTP_SUPPLIER_FACTORY_PASSWORD", "pass")
|
||||
.withEnv("SFTP_SUPPLIER_FACTORY_PORT", String.valueOf(sftp.getMappedPort(22)))
|
||||
.withEnv("SFTP_SUPPLIER_FACTORY_HOST", localHostAddress()))
|
||||
.withSinkContainer(new GenericContainer(defaultKafkaImageFor("log-sink")).withLogConsumer(logMatcher))
|
||||
.withSinkContainer(defaultKafkaContainerFor("log-sink").withLogConsumer(logMatcher))
|
||||
.build();
|
||||
|
||||
// TODO: This fixture supports additional tests with different modes, etc.
|
||||
|
||||
@@ -30,14 +30,18 @@ public abstract class KafkaStreamIntegrationTestSupport extends AbstractKafkaStr
|
||||
|
||||
protected static final String DOCKER_ORG = "springcloudstream";
|
||||
|
||||
protected static DockerImageName defaultKafkaImageFor(String appName) {
|
||||
private static DockerImageName defaultKafkaImageFor(String appName) {
|
||||
return DockerImageName.parse(DOCKER_ORG + "/" + appName + "-kafka:" + VERSION);
|
||||
}
|
||||
|
||||
protected static GenericContainer defaultKafkaContainerFor(String appName) {
|
||||
return new GenericContainer(defaultKafkaImageFor(appName));
|
||||
}
|
||||
|
||||
protected static GenericContainer httpSource(int serverPort) {
|
||||
return new GenericContainer(defaultKafkaImageFor("http-source"))
|
||||
.withEnv("SERVER_PORT", String.valueOf(serverPort))
|
||||
.withExposedPorts(serverPort)
|
||||
.waitingFor(Wait.forListeningPort().withStartupTimeout(Duration.ofMinutes(2)));
|
||||
return new GenericContainer(defaultKafkaImageFor("http-source"))
|
||||
.withEnv("SERVER_PORT", String.valueOf(serverPort))
|
||||
.withExposedPorts(serverPort)
|
||||
.waitingFor(Wait.forListeningPort().withStartupTimeout(Duration.ofMinutes(2)));
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user