From fc5e51342cdbf9ba3b51484b88c52246d8eb4cee Mon Sep 17 00:00:00 2001 From: David Turanski Date: Sat, 17 Oct 2020 13:09:13 -0400 Subject: [PATCH] Refactor to remove docker-compose --- stream-applications-integration-tests/pom.xml | 56 +++++-- .../integration/test/TikTokBaselineTests.java | 66 ++++++++ .../{TickTockTests.java => TikTokTests.java} | 29 ++-- .../processor/HttpRequestProcessorTests.java | 47 +++--- .../integration/test/sink/JdbcSinkTests.java | 84 +++++----- .../test/sink/MongoDBSinkTests.java | 43 ++--- .../integration/test/sink/TcpSinkTests.java | 33 ++-- .../test/source/GeodeSourceTests.java | 30 ++-- .../test/source/HttpSourceTests.java | 37 ++--- .../test/source/JdbcSourceTests.java | 42 +++-- .../test/source/S3SourceTests.java | 110 +++++++------ .../test/source/SftpSourceTests.java | 50 +++--- .../AbstractStreamApplicationTests.java | 149 ------------------ .../integration/test/support/FluentMap.java | 30 ---- .../KafkaStreamIntegrationTestSupport.java | 43 +++++ .../integration/test/support/LogMatcher.java | 88 ----------- .../test/support/TemplateProcessor.java | 118 -------------- .../test/resources/compose-kafka-external.yml | 23 --- .../src/test/resources/logback-test.xml | 2 +- .../http-request-processor-tests.yml | 27 ---- .../test/resources/sink/jdbc-sink-tests.yml | 24 --- .../resources/sink/mongodb-sink-tests.yml | 19 --- .../test/resources/sink/tcp-sink-tests.yml | 21 --- .../resources/source/geode-source-tests.yml | 20 --- .../resources/source/http-source-tests.yml | 18 --- .../resources/source/jdbc-source-tests.yml | 33 ---- .../test/resources/source/s3-source-tests.yml | 28 ---- .../resources/source/sftp-source-tests.yml | 27 ---- .../src/test/resources/test.properties | 2 - .../src/test/resources/tick-tock-tests.yml | 14 -- 30 files changed, 423 insertions(+), 890 deletions(-) create mode 100644 stream-applications-integration-tests/src/test/java/org/springframework/cloud/stream/apps/integration/test/TikTokBaselineTests.java rename stream-applications-integration-tests/src/test/java/org/springframework/cloud/stream/apps/integration/test/{TickTockTests.java => TikTokTests.java} (52%) delete mode 100644 stream-applications-integration-tests/src/test/java/org/springframework/cloud/stream/apps/integration/test/support/AbstractStreamApplicationTests.java delete mode 100644 stream-applications-integration-tests/src/test/java/org/springframework/cloud/stream/apps/integration/test/support/FluentMap.java create mode 100644 stream-applications-integration-tests/src/test/java/org/springframework/cloud/stream/apps/integration/test/support/KafkaStreamIntegrationTestSupport.java delete mode 100644 stream-applications-integration-tests/src/test/java/org/springframework/cloud/stream/apps/integration/test/support/LogMatcher.java delete mode 100644 stream-applications-integration-tests/src/test/java/org/springframework/cloud/stream/apps/integration/test/support/TemplateProcessor.java delete mode 100644 stream-applications-integration-tests/src/test/resources/compose-kafka-external.yml delete mode 100644 stream-applications-integration-tests/src/test/resources/processor/http-request-processor-tests.yml delete mode 100644 stream-applications-integration-tests/src/test/resources/sink/jdbc-sink-tests.yml delete mode 100644 stream-applications-integration-tests/src/test/resources/sink/mongodb-sink-tests.yml delete mode 100644 stream-applications-integration-tests/src/test/resources/sink/tcp-sink-tests.yml delete mode 100644 stream-applications-integration-tests/src/test/resources/source/geode-source-tests.yml delete mode 100644 stream-applications-integration-tests/src/test/resources/source/http-source-tests.yml delete mode 100644 stream-applications-integration-tests/src/test/resources/source/jdbc-source-tests.yml delete mode 100644 stream-applications-integration-tests/src/test/resources/source/s3-source-tests.yml delete mode 100644 stream-applications-integration-tests/src/test/resources/source/sftp-source-tests.yml delete mode 100644 stream-applications-integration-tests/src/test/resources/test.properties delete mode 100644 stream-applications-integration-tests/src/test/resources/tick-tock-tests.yml diff --git a/stream-applications-integration-tests/pom.xml b/stream-applications-integration-tests/pom.xml index b31e48f..112a9ed 100644 --- a/stream-applications-integration-tests/pom.xml +++ b/stream-applications-integration-tests/pom.xml @@ -17,9 +17,10 @@ 1.8 1.0.0-SNAPSHOT + 3.0.0-SNAPSHOT 2.6.2 Hoxton.SR8 - 1.14.3 + 1.15.0-rc2 1.15 3.1.0 false @@ -48,6 +49,7 @@ org.springframework.boot spring-boot-starter-webflux + test org.springframework.boot @@ -60,11 +62,29 @@ + + org.testcontainers + testcontainers + test + + org.testcontainers junit-jupiter test + + + org.testcontainers + kafka + test + + + + org.awaitility + awaitility + test + org.testcontainers mariadb @@ -75,6 +95,23 @@ mongodb test + + org.testcontainers + mysql + test + + + mysql + mysql-connector-java + 5.1.49 + test + + + org.springframework.cloud.stream.app + stream-applications-test-support + ${stream-applications.version} + test + org.springframework.cloud.fn function-test-support @@ -100,6 +137,7 @@ org.springframework.boot spring-boot-starter-data-mongodb + test org.mariadb.jdbc @@ -113,22 +151,10 @@ test - com.samskivert - jmustache - ${jmustache.version} + org.springframework.boot + spring-boot-starter-webflux test - - org.awaitility - awaitility - test - - - junit - junit - - - diff --git a/stream-applications-integration-tests/src/test/java/org/springframework/cloud/stream/apps/integration/test/TikTokBaselineTests.java b/stream-applications-integration-tests/src/test/java/org/springframework/cloud/stream/apps/integration/test/TikTokBaselineTests.java new file mode 100644 index 0000000..a7f1136 --- /dev/null +++ b/stream-applications-integration-tests/src/test/java/org/springframework/cloud/stream/apps/integration/test/TikTokBaselineTests.java @@ -0,0 +1,66 @@ +/* + * Copyright 2020-2020 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.cloud.stream.apps.integration.test; + +import java.time.Duration; +import java.util.regex.Pattern; + +import org.junit.jupiter.api.Test; +import org.testcontainers.containers.GenericContainer; +import org.testcontainers.junit.jupiter.Container; +import org.testcontainers.utility.DockerImageName; + +import org.springframework.cloud.stream.app.test.integration.LogMatcher; +import org.springframework.cloud.stream.apps.integration.test.support.KafkaStreamIntegrationTestSupport; + +import static org.awaitility.Awaitility.await; +import static org.springframework.cloud.stream.app.test.integration.AppLog.appLog; +import static org.springframework.cloud.stream.app.test.integration.FluentMap.fluentMap; + +public class TikTokBaselineTests extends KafkaStreamIntegrationTestSupport { + // "MM/dd/yy HH:mm:ss"; + private final static Pattern pattern = Pattern.compile(".*\\d{2}/\\d{2}/\\d{2}\\s+\\d{2}:\\d{2}:\\d{2}"); + + private final static LogMatcher logMatcher = new LogMatcher(); + + @Container + static GenericContainer timeSource = new GenericContainer( + DockerImageName.parse("springcloudstream/time-source-kafka:3.0.0-SNAPSHOT")) + .withNetwork(kafka.getNetwork()) + .withEnv(fluentMap().withEntry("SPRING_CLOUD_STREAM_BINDINGS_OUTPUT_DESTINATION", "TikTok") + .withEntry("SPRING_CLOUD_STREAM_KAFKA_BINDER_BROKERS", + kafka.getNetworkAliases().get(0) + ":9092")) + .dependsOn(kafka); + + @Container + static GenericContainer logSink = new GenericContainer( + DockerImageName.parse("springcloudstream/log-sink-kafka:3.0.0-SNAPSHOT")) + .withNetwork(kafka.getNetwork()) + .withEnv(fluentMap().withEntry("SPRING_CLOUD_STREAM_BINDINGS_INPUT_DESTINATION", "TikTok") + .withEntry("SPRING_CLOUD_STREAM_KAFKA_BINDER_BROKERS", + kafka.getNetworkAliases().get(0) + ":9092") + .withEntry("SPRING_CLOUD_STREAM_BINDINGS_INPUT_GROUP", "TikTok")) + .withLogConsumer(logMatcher) + .withLogConsumer(appLog("log-sink")) + .dependsOn(kafka); + + @Test + void tiktok() { + await().atMost(Duration.ofMinutes(2)).until(logMatcher.verifies(log -> log.contains("Started LogSink"))); + await().atMost(Duration.ofSeconds(30)).until(logMatcher.verifies(log -> log.matchesRegex(pattern.pattern()))); + } +} diff --git a/stream-applications-integration-tests/src/test/java/org/springframework/cloud/stream/apps/integration/test/TickTockTests.java b/stream-applications-integration-tests/src/test/java/org/springframework/cloud/stream/apps/integration/test/TikTokTests.java similarity index 52% rename from stream-applications-integration-tests/src/test/java/org/springframework/cloud/stream/apps/integration/test/TickTockTests.java rename to stream-applications-integration-tests/src/test/java/org/springframework/cloud/stream/apps/integration/test/TikTokTests.java index bdb5992..775d321 100644 --- a/stream-applications-integration-tests/src/test/java/org/springframework/cloud/stream/apps/integration/test/TickTockTests.java +++ b/stream-applications-integration-tests/src/test/java/org/springframework/cloud/stream/apps/integration/test/TikTokTests.java @@ -20,30 +20,33 @@ import java.time.Duration; import java.util.regex.Pattern; import org.junit.jupiter.api.Test; -import org.testcontainers.containers.DockerComposeContainer; +import org.testcontainers.containers.GenericContainer; import org.testcontainers.junit.jupiter.Container; -import org.springframework.cloud.stream.apps.integration.test.support.AbstractStreamApplicationTests; -import org.springframework.cloud.stream.apps.integration.test.support.LogMatcher; +import org.springframework.cloud.stream.app.test.integration.LogMatcher; +import org.springframework.cloud.stream.app.test.integration.StreamApps; +import org.springframework.cloud.stream.apps.integration.test.support.KafkaStreamIntegrationTestSupport; import static org.awaitility.Awaitility.await; -import static org.springframework.cloud.stream.apps.integration.test.support.AbstractStreamApplicationTests.AppLog.appLog; +import static org.springframework.cloud.stream.app.test.integration.kafka.KafkaStreamApps.kafkaStreamApps; -public class TickTockTests extends AbstractStreamApplicationTests { +public class TikTokTests extends KafkaStreamIntegrationTestSupport { // "MM/dd/yy HH:mm:ss"; - private final Pattern pattern = Pattern.compile(".*\\d{2}/\\d{2}/\\d{2}\\s+\\d{2}:\\d{2}:\\d{2}"); + private final static Pattern DATE_PATTERN = Pattern.compile(".*\\d{2}/\\d{2}/\\d{2}\\s+\\d{2}:\\d{2}:\\d{2}"); - private final LogMatcher logMatcher = new LogMatcher(); + private final static LogMatcher logMatcher = new LogMatcher(); @Container - private final DockerComposeContainer environment = new DockerComposeContainer( - templateProcessor("tick-tock-tests.yml").processTemplate()) - .withLogConsumer("log-sink", logMatcher) - .withLogConsumer("log-sink", appLog("log-sink")); + static StreamApps streamApps = kafkaStreamApps("tikTok", kafka) + .withSourceContainer(new GenericContainer(defaultKafkaImageFor("time-source"))) + .withSinkContainer(new GenericContainer(defaultKafkaImageFor("log-sink")) + .withLogConsumer(logMatcher)) + .build(); @Test - void ticktock() { + void tiktok() { await().atMost(Duration.ofMinutes(2)).until(logMatcher.verifies(log -> log.contains("Started LogSink"))); - await().atMost(Duration.ofSeconds(30)).until(logMatcher.verifies(log -> log.matchesRegex(pattern.pattern()))); + await().atMost(Duration.ofSeconds(30)) + .until(logMatcher.verifies(log -> log.matchesRegex(DATE_PATTERN.pattern()))); } } diff --git a/stream-applications-integration-tests/src/test/java/org/springframework/cloud/stream/apps/integration/test/processor/HttpRequestProcessorTests.java b/stream-applications-integration-tests/src/test/java/org/springframework/cloud/stream/apps/integration/test/processor/HttpRequestProcessorTests.java index 780e93e..7dc7c67 100644 --- a/stream-applications-integration-tests/src/test/java/org/springframework/cloud/stream/apps/integration/test/processor/HttpRequestProcessorTests.java +++ b/stream-applications-integration-tests/src/test/java/org/springframework/cloud/stream/apps/integration/test/processor/HttpRequestProcessorTests.java @@ -25,43 +25,45 @@ import okhttp3.mockwebserver.MockWebServer; import okhttp3.mockwebserver.RecordedRequest; import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.Test; -import org.testcontainers.containers.DockerComposeContainer; -import org.testcontainers.containers.wait.strategy.Wait; +import org.testcontainers.containers.GenericContainer; import org.testcontainers.junit.jupiter.Container; import reactor.core.publisher.Mono; -import org.springframework.cloud.stream.apps.integration.test.support.AbstractStreamApplicationTests; -import org.springframework.cloud.stream.apps.integration.test.support.LogMatcher; +import org.springframework.cloud.stream.app.test.integration.LogMatcher; +import org.springframework.cloud.stream.app.test.integration.StreamApps; +import org.springframework.cloud.stream.apps.integration.test.support.KafkaStreamIntegrationTestSupport; import org.springframework.http.HttpHeaders; import org.springframework.http.HttpStatus; import org.springframework.http.MediaType; import org.springframework.web.reactive.function.client.ClientResponse; +import org.springframework.web.reactive.function.client.WebClient; import static org.assertj.core.api.Assertions.assertThat; import static org.awaitility.Awaitility.await; -import static org.springframework.cloud.stream.apps.integration.test.support.AbstractStreamApplicationTests.AppLog.appLog; -import static org.springframework.cloud.stream.apps.integration.test.support.FluentMap.fluentMap; +import static org.springframework.cloud.stream.app.test.integration.kafka.KafkaStreamApps.kafkaStreamApps; -public class HttpRequestProcessorTests extends AbstractStreamApplicationTests { +public class HttpRequestProcessorTests extends KafkaStreamIntegrationTestSupport { private static MockWebServer server = new MockWebServer(); - private static int serverPort = findAvailablePort(); - - private static String url = "http://" + localHostAddress() + ":" + serverPort; - - private static int sourcePort = findAvailablePort(); + private static WebClient webClient = WebClient.builder().build(); private static LogMatcher logMatcher = new LogMatcher(); + private static int serverPort = findAvailablePort(); + + private static int sourcePort = findAvailablePort(); + @Container - private static final DockerComposeContainer environment = new DockerComposeContainer( - templateProcessor("processor/http-request-processor-tests.yml", fluentMap() - .withEntry("port", sourcePort) - .withEntry("url", url)).processTemplate()) - .withLogConsumer("log-sink", appLog("log-sink")) - .withLogConsumer("log-sink", logMatcher) - .withExposedService("http-source", sourcePort, - Wait.forListeningPort().withStartupTimeout(Duration.ofMinutes(2))); + private static final StreamApps streamApps = kafkaStreamApps( + HttpRequestProcessorTests.class.getSimpleName(), kafka) + .withSourceContainer(httpSource(sourcePort)) + .withProcessorContainer(new GenericContainer(defaultKafkaImageFor("http-request-processor")) + .withEnv("HTTP_REQUEST_URL_EXPRESSION", + "'http://" + localHostAddress() + ":" + serverPort + "'") + .withEnv("HTTP_REQUEST_HTTP_METHOD_EXPRESSION", "'POST'")) + .withSinkContainer( + new GenericContainer(defaultKafkaImageFor("log-sink")).withLogConsumer(logMatcher)) + .build(); @BeforeAll static void startServer() throws Exception { @@ -81,9 +83,9 @@ public class HttpRequestProcessorTests extends AbstractStreamApplicationTests { await().atMost(Duration.ofSeconds(30)) .until(logMatcher.verifies(log -> log.when(() -> { - ClientResponse response = webClient() + ClientResponse response = webClient .post() - .uri("http://localhost:" + sourcePort) + .uri("http://localhost:" + streamApps.sourceContainer().getMappedPort(sourcePort)) .contentType(MediaType.TEXT_PLAIN) .body(Mono.just("ping"), String.class) .exchange() @@ -91,4 +93,5 @@ public class HttpRequestProcessorTests extends AbstractStreamApplicationTests { assertThat(response.statusCode().is2xxSuccessful()).isTrue(); }).matchesRegex(".*\\{\"response\":\"ping\"\\}"))); } + } diff --git a/stream-applications-integration-tests/src/test/java/org/springframework/cloud/stream/apps/integration/test/sink/JdbcSinkTests.java b/stream-applications-integration-tests/src/test/java/org/springframework/cloud/stream/apps/integration/test/sink/JdbcSinkTests.java index 836b2be..371384a 100644 --- a/stream-applications-integration-tests/src/test/java/org/springframework/cloud/stream/apps/integration/test/sink/JdbcSinkTests.java +++ b/stream-applications-integration-tests/src/test/java/org/springframework/cloud/stream/apps/integration/test/sink/JdbcSinkTests.java @@ -21,67 +21,75 @@ import java.time.Duration; import com.zaxxer.hikari.HikariDataSource; import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.Test; -import org.testcontainers.containers.DockerComposeContainer; -import org.testcontainers.containers.MariaDBContainer; +import org.testcontainers.containers.BindMode; +import org.testcontainers.containers.GenericContainer; +import org.testcontainers.containers.MySQLContainer; import org.testcontainers.containers.wait.strategy.Wait; import org.testcontainers.junit.jupiter.Container; +import org.testcontainers.utility.DockerImageName; import reactor.core.publisher.Mono; -import org.springframework.cloud.stream.apps.integration.test.support.AbstractStreamApplicationTests; +import org.springframework.cloud.stream.app.test.integration.StreamApps; +import org.springframework.cloud.stream.apps.integration.test.support.KafkaStreamIntegrationTestSupport; import org.springframework.http.MediaType; import org.springframework.jdbc.core.JdbcTemplate; import org.springframework.web.reactive.function.client.ClientResponse; +import org.springframework.web.reactive.function.client.WebClient; import static org.assertj.core.api.Assertions.assertThat; import static org.awaitility.Awaitility.await; -import static org.springframework.cloud.stream.apps.integration.test.support.AbstractStreamApplicationTests.AppLog.appLog; -import static org.springframework.cloud.stream.apps.integration.test.support.FluentMap.fluentMap; +import static org.springframework.cloud.stream.app.test.integration.kafka.KafkaStreamApps.kafkaStreamApps; -public class JdbcSinkTests extends AbstractStreamApplicationTests { +public class JdbcSinkTests extends KafkaStreamIntegrationTestSupport { - private static int port = findAvailablePort(); + private static int serverPort = findAvailablePort(); private static JdbcTemplate jdbcTemplate; - @Container - private static MariaDBContainer mariadbContainer = (MariaDBContainer) new MariaDBContainer() - .withDatabaseName("test") - .withPassword("password") - .withUsername("user") - .withInitScript("init.sql") - .withExposedPorts(3306) - .waitingFor(Wait.forListeningPort().withStartupTimeout(Duration.ofMinutes(2))); + private static WebClient webClient = WebClient.builder().build(); @Container - private DockerComposeContainer environment = new DockerComposeContainer( - templateProcessor("sink/jdbc-sink-tests.yml", fluentMap() - .withEntry("jdbc.url", - mariadbContainer.getJdbcUrl().replace("localhost", - localHostAddress())) - .withEntry("user", mariadbContainer.getUsername()) - .withEntry("password", mariadbContainer.getPassword()) - .withEntry("port", port)).processTemplate()) - .withLogConsumer("jdbc-sink", appLog("jdbc-sink")) - .withExposedService("http-source", port, - Wait.forListeningPort().withStartupTimeout(Duration.ofMinutes(2))); + private static MySQLContainer mySQL = new MySQLContainer<>(DockerImageName.parse("mysql:5.7")) + .withUsername("test") + .withPassword("secret") + .withExposedPorts(3306) + .withNetwork(kafka.getNetwork()) + .withClasspathResourceMapping("init.sql", "/init.sql", BindMode.READ_ONLY) + .withCommand("--init-file", "/init.sql"); + + @Container + private static StreamApps streamApps = kafkaStreamApps(JdbcSinkTests.class.getSimpleName(), kafka) + .withSourceContainer(new GenericContainer(defaultKafkaImageFor("http-source")) + .withEnv("SERVER_PORT", String.valueOf(serverPort)) + .withExposedPorts(serverPort) + .waitingFor(Wait.forListeningPort().withStartupTimeout(Duration.ofMinutes(2)))) + .withSinkContainer(new GenericContainer(defaultKafkaImageFor("jdbc-sink")) + .withEnv("JDBC_CONSUMER_COLUMNS", "name,city:address.city,street:address.street") + .withEnv("JDBC_CONSUMER_TABLE_NAME", "People") + .withEnv("SPRING_DATASOURCE_USERNAME", "test") + .withEnv("SPRING_DATASOURCE_PASSWORD", "secret") + .withEnv("SPRING_DATASOURCE_DRIVER_CLASS_NAME", "org.mariadb.jdbc.Driver") + .withEnv("SPRING_DATASOURCE_URL", + "jdbc:mysql://" + mySQL.getNetworkAliases().get(0) + ":3306/test")) + .build(); @BeforeAll - static void buildJdbcTemplate() { + static void startStreamApps() { HikariDataSource dataSource = new HikariDataSource(); - dataSource.setDriverClassName(mariadbContainer.getDriverClassName()); - dataSource.setUsername(mariadbContainer.getUsername()); - dataSource.setPassword(mariadbContainer.getPassword()); - dataSource.setJdbcUrl(mariadbContainer.getJdbcUrl()); + dataSource.setDriverClassName("org.mariadb.jdbc.Driver"); + dataSource.setUsername(mySQL.getUsername()); + dataSource.setPassword(mySQL.getPassword()); + dataSource.setJdbcUrl("jdbc:mysql://localhost:" + mySQL.getMappedPort(3306) + "/test"); jdbcTemplate = new JdbcTemplate(dataSource); jdbcTemplate.execute("DELETE FROM People"); } @Test void postData() { - String json = "{\"name\":\"My Name\",\"address\":{ \"city\": \"Big City\", \"street\": \"Narrow Alley\"}}"; - ClientResponse response = webClient() + String json = "{\"name\":\"My Name\",\"address\":{ \"city\": \"Big City\", \"street\":\"Narrow Alley\"}}"; + ClientResponse response = webClient .post() - .uri("http://localhost:" + port) + .uri("http://localhost:" + streamApps.sourceContainer().getMappedPort(serverPort)) .contentType(MediaType.APPLICATION_JSON) .body(Mono.just(json), String.class) .exchange() @@ -90,8 +98,10 @@ public class JdbcSinkTests extends AbstractStreamApplicationTests { await().atMost(Duration.ofSeconds(30)) .untilAsserted( - () -> assertThat(jdbcTemplate.queryForObject("SELECT COUNT(*) from People", Integer.class)) - .isOne()); - assertThat(jdbcTemplate.queryForObject("SELECT name from People", String.class)).isEqualTo("My Name"); + () -> assertThat(jdbcTemplate.queryForObject("SELECT COUNT(*) from People", + Integer.class)) + .isOne()); + assertThat(jdbcTemplate.queryForObject("SELECT name from People", + String.class)).isEqualTo("My Name"); } } diff --git a/stream-applications-integration-tests/src/test/java/org/springframework/cloud/stream/apps/integration/test/sink/MongoDBSinkTests.java b/stream-applications-integration-tests/src/test/java/org/springframework/cloud/stream/apps/integration/test/sink/MongoDBSinkTests.java index d60972a..eb13659 100644 --- a/stream-applications-integration-tests/src/test/java/org/springframework/cloud/stream/apps/integration/test/sink/MongoDBSinkTests.java +++ b/stream-applications-integration-tests/src/test/java/org/springframework/cloud/stream/apps/integration/test/sink/MongoDBSinkTests.java @@ -22,31 +22,34 @@ import java.util.List; import org.bson.Document; import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.Test; -import org.testcontainers.containers.DockerComposeContainer; +import org.testcontainers.containers.GenericContainer; import org.testcontainers.containers.MongoDBContainer; -import org.testcontainers.containers.wait.strategy.Wait; import org.testcontainers.junit.jupiter.Container; +import org.testcontainers.utility.DockerImageName; import reactor.core.publisher.Mono; -import org.springframework.cloud.stream.apps.integration.test.support.AbstractStreamApplicationTests; +import org.springframework.cloud.stream.app.test.integration.StreamApps; +import org.springframework.cloud.stream.apps.integration.test.support.KafkaStreamIntegrationTestSupport; import org.springframework.data.mongodb.MongoDatabaseFactory; import org.springframework.data.mongodb.core.MongoTemplate; import org.springframework.data.mongodb.core.SimpleMongoClientDatabaseFactory; import org.springframework.http.MediaType; import org.springframework.web.reactive.function.client.ClientResponse; +import org.springframework.web.reactive.function.client.WebClient; import static org.assertj.core.api.Assertions.assertThat; -import static org.springframework.cloud.stream.apps.integration.test.support.AbstractStreamApplicationTests.AppLog.appLog; -import static org.springframework.cloud.stream.apps.integration.test.support.FluentMap.fluentMap; +import static org.springframework.cloud.stream.app.test.integration.kafka.KafkaStreamApps.kafkaStreamApps; -public class MongoDBSinkTests extends AbstractStreamApplicationTests { +public class MongoDBSinkTests extends KafkaStreamIntegrationTestSupport { - private static int port = findAvailablePort(); + private static int serverPort = findAvailablePort(); private static MongoTemplate mongoTemplate; + private static WebClient webClient = WebClient.builder().build(); + @Container - private static MongoDBContainer mongoDBContainer = new MongoDBContainer() + private static MongoDBContainer mongoDBContainer = new MongoDBContainer(DockerImageName.parse("mongo:4.0.10")) .withExposedPorts(27017) .withStartupTimeout(Duration.ofMinutes(2)); @@ -54,29 +57,27 @@ public class MongoDBSinkTests extends AbstractStreamApplicationTests { return String.format("mongodb://%s:%s/%s", localHostAddress(), mongoDBContainer.getMappedPort(27017), "test"); } + @Container + private StreamApps streamApps = kafkaStreamApps(MongoDBSinkTests.class.getSimpleName(), kafka) + .withSourceContainer(httpSource(serverPort)) + .withSinkContainer(new GenericContainer(defaultKafkaImageFor("mongodb-sink")) + .withEnv("MONGO_DB_CONSUMER_COLLECTION", "test") + .withEnv("SPRING_DATA_MONGODB_URL", mongoConnectionString())) + .build(); + @BeforeAll - private static void buildMongoTemplate() { + static void buildMongoTemplate() { MongoDatabaseFactory mongoDatabaseFactory = new SimpleMongoClientDatabaseFactory( mongoConnectionString()); mongoTemplate = new MongoTemplate(mongoDatabaseFactory); } - @Container - private DockerComposeContainer environment = new DockerComposeContainer( - templateProcessor("sink/mongodb-sink-tests.yml", fluentMap() - .withEntry("mongodb.url", mongoConnectionString()) - .withEntry("port", port)).processTemplate()) - .withLogConsumer("jdbc-sink", - appLog("jdbc-sink")) - .withExposedService("http-source", port, - Wait.forListeningPort().withStartupTimeout(Duration.ofMinutes(2))); - @Test void postData() { String json = "{\"name\":\"My Name\",\"address\":{ \"city\": \"Big City\", \"street\": \"Narrow Alley\"}}"; - ClientResponse response = webClient() + ClientResponse response = webClient .post() - .uri("http://localhost:" + port) + .uri("http://localhost:" + streamApps.sourceContainer().getMappedPort(serverPort)) .contentType(MediaType.APPLICATION_JSON) .body(Mono.just(json), String.class) .exchange() diff --git a/stream-applications-integration-tests/src/test/java/org/springframework/cloud/stream/apps/integration/test/sink/TcpSinkTests.java b/stream-applications-integration-tests/src/test/java/org/springframework/cloud/stream/apps/integration/test/sink/TcpSinkTests.java index be8ec55..a0f8758 100644 --- a/stream-applications-integration-tests/src/test/java/org/springframework/cloud/stream/apps/integration/test/sink/TcpSinkTests.java +++ b/stream-applications-integration-tests/src/test/java/org/springframework/cloud/stream/apps/integration/test/sink/TcpSinkTests.java @@ -27,21 +27,21 @@ import java.util.concurrent.atomic.AtomicBoolean; import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.Test; -import org.testcontainers.containers.DockerComposeContainer; -import org.testcontainers.containers.wait.strategy.Wait; +import org.testcontainers.containers.GenericContainer; import org.testcontainers.junit.jupiter.Container; import reactor.core.publisher.Mono; -import org.springframework.cloud.stream.apps.integration.test.support.AbstractStreamApplicationTests; +import org.springframework.cloud.stream.app.test.integration.StreamApps; +import org.springframework.cloud.stream.apps.integration.test.support.KafkaStreamIntegrationTestSupport; import org.springframework.http.MediaType; import org.springframework.web.reactive.function.client.ClientResponse; +import org.springframework.web.reactive.function.client.WebClient; import static org.assertj.core.api.Assertions.assertThat; import static org.awaitility.Awaitility.await; -import static org.springframework.cloud.stream.apps.integration.test.support.AbstractStreamApplicationTests.AppLog.appLog; -import static org.springframework.cloud.stream.apps.integration.test.support.FluentMap.fluentMap; +import static org.springframework.cloud.stream.app.test.integration.kafka.KafkaStreamApps.kafkaStreamApps; -public class TcpSinkTests extends AbstractStreamApplicationTests { +public class TcpSinkTests extends KafkaStreamIntegrationTestSupport { private static final int port = findAvailablePort(); @@ -51,15 +51,16 @@ public class TcpSinkTests extends AbstractStreamApplicationTests { private static final AtomicBoolean socketReady = new AtomicBoolean(); + private static WebClient webClient = WebClient.builder().build(); + @Container - private static final DockerComposeContainer environment = new DockerComposeContainer( - templateProcessor("sink/tcp-sink-tests.yml", fluentMap() - .withEntry("port", port) - .withEntry("tcp.port", tcpPort) - .withEntry("tcp.host", localHostAddress())).processTemplate()) - .withLogConsumer("tcp-sink", appLog("tcp-sink")) - .withExposedService("http-source", port, - Wait.forListeningPort().withStartupTimeout(Duration.ofMinutes(2))); + private static StreamApps streamApps = kafkaStreamApps(TcpSinkTests.class.getSimpleName(), kafka) + .withSourceContainer(httpSource(port)) + .withSinkContainer(new GenericContainer(defaultKafkaImageFor("tcp-sink")) + .withEnv("TCP_CONSUMER_HOST", localHostAddress()) + .withEnv("TCP_PORT", String.valueOf(tcpPort)) + .withEnv("TCP_CONSUMER_ENCODER", "CRLF")) + .build(); @BeforeAll static void startTcpServer() { @@ -77,9 +78,9 @@ public class TcpSinkTests extends AbstractStreamApplicationTests { @Test void postData() throws IOException { String text = "Hello, world!"; - ClientResponse response = webClient() + ClientResponse response = webClient .post() - .uri("http://localhost:" + port) + .uri("http://localhost:" + streamApps.sourceContainer().getMappedPort(port)) .contentType(MediaType.TEXT_PLAIN) .body(Mono.just(text), String.class) .exchange() diff --git a/stream-applications-integration-tests/src/test/java/org/springframework/cloud/stream/apps/integration/test/source/GeodeSourceTests.java b/stream-applications-integration-tests/src/test/java/org/springframework/cloud/stream/apps/integration/test/source/GeodeSourceTests.java index 01dd455..d00a391 100644 --- a/stream-applications-integration-tests/src/test/java/org/springframework/cloud/stream/apps/integration/test/source/GeodeSourceTests.java +++ b/stream-applications-integration-tests/src/test/java/org/springframework/cloud/stream/apps/integration/test/source/GeodeSourceTests.java @@ -31,19 +31,19 @@ import org.apache.geode.cache.client.ClientRegionShortcut; import org.junit.jupiter.api.AfterAll; import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.Test; -import org.testcontainers.containers.DockerComposeContainer; +import org.testcontainers.containers.GenericContainer; import org.testcontainers.images.builder.ImageFromDockerfile; import org.testcontainers.junit.jupiter.Container; import org.springframework.cloud.fn.test.support.geode.GeodeContainer; -import org.springframework.cloud.stream.apps.integration.test.support.AbstractStreamApplicationTests; -import org.springframework.cloud.stream.apps.integration.test.support.LogMatcher; +import org.springframework.cloud.stream.app.test.integration.LogMatcher; +import org.springframework.cloud.stream.app.test.integration.StreamApps; +import org.springframework.cloud.stream.apps.integration.test.support.KafkaStreamIntegrationTestSupport; import static org.awaitility.Awaitility.await; -import static org.springframework.cloud.stream.apps.integration.test.support.AbstractStreamApplicationTests.AppLog.appLog; -import static org.springframework.cloud.stream.apps.integration.test.support.FluentMap.fluentMap; +import static org.springframework.cloud.stream.app.test.integration.kafka.KafkaStreamApps.kafkaStreamApps; -public class GeodeSourceTests extends AbstractStreamApplicationTests { +public class GeodeSourceTests extends KafkaStreamIntegrationTestSupport { private static final LogMatcher logMatcher = new LogMatcher(); @@ -91,15 +91,15 @@ public class GeodeSourceTests extends AbstractStreamApplicationTests { } @Container - private final DockerComposeContainer environment = new DockerComposeContainer( - templateProcessor("source/geode-source-tests.yml", fluentMap() - .withEntry("geode.host-addresses", "geode:" + cacheServerPort) - .withEntry("geodeHost", localHostAddress()) - .withEntry("geode.region", "myRegion")).processTemplate()) - .withLogConsumer("log-sink", appLog("log-sink")) - .withLogConsumer("geode-source", geodeLogMatcher) - .withLogConsumer("log-sink", logMatcher) - .withLocalCompose(true); + private final StreamApps streamApps = kafkaStreamApps(this.getClass().getSimpleName(), kafka) + .withSourceContainer(new GenericContainer(defaultKafkaImageFor("geode-source")) + .withEnv("GEODE_POOL_CONNECT_TYPE", "server") + .withEnv("GEODE_REGION_REGION_NAME", "myRegion") + .withEnv("GEODE_POOL_HOST_ADDRESSES", localHostAddress() + ":" + cacheServerPort) + .withLogConsumer(geodeLogMatcher)) + .withSinkContainer(new GenericContainer(defaultKafkaImageFor("log-sink")) + .withLogConsumer(logMatcher)) + .build(); @Test void test() { diff --git a/stream-applications-integration-tests/src/test/java/org/springframework/cloud/stream/apps/integration/test/source/HttpSourceTests.java b/stream-applications-integration-tests/src/test/java/org/springframework/cloud/stream/apps/integration/test/source/HttpSourceTests.java index 992ad54..ee94ef9 100644 --- a/stream-applications-integration-tests/src/test/java/org/springframework/cloud/stream/apps/integration/test/source/HttpSourceTests.java +++ b/stream-applications-integration-tests/src/test/java/org/springframework/cloud/stream/apps/integration/test/source/HttpSourceTests.java @@ -17,44 +17,44 @@ package org.springframework.cloud.stream.apps.integration.test.source; import java.time.Duration; -import java.util.Collections; import org.junit.jupiter.api.Test; -import org.testcontainers.containers.DockerComposeContainer; -import org.testcontainers.containers.wait.strategy.Wait; +import org.testcontainers.containers.GenericContainer; import org.testcontainers.junit.jupiter.Container; import reactor.core.publisher.Mono; -import org.springframework.cloud.stream.apps.integration.test.support.AbstractStreamApplicationTests; -import org.springframework.cloud.stream.apps.integration.test.support.LogMatcher; +import org.springframework.cloud.stream.app.test.integration.LogMatcher; +import org.springframework.cloud.stream.app.test.integration.StreamApps; +import org.springframework.cloud.stream.apps.integration.test.support.KafkaStreamIntegrationTestSupport; import org.springframework.http.MediaType; import org.springframework.web.reactive.function.client.ClientResponse; +import org.springframework.web.reactive.function.client.WebClient; import static org.assertj.core.api.Assertions.assertThat; import static org.awaitility.Awaitility.await; -import static org.springframework.cloud.stream.apps.integration.test.support.AbstractStreamApplicationTests.AppLog.appLog; +import static org.springframework.cloud.stream.app.test.integration.kafka.KafkaStreamApps.kafkaStreamApps; -public class HttpSourceTests extends AbstractStreamApplicationTests { +public class HttpSourceTests extends KafkaStreamIntegrationTestSupport { - private static int port = findAvailablePort(); + private static int serverPort = findAvailablePort(); + + private static WebClient webClient = WebClient.builder().build(); private static LogMatcher logMatcher = new LogMatcher(); @Container - private static final DockerComposeContainer environment = new DockerComposeContainer( - templateProcessor("source/http-source-tests.yml", Collections.singletonMap("port", port)).processTemplate()) - .withLogConsumer("log-sink", appLog("log-sink")) - .withLogConsumer("log-sink", logMatcher) - .withExposedService("http-source", port, - Wait.forListeningPort().withStartupTimeout(Duration.ofMinutes(2))); + private static final StreamApps streamApps = kafkaStreamApps(HttpSourceTests.class.getSimpleName(), kafka) + .withSourceContainer(httpSource(serverPort)) + .withSinkContainer(new GenericContainer(defaultKafkaImageFor("log-sink")).withLogConsumer(logMatcher)) + .build(); @Test void plaintext() { await().atMost(Duration.ofSeconds(30)) .until(logMatcher.verifies(log -> log.when(() -> { - ClientResponse response = webClient() + ClientResponse response = webClient .post() - .uri("http://localhost:" + port) + .uri("http://localhost:" + streamApps.sourceContainer().getMappedPort(serverPort)) .contentType(MediaType.TEXT_PLAIN) .body(Mono.just("Hello"), String.class) .exchange() @@ -67,9 +67,9 @@ public class HttpSourceTests extends AbstractStreamApplicationTests { void json() { await().atMost(Duration.ofSeconds(30)) .until(logMatcher.verifies(log -> log.when(() -> { - ClientResponse response = webClient() + ClientResponse response = webClient .post() - .uri("http://localhost:" + port) + .uri("http://localhost:" + streamApps.sourceContainer().getMappedPort(serverPort)) .contentType(MediaType.APPLICATION_JSON) .body(Mono.just("{\"Hello\":\"world\"}"), String.class) .exchange() @@ -77,4 +77,5 @@ public class HttpSourceTests extends AbstractStreamApplicationTests { assertThat(response.statusCode().is2xxSuccessful()).isTrue(); }).matchesRegex(".*\\{\"Hello\":\"world\"\\}"))); } + } diff --git a/stream-applications-integration-tests/src/test/java/org/springframework/cloud/stream/apps/integration/test/source/JdbcSourceTests.java b/stream-applications-integration-tests/src/test/java/org/springframework/cloud/stream/apps/integration/test/source/JdbcSourceTests.java index 71ef218..eb31bfd 100644 --- a/stream-applications-integration-tests/src/test/java/org/springframework/cloud/stream/apps/integration/test/source/JdbcSourceTests.java +++ b/stream-applications-integration-tests/src/test/java/org/springframework/cloud/stream/apps/integration/test/source/JdbcSourceTests.java @@ -17,29 +17,47 @@ package org.springframework.cloud.stream.apps.integration.test.source; import java.time.Duration; -import java.util.Collections; import org.junit.jupiter.api.Test; -import org.testcontainers.containers.DockerComposeContainer; -import org.testcontainers.containers.wait.strategy.Wait; +import org.testcontainers.containers.BindMode; +import org.testcontainers.containers.GenericContainer; +import org.testcontainers.containers.MySQLContainer; import org.testcontainers.junit.jupiter.Container; +import org.testcontainers.utility.DockerImageName; -import org.springframework.cloud.stream.apps.integration.test.support.AbstractStreamApplicationTests; -import org.springframework.cloud.stream.apps.integration.test.support.LogMatcher; +import org.springframework.cloud.stream.app.test.integration.LogMatcher; +import org.springframework.cloud.stream.app.test.integration.StreamApps; +import org.springframework.cloud.stream.apps.integration.test.support.KafkaStreamIntegrationTestSupport; import static org.awaitility.Awaitility.await; +import static org.springframework.cloud.stream.app.test.integration.kafka.KafkaStreamApps.kafkaStreamApps; -public class JdbcSourceTests extends AbstractStreamApplicationTests { +public class JdbcSourceTests extends KafkaStreamIntegrationTestSupport { private static LogMatcher logMatcher = new LogMatcher(); @Container - private static final DockerComposeContainer environment = new DockerComposeContainer( - templateProcessor("source/jdbc-source-tests.yml", - Collections.singletonMap("init.sql", resourceAsFile("init.sql"))).processTemplate()) - .withLogConsumer("log-sink", logMatcher) - .waitingFor("jdbc-source", Wait.forLogMessage(".*Started JdbcSource.*", 1) - .withStartupTimeout(Duration.ofMinutes(2))); + public static MySQLContainer mySQL = new MySQLContainer<>(DockerImageName.parse("mysql:5.7")) + .withUsername("test") + .withPassword("secret") + .withExposedPorts(3306) + .withNetwork(kafka.getNetwork()) + .withClasspathResourceMapping("init.sql", "/init.sql", BindMode.READ_ONLY) + .withCommand("--init-file", "/init.sql"); + + @Container + private static final StreamApps streamApps = kafkaStreamApps(JdbcSourceTests.class.getSimpleName(), kafka) + .withSourceContainer(new GenericContainer(defaultKafkaImageFor("jdbc-source")) + .withEnv("JDBC_SUPPLIER_QUERY", "SELECT * FROM People WHERE deleted='N'") + .withEnv("JDBC_SUPPLIER_UPDATE", "UPDATE People SET deleted='Y' WHERE id=:id") + .withEnv("SPRING_DATASOURCE_USERNAME", "test") + .withEnv("SPRING_DATASOURCE_PASSWORD", "secret") + .withEnv("SPRING_DATASOURCE_DRIVER_CLASS_NAME", "org.mariadb.jdbc.Driver") + .withEnv("SPRING_DATASOURCE_URL", + "jdbc:mysql://" + mySQL.getNetworkAliases().get(0) + ":3306/test")) + .withSinkContainer(new GenericContainer(defaultKafkaImageFor("log-sink")) + .withLogConsumer(logMatcher)) + .build(); @Test void test() { diff --git a/stream-applications-integration-tests/src/test/java/org/springframework/cloud/stream/apps/integration/test/source/S3SourceTests.java b/stream-applications-integration-tests/src/test/java/org/springframework/cloud/stream/apps/integration/test/source/S3SourceTests.java index 9f422d3..84fc2a9 100644 --- a/stream-applications-integration-tests/src/test/java/org/springframework/cloud/stream/apps/integration/test/source/S3SourceTests.java +++ b/stream-applications-integration-tests/src/test/java/org/springframework/cloud/stream/apps/integration/test/source/S3SourceTests.java @@ -17,6 +17,7 @@ package org.springframework.cloud.stream.apps.integration.test.source; import java.time.Duration; +import java.util.Map; import java.util.function.Consumer; import com.amazonaws.ClientConfiguration; @@ -32,35 +33,38 @@ import com.github.dockerjava.api.command.CreateContainerCmd; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.Test; -import org.testcontainers.containers.DockerComposeContainer; import org.testcontainers.containers.GenericContainer; import org.testcontainers.containers.wait.strategy.Wait; import org.testcontainers.junit.jupiter.Container; +import org.testcontainers.utility.DockerImageName; -import org.springframework.cloud.stream.apps.integration.test.support.AbstractStreamApplicationTests; -import org.springframework.cloud.stream.apps.integration.test.support.LogMatcher; -import org.springframework.cloud.stream.apps.integration.test.support.TemplateProcessor; +import org.springframework.cloud.stream.app.test.integration.LogMatcher; +import org.springframework.cloud.stream.app.test.integration.StreamApps; +import org.springframework.cloud.stream.apps.integration.test.support.KafkaStreamIntegrationTestSupport; import static org.awaitility.Awaitility.await; -import static org.springframework.cloud.stream.apps.integration.test.support.AbstractStreamApplicationTests.AppLog.appLog; -import static org.springframework.cloud.stream.apps.integration.test.support.FluentMap.fluentMap; +import static org.springframework.cloud.stream.app.test.integration.AppLog.appLog; +import static org.springframework.cloud.stream.app.test.integration.FluentMap.fluentMap; +import static org.springframework.cloud.stream.app.test.integration.kafka.KafkaStreamApps.kafkaStreamApps; -public class S3SourceTests extends AbstractStreamApplicationTests { +public class S3SourceTests extends KafkaStreamIntegrationTestSupport { private static AmazonS3 s3Client; private static LogMatcher logMatcher = new LogMatcher(); @Container - private static final GenericContainer minio = new GenericContainer("minio/minio:RELEASE.2020-09-05T07-14-49Z") - .withExposedPorts(9000) - .withEnv("MINIO_ACCESS_KEY", "minio") - .withEnv("MINIO_SECRET_KEY", "minio123") - .waitingFor(Wait.forHttp("/minio/health/live")) - .withCreateContainerCmdModifier( - (Consumer) createContainerCmd -> createContainerCmd.withHostName("minio")) - .withLogConsumer(appLog("minio")) - .withCommand("minio", "server", "/data"); + private static final GenericContainer minio = new GenericContainer( + DockerImageName.parse("minio/minio:RELEASE.2020-09-05T07-14-49Z")) + .withExposedPorts(9000) + .withEnv("MINIO_ACCESS_KEY", "minio") + .withEnv("MINIO_SECRET_KEY", "minio123") + .waitingFor(Wait.forHttp("/minio/health/live")) + .withCreateContainerCmdModifier( + (Consumer) createContainerCmd -> createContainerCmd + .withHostName("minio")) + .withLogConsumer(appLog("minio")) + .withCommand("minio", "server", "/data"); @BeforeAll static void init() { @@ -78,44 +82,46 @@ public class S3SourceTests extends AbstractStreamApplicationTests { } - private DockerComposeContainer environment; + private StreamApps streamApps = kafkaStreamApps(S3SourceTests.class.getSimpleName(), kafka) + .withSourceContainer(new GenericContainer(defaultKafkaImageFor("s3-source")) + .withEnv("S3_SUPPLIER_REMOTE_DIR", "bucket") + .withEnv("S3_COMMON_ENDPOINT_URL", "http://" + localHostAddress() + ":" + minio.getMappedPort(9000)) + .withEnv("S3_COMMON_PATH_STYLE_ACCESS", "true") + .withEnv("CLOUD_AWS_STACK_AUTO", "false") + .withEnv("CLOUD_AWS_CREDENTIALS_ACCESS_KEY", "minio") + .withEnv("CLOUD_AWS_CREDENTIALS_SECRET_KEY", "minio123") + .withEnv("CLOUD_AWS_REGION_STATIC", "us-east-1") + .withLogConsumer(logMatcher)) + .withSinkContainer(new GenericContainer(defaultKafkaImageFor("log-sink")).withLogConsumer(logMatcher)) + .build(); @Test void testLines() { startContainer( - templateProcessor("source/s3-source-tests.yml", - fluentMap().withEntry("s3.local.dir", resourceAsFile("minio")) - .withEntry("s3.endpoint.url", - "http://minio:" + minio.getMappedPort(9000)) - .withEntry("functionDefinition", "s3Supplier") - .withEntry("consumerMode", "lines") - .withEntry("listOnly", false) - .withEntry("minioHost", localHostAddress()))); + fluentMap().withEntry("FILE_CONSUMER_MODE", "lines")); await().atMost(Duration.ofMinutes(2)).until(logMatcher.verifies(log -> log.contains("Started S3Source"))); await().atMost(Duration.ofSeconds(30)).until(logMatcher.verifies(log -> log.when(() -> { s3Client.createBucket("bucket"); - s3Client.putObject(new PutObjectRequest("bucket", "test", resourceAsFile("minio/data"))); + s3Client.putObject(new PutObjectRequest("bucket", "test", + resourceAsFile("minio/data"))); }).contains("Bart Simpson"))); - } @Test void testTaskLaunchRequest() { - startContainer(templateProcessor("source/s3-source-tests.yml", - fluentMap().withEntry("s3.local.dir", resourceAsFile("minio")) - .withEntry("s3.endpoint.url", - "http://minio:" + minio.getMappedPort(9000)) - .withEntry("functionDefinition", "s3Supplier|taskLaunchRequestFunction") - .withEntry("consumerMode", "ref") - .withEntry("listOnly", false) - .withEntry("minioHost", localHostAddress()))); - environment.start(); + startContainer(fluentMap() + .withEntry("SPRING_CLOUD_FUNCTION_DEFINITION", "s3Supplier|taskLaunchRequestFunction") + .withEntry("TASK_LAUNCH_REQUEST_ARG_EXPRESSIONS", "filename=payload") + .withEntry("TASK_LAUNCH_REQUEST_TASK_NAME", "myTask") + .withEntry("FILE_CONSUMER_MODE", "ref")); + await().atMost(Duration.ofMinutes(2)).until(logMatcher.verifies(log -> log.contains("Started S3Source"))); await().atMost(Duration.ofSeconds(30)).until(logMatcher.verifies(log -> log.when(() -> { s3Client.createBucket("bucket"); - s3Client.putObject(new PutObjectRequest("bucket", "test", resourceAsFile("minio/data"))); + s3Client.putObject(new PutObjectRequest("bucket", "test", + resourceAsFile("minio/data"))); }).endsWith( "\\{\"args\":\\[\"filename=/tmp/s3-supplier/test\"\\],\"deploymentProps\":\\{\\},\"name\":\"myTask\"\\}"))); } @@ -123,30 +129,22 @@ public class S3SourceTests extends AbstractStreamApplicationTests { @Test void testListOnly() { - startContainer(templateProcessor("source/s3-source-tests.yml", - fluentMap().withEntry("s3.local.dir", resourceAsFile("minio")) - .withEntry("s3.endpoint.url", - "http://minio:" + minio.getMappedPort(9000)) - .withEntry("functionDefinition", "s3Supplier") - .withEntry("consumerMode", "ref") - .withEntry("listOnly", true) - .withEntry("minioHost", localHostAddress()))); - environment.start(); + startContainer( + fluentMap() + .withEntry("FILE_CONSUMER_MODE", "ref") + .withEntry("S3_SUPPLIER_LIST_ONLY", "true")); + await().atMost(Duration.ofMinutes(2)).until(logMatcher.verifies(log -> log.contains("Started S3Source"))); await().atMost(Duration.ofSeconds(30)).until(logMatcher.verifies(log -> log.when(() -> { s3Client.createBucket("bucket"); - s3Client.putObject(new PutObjectRequest("bucket", "test", resourceAsFile("minio/data"))); + s3Client.putObject(new PutObjectRequest("bucket", "test", + resourceAsFile("minio/data"))); }).contains("\"bucketName\":\"bucket\",\"key\":\"test\""))); } - - private void startContainer(TemplateProcessor templateProcessor) { - environment = new DockerComposeContainer( - templateProcessor.processTemplate()) - .withLogConsumer("log-sink", logMatcher) - .withLogConsumer("s3-source", logMatcher) - .withLogConsumer("log-sink", appLog("log-sink")); - environment.start(); + private void startContainer(Map environment) { + streamApps.sourceContainer().withEnv(environment); + streamApps.start(); } @AfterEach @@ -155,6 +153,6 @@ public class S3SourceTests extends AbstractStreamApplicationTests { s3Client.deleteObject("bucket", "test"); s3Client.deleteBucket("bucket"); } - environment.stop(); + streamApps.stop(); } } diff --git a/stream-applications-integration-tests/src/test/java/org/springframework/cloud/stream/apps/integration/test/source/SftpSourceTests.java b/stream-applications-integration-tests/src/test/java/org/springframework/cloud/stream/apps/integration/test/source/SftpSourceTests.java index 0109976..210104e 100644 --- a/stream-applications-integration-tests/src/test/java/org/springframework/cloud/stream/apps/integration/test/source/SftpSourceTests.java +++ b/stream-applications-integration-tests/src/test/java/org/springframework/cloud/stream/apps/integration/test/source/SftpSourceTests.java @@ -17,58 +17,62 @@ package org.springframework.cloud.stream.apps.integration.test.source; import java.time.Duration; +import java.util.Map; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.Test; import org.testcontainers.containers.BindMode; -import org.testcontainers.containers.DockerComposeContainer; import org.testcontainers.containers.GenericContainer; import org.testcontainers.junit.jupiter.Container; +import org.testcontainers.utility.DockerImageName; -import org.springframework.cloud.stream.apps.integration.test.support.AbstractStreamApplicationTests; -import org.springframework.cloud.stream.apps.integration.test.support.LogMatcher; -import org.springframework.cloud.stream.apps.integration.test.support.TemplateProcessor; +import org.springframework.cloud.stream.app.test.integration.LogMatcher; +import org.springframework.cloud.stream.app.test.integration.StreamApps; +import org.springframework.cloud.stream.apps.integration.test.support.KafkaStreamIntegrationTestSupport; import static org.awaitility.Awaitility.await; -import static org.springframework.cloud.stream.apps.integration.test.support.AbstractStreamApplicationTests.AppLog.appLog; -import static org.springframework.cloud.stream.apps.integration.test.support.FluentMap.fluentMap; +import static org.springframework.cloud.stream.app.test.integration.FluentMap.fluentMap; +import static org.springframework.cloud.stream.app.test.integration.kafka.KafkaStreamApps.kafkaStreamApps; -public class SftpSourceTests extends AbstractStreamApplicationTests { +public class SftpSourceTests extends KafkaStreamIntegrationTestSupport { private static LogMatcher logMatcher = new LogMatcher(); @Container - private static final GenericContainer sftp = (GenericContainer) new GenericContainer("atmoz/sftp") + private static final GenericContainer sftp = new GenericContainer(DockerImageName.parse("atmoz/sftp")) .withExposedPorts(22) .withCommand("user:pass:::remote") .withClasspathResourceMapping("sftp", "/home/user/remote", BindMode.READ_ONLY) .withStartupTimeout(Duration.ofMinutes(1)); - private DockerComposeContainer environment; + private StreamApps streamApps = kafkaStreamApps(SftpSourceTests.class.getSimpleName(), kafka) + .withSourceContainer(new GenericContainer(defaultKafkaImageFor("sftp-source")) + .withEnv("SFTP_SUPPLIER_FACTORY_ALLOW_UNKNOWN_KEYS", "true") + .withEnv("SFTP_SUPPLIER_REMOTE_DIR", "/remote") + .withEnv("SFTP_SUPPLIER_FACTORY_USERNAME", "user") + .withEnv("SFTP_SUPPLIER_FACTORY_PASSWORD", "pass") + .withEnv("SFTP_SUPPLIER_FACTORY_PORT", String.valueOf(sftp.getMappedPort(22))) + .withEnv("SFTP_SUPPLIER_FACTORY_HOST", localHostAddress())) + .withSinkContainer(new GenericContainer(defaultKafkaImageFor("log-sink")).withLogConsumer(logMatcher)) + .build(); + // TODO: This fixture supports additional tests with different modes, etc. @Test void test() { - startContainer(templateProcessor("source/sftp-source-tests.yml", - fluentMap().withEntry("sftpPort", sftp.getMappedPort(22)) - .withEntry("functionDefinition", "sftpSupplier") - .withEntry("consumerMode", "ref") - .withEntry("listOnly", false) - .withEntry("sftpHost", localHostAddress()))); + startContainer( + fluentMap().withEntry("FILE_CONSUMER_MODE", "ref")); + await().atMost(Duration.ofSeconds(30)) .until(logMatcher.verifies(logListener -> logListener.endsWith("\"/tmp/sftp-supplier/data.txt\""))); } - private void startContainer(TemplateProcessor templateProcessor) { - environment = new DockerComposeContainer( - templateProcessor.processTemplate()) - .withLogConsumer("log-sink", logMatcher) - .withLogConsumer("sftp-source", logMatcher) - .withLogConsumer("log-sink", appLog("log-sink")); - environment.start(); + private void startContainer(Map environment) { + streamApps.sourceContainer().withEnv(environment); + streamApps.start(); } @AfterEach void stop() { - environment.stop(); + streamApps.stop(); } } diff --git a/stream-applications-integration-tests/src/test/java/org/springframework/cloud/stream/apps/integration/test/support/AbstractStreamApplicationTests.java b/stream-applications-integration-tests/src/test/java/org/springframework/cloud/stream/apps/integration/test/support/AbstractStreamApplicationTests.java deleted file mode 100644 index a6d0634..0000000 --- a/stream-applications-integration-tests/src/test/java/org/springframework/cloud/stream/apps/integration/test/support/AbstractStreamApplicationTests.java +++ /dev/null @@ -1,149 +0,0 @@ -/* - * Copyright 2020-2020 the original author or authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * https://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.springframework.cloud.stream.apps.integration.test.support; - -import java.io.File; -import java.io.IOException; -import java.net.InetAddress; -import java.net.UnknownHostException; -import java.nio.file.Files; -import java.nio.file.Path; -import java.nio.file.Paths; -import java.util.Map; -import java.util.Properties; -import java.util.UUID; -import java.util.concurrent.ConcurrentHashMap; - -import org.slf4j.LoggerFactory; -import org.testcontainers.containers.DockerComposeContainer; -import org.testcontainers.containers.output.Slf4jLogConsumer; -import org.testcontainers.containers.wait.strategy.Wait; -import org.testcontainers.junit.jupiter.Testcontainers; - -import org.springframework.core.io.ClassPathResource; -import org.springframework.util.SocketUtils; -import org.springframework.web.reactive.function.client.WebClient; - -import static org.springframework.cloud.stream.apps.integration.test.support.FluentMap.fluentMap; - -@Testcontainers -public abstract class AbstractStreamApplicationTests { - - private static int kafkaBrokerPort; - - private static TemplateProcessor.Builder templateProcessor; - - static { - kafkaBrokerPort = findAvailablePort(); - templateProcessor = TemplateProcessor.withGlobalProperties(loadGlobalProperties("test.properties")) - .withOutputDirectory(initializeTempDir()); - startKafkaContainer(); - } - - protected static TemplateProcessor templateProcessor(String path) { - return templateProcessor.withTemplate(new ClassPathResource(path)).create(); - } - - protected static TemplateProcessor templateProcessor(String path, Map templateProperties) { - return templateProcessor.withTemplate(new ClassPathResource(path)).withTemplateProperties(templateProperties) - .create(); - } - - protected static String localHostAddress() { - try { - return InetAddress.getLocalHost().getHostAddress(); - } - catch (UnknownHostException e) { - throw new IllegalStateException(e.getMessage(), e); - } - } - - private static WebClient webClient = WebClient.builder().build(); - - protected static WebClient webClient() { - return webClient; - } - - protected static File resourceAsFile(String path) { - try { - return new ClassPathResource(path).getFile(); - } - catch (IOException e) { - throw new IllegalStateException("Unable to access resource " + path); - } - } - - // Junit TempDir does not work with DockerComposeContainer unless you mount it. - // Also doesn't work as @BeforeAll in this case. - static Path initializeTempDir() { - Path tempDir; - try { - Path tempRoot = Paths.get(new ClassPathResource("/").getFile().getAbsolutePath()); - - tempDir = Files.createTempDirectory(tempRoot, UUID.randomUUID().toString()); - tempDir.toFile().deleteOnExit(); - } - catch (Exception e) { - throw new IllegalStateException(e.getMessage(), e); - } - - return tempDir; - } - - protected static int findAvailablePort() { - return SocketUtils.findAvailableTcpPort(10000, 20000); - } - - private static Properties loadGlobalProperties(String path) { - Properties globalProperties = new Properties(); - try { - globalProperties.load(new ClassPathResource(path).getInputStream()); - } - catch (IOException exception) { - throw new IllegalStateException(exception.getMessage(), exception); - } - globalProperties.put("kafkaBootStrapServers", localHostAddress() + ":" + kafkaBrokerPort); - return globalProperties; - } - - private static void startKafkaContainer() { - DockerComposeContainer kafkaContainer = new DockerComposeContainer( - templateProcessor.withTemplateProperties(fluentMap() - .withEntry("kafkaBrokerPort", kafkaBrokerPort) - .withEntry("hostAddress", localHostAddress())) - .withTemplate(new ClassPathResource("compose-kafka-external.yml")) - .create().processTemplate()); - kafkaContainer - .waitingFor("kafka", Wait.forListeningPort()) - .start(); - } - - public static class AppLog extends Slf4jLogConsumer { - private static final Map appLogs = new ConcurrentHashMap<>(); - public static AppLog appLog(String appName) { - if (!appLogs.containsKey(appName)) { - appLogs.put(appName, new AppLog(appName)); - } - return appLogs.get(appName); - } - - AppLog(String appName) { - super(LoggerFactory.getLogger(appName)); - } - } - -} diff --git a/stream-applications-integration-tests/src/test/java/org/springframework/cloud/stream/apps/integration/test/support/FluentMap.java b/stream-applications-integration-tests/src/test/java/org/springframework/cloud/stream/apps/integration/test/support/FluentMap.java deleted file mode 100644 index 3e48875..0000000 --- a/stream-applications-integration-tests/src/test/java/org/springframework/cloud/stream/apps/integration/test/support/FluentMap.java +++ /dev/null @@ -1,30 +0,0 @@ -/* - * Copyright 2020-2020 the original author or authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * https://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.springframework.cloud.stream.apps.integration.test.support; - -import java.util.LinkedHashMap; - -public class FluentMap extends LinkedHashMap { - public static FluentMap fluentMap() { - return new FluentMap<>(); - } - - public FluentMap withEntry(K key, V value) { - put(key, value); - return this; - } -} diff --git a/stream-applications-integration-tests/src/test/java/org/springframework/cloud/stream/apps/integration/test/support/KafkaStreamIntegrationTestSupport.java b/stream-applications-integration-tests/src/test/java/org/springframework/cloud/stream/apps/integration/test/support/KafkaStreamIntegrationTestSupport.java new file mode 100644 index 0000000..17d3fde --- /dev/null +++ b/stream-applications-integration-tests/src/test/java/org/springframework/cloud/stream/apps/integration/test/support/KafkaStreamIntegrationTestSupport.java @@ -0,0 +1,43 @@ +/* + * Copyright 2020-2020 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.cloud.stream.apps.integration.test.support; + +import java.time.Duration; + +import org.testcontainers.containers.GenericContainer; +import org.testcontainers.containers.wait.strategy.Wait; +import org.testcontainers.utility.DockerImageName; + +import org.springframework.cloud.stream.app.test.integration.kafka.AbstractKafkaStreamApplicationIntegrationTests; + +public abstract class KafkaStreamIntegrationTestSupport extends AbstractKafkaStreamApplicationIntegrationTests { + + protected static final String VERSION = "3.0.0-SNAPSHOT"; + + protected static final String DOCKER_ORG = "springcloudstream"; + + protected static DockerImageName defaultKafkaImageFor(String appName) { + return DockerImageName.parse(DOCKER_ORG + "/" + appName + "-kafka:" + VERSION); + } + + protected static GenericContainer httpSource(int serverPort) { + return new GenericContainer(defaultKafkaImageFor("http-source")) + .withEnv("SERVER_PORT", String.valueOf(serverPort)) + .withExposedPorts(serverPort) + .waitingFor(Wait.forListeningPort().withStartupTimeout(Duration.ofMinutes(2))); + } +} diff --git a/stream-applications-integration-tests/src/test/java/org/springframework/cloud/stream/apps/integration/test/support/LogMatcher.java b/stream-applications-integration-tests/src/test/java/org/springframework/cloud/stream/apps/integration/test/support/LogMatcher.java deleted file mode 100644 index 11841e8..0000000 --- a/stream-applications-integration-tests/src/test/java/org/springframework/cloud/stream/apps/integration/test/support/LogMatcher.java +++ /dev/null @@ -1,88 +0,0 @@ -/* - * Copyright 2020-2020 the original author or authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * https://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.springframework.cloud.stream.apps.integration.test.support; - -import java.util.LinkedList; -import java.util.List; -import java.util.Optional; -import java.util.concurrent.Callable; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.function.Consumer; -import java.util.regex.Pattern; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import org.testcontainers.containers.output.OutputFrame; - -public class LogMatcher implements Consumer { - private static Logger logger = LoggerFactory.getLogger(LogMatcher.class); - - private List> listeners = new LinkedList<>(); - - public Callable verifies(Consumer consumer) { - LogListener logListener = new LogListener(); - consumer.accept(logListener); - logListener.runnable.ifPresent(runnable -> runnable.run()); - listeners.add(logListener); - return () -> logListener.matches().get(); - } - - @Override - public void accept(OutputFrame outputFrame) { - listeners.forEach(m -> m.accept(outputFrame.getUtf8String())); - } - - public class LogListener implements Consumer { - private AtomicBoolean matched = new AtomicBoolean(); - - private Optional runnable = Optional.empty(); - - private Pattern pattern; - - @Override - public void accept(String s) { - logger.trace(this + "matching " + s.trim() + " using pattern " + pattern.pattern()); - if (pattern.matcher(s.trim()).matches()) { - logger.debug(" MATCHED " + s.trim()); - matched.set(true); - listeners.remove(this); - } - } - - public LogListener contains(String string) { - return matchesRegex(".*" + string + ".*"); - } - - public LogListener endsWith(String string) { - return matchesRegex(".*" + string); - } - - public LogListener matchesRegex(String regex) { - this.pattern = Pattern.compile(regex); - return this; - } - - public LogListener when(Runnable runnable) { - this.runnable = Optional.of(runnable); - return this; - } - - public AtomicBoolean matches() { - return matched; - } - } -} diff --git a/stream-applications-integration-tests/src/test/java/org/springframework/cloud/stream/apps/integration/test/support/TemplateProcessor.java b/stream-applications-integration-tests/src/test/java/org/springframework/cloud/stream/apps/integration/test/support/TemplateProcessor.java deleted file mode 100644 index 719b9e2..0000000 --- a/stream-applications-integration-tests/src/test/java/org/springframework/cloud/stream/apps/integration/test/support/TemplateProcessor.java +++ /dev/null @@ -1,118 +0,0 @@ -/* - * Copyright 2020-2020 the original author or authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * https://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.springframework.cloud.stream.apps.integration.test.support; - -import java.io.File; -import java.io.IOException; -import java.io.InputStreamReader; -import java.nio.file.Files; -import java.nio.file.Path; -import java.nio.file.Paths; -import java.util.Collections; -import java.util.HashMap; -import java.util.Map; -import java.util.Objects; - -import com.samskivert.mustache.Mustache; -import com.samskivert.mustache.Template; -import org.apache.shiro.util.Assert; - -import org.springframework.core.io.Resource; - -public final class TemplateProcessor { - - private final Path outputDirectory; - - private final Resource template; - - private final Map templateProperties; - - private TemplateProcessor(Builder builder) { - outputDirectory = builder.outputDirectory; - template = builder.template; - templateProperties = builder.templateProperties; - } - - private static Map globalProperties; - - public File processTemplate() { - try { - try (InputStreamReader resourcesTemplateReader = new InputStreamReader( - Objects.requireNonNull(template.getInputStream()))) { - Template resourceTemplate = Mustache.compiler().escapeHTML(false).compile(resourcesTemplateReader); - Path temporaryFile = outputDirectory.resolve(Paths.get(template.getFilename())); - if (Files.exists(temporaryFile)) { - Files.delete(temporaryFile); - } - Files.createFile(temporaryFile); - Files.write(temporaryFile, - resourceTemplate.execute(addGlobalProperties(templateProperties)).getBytes()).toFile(); - temporaryFile.toFile().deleteOnExit(); - return temporaryFile.toFile(); - } - } - catch (IOException e) { - throw new IllegalStateException(e.getMessage(), e); - } - } - - private Map addGlobalProperties(Map templateProperties) { - Map enriched = new HashMap<>(); - globalProperties.forEach((key, value) -> enriched.put(key.toString(), value.toString())); - enriched.putAll(templateProperties); - return enriched; - } - - public static TemplateProcessor.Builder builder() { - return TemplateProcessor.withGlobalProperties(Collections.EMPTY_MAP); - } - - public static TemplateProcessor.Builder withGlobalProperties(Map globalProperties) { - TemplateProcessor.globalProperties = globalProperties; - return new TemplateProcessor.Builder(); - } - - public static class Builder { - private Path outputDirectory; - - private Resource template; - - private Map templateProperties; - - public Builder withTemplateProperties(Map templateProperties) { - this.templateProperties = templateProperties; - return this; - } - - public Builder withTemplate(Resource template) { - this.template = template; - return this; - } - - public Builder withOutputDirectory(Path outputDirectory) { - this.outputDirectory = outputDirectory; - return this; - } - - public TemplateProcessor create() { - Assert.notNull(this.outputDirectory, "outputDirectory is required"); - Assert.notNull(this.template, "template is required"); - return new TemplateProcessor(this); - } - } - -} diff --git a/stream-applications-integration-tests/src/test/resources/compose-kafka-external.yml b/stream-applications-integration-tests/src/test/resources/compose-kafka-external.yml deleted file mode 100644 index 8f309e9..0000000 --- a/stream-applications-integration-tests/src/test/resources/compose-kafka-external.yml +++ /dev/null @@ -1,23 +0,0 @@ -version: {{docker.compose.version}} -services: - kafka: - image: confluentinc/cp-kafka:5.5.1 - hostname: kafka - ports: - - "{{kafkaBrokerPort}}:{{kafkaBrokerPort}}" - environment: - - KAFKA_ADVERTISED_LISTENERS=SHARED://{{hostAddress}}:{{kafkaBrokerPort}},LOCAL://kafka:9092 - - KAFKA_ADVERTISED_HOST_NAME=kafka - - KAFKA_LISTENER_SECURITY_PROTOCOL_MAP=SHARED:PLAINTEXT,LOCAL:PLAINTEXT - - KAFKA_INTER_BROKER_LISTENER_NAME=LOCAL - - KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181 - - KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR=1 - depends_on: - - zookeeper - - zookeeper: - image: confluentinc/cp-zookeeper:5.5.1 - expose: - - "2181" - environment: - - ZOOKEEPER_CLIENT_PORT=2181 diff --git a/stream-applications-integration-tests/src/test/resources/logback-test.xml b/stream-applications-integration-tests/src/test/resources/logback-test.xml index e6016be..0fbdebf 100644 --- a/stream-applications-integration-tests/src/test/resources/logback-test.xml +++ b/stream-applications-integration-tests/src/test/resources/logback-test.xml @@ -11,5 +11,5 @@ - + \ No newline at end of file diff --git a/stream-applications-integration-tests/src/test/resources/processor/http-request-processor-tests.yml b/stream-applications-integration-tests/src/test/resources/processor/http-request-processor-tests.yml deleted file mode 100644 index 3172dcf..0000000 --- a/stream-applications-integration-tests/src/test/resources/processor/http-request-processor-tests.yml +++ /dev/null @@ -1,27 +0,0 @@ -version: {{docker.compose.version}} -services: - - http-source: - image: springcloudstream/http-source-kafka:{{stream.apps.version}} - ports: - - "{{port}}:{{port}}" - environment: - - SERVER_PORT={{port}} - - SPRING_CLOUD_STREAM_BINDINGS_OUTPUT_DESTINATION=processor - - SPRING_CLOUD_STREAM_KAFKA_BINDER_BROKERS={{kafkaBootStrapServers}} - http-request-processor: - image: springcloudstream/http-request-processor-kafka:{{stream.apps.version}} - environment: - - HTTP_REQUEST_URL_EXPRESSION='{{url}}' - - HTTP_REQUEST_HTTP_METHOD_EXPRESSION='POST' - - SPRING_CLOUD_STREAM_BINDINGS_INPUT_DESTINATION=processor - - SPRING_CLOUD_STREAM_BINDINGS_OUTPUT_DESTINATION=log - - SPRING_CLOUD_STREAM_KAFKA_BINDER_BROKERS={{kafkaBootStrapServers}} - - SPRING_CLOUD_STREAM_BINDINGS_INPUT_GROUP=http-request-processor-tests - log-sink: - image: springcloudstream/log-sink-kafka:{{stream.apps.version}} - environment: - - SPRING_CLOUD_STREAM_KAFKA_BINDER_BROKERS={{kafkaBootStrapServers}} - - SPRING_CLOUD_STREAM_BINDINGS_INPUT_DESTINATION=log - - SPRING_CLOUD_STREAM_BINDINGS_INPUT_GROUP=http-request-processor-tests - diff --git a/stream-applications-integration-tests/src/test/resources/sink/jdbc-sink-tests.yml b/stream-applications-integration-tests/src/test/resources/sink/jdbc-sink-tests.yml deleted file mode 100644 index 64458d5..0000000 --- a/stream-applications-integration-tests/src/test/resources/sink/jdbc-sink-tests.yml +++ /dev/null @@ -1,24 +0,0 @@ -version: {{docker.compose.version}} -services: - - http-source: - image: springcloudstream/http-source-kafka:{{stream.apps.version}} - ports: - - "{{port}}:{{port}}" - environment: - - SERVER_PORT={{port}} - - SPRING_CLOUD_STREAM_BINDINGS_OUTPUT_DESTINATION=jdbc - - SPRING_CLOUD_STREAM_KAFKA_BINDER_BROKERS={{kafkaBootStrapServers}} - jdbc-sink: - image: springcloudstream/jdbc-sink-kafka:{{stream.apps.version}} - environment: - - JDBC_CONSUMER_COLUMNS=name,city:address.city,street:address.street - - JDBC_CONSUMER_TABLE_NAME=People - - SPRING_DATASOURCE_PASSWORD={{password}} - - SPRING_DATASOURCE_USERNAME={{user}} - - SPRING_DATASOURCE_DRIVER_CLASS_NAME=org.mariadb.jdbc.Driver - - SPRING_DATASOURCE_URL={{jdbc.url}} - - SPRING_CLOUD_STREAM_KAFKA_BINDER_BROKERS={{kafkaBootStrapServers}} - - SPRING_CLOUD_STREAM_BINDINGS_INPUT_DESTINATION=jdbc - - SPRING_CLOUD_STREAM_BINDINGS_INPUT_GROUP=jdbc-sink-tests - diff --git a/stream-applications-integration-tests/src/test/resources/sink/mongodb-sink-tests.yml b/stream-applications-integration-tests/src/test/resources/sink/mongodb-sink-tests.yml deleted file mode 100644 index c47f66a..0000000 --- a/stream-applications-integration-tests/src/test/resources/sink/mongodb-sink-tests.yml +++ /dev/null @@ -1,19 +0,0 @@ -version: {{docker.compose.version}} -services: - http-source: - image: springcloudstream/http-source-kafka:{{stream.apps.version}} - ports: - - "{{port}}:{{port}}" - environment: - - SERVER_PORT={{port}} - - SPRING_CLOUD_STREAM_BINDINGS_OUTPUT_DESTINATION=mongodb - - SPRING_CLOUD_STREAM_KAFKA_BINDER_BROKERS={{kafkaBootStrapServers}} - mongodb-sink: - image: springcloudstream/mongodb-sink-kafka:{{stream.apps.version}} - environment: - - MONGO_DB_CONSUMER_COLLECTION=test - - SPRING_DATA_MONGODB_URL={{mongodb.url}} - - SPRING_CLOUD_STREAM_KAFKA_BINDER_BROKERS={{kafkaBootStrapServers}} - - SPRING_CLOUD_STREAM_BINDINGS_INPUT_DESTINATION=mongodb - - SPRING_CLOUD_STREAM_BINDINGS_INPUT_GROUP=mongodb-sink-tests - diff --git a/stream-applications-integration-tests/src/test/resources/sink/tcp-sink-tests.yml b/stream-applications-integration-tests/src/test/resources/sink/tcp-sink-tests.yml deleted file mode 100644 index 09851cf..0000000 --- a/stream-applications-integration-tests/src/test/resources/sink/tcp-sink-tests.yml +++ /dev/null @@ -1,21 +0,0 @@ -version: {{docker.compose.version}} -services: - - http-source: - image: springcloudstream/http-source-kafka:{{stream.apps.version}} - ports: - - "{{port}}:{{port}}" - environment: - - SERVER_PORT={{port}} - - SPRING_CLOUD_STREAM_BINDINGS_OUTPUT_DESTINATION=tcp - - SPRING_CLOUD_STREAM_KAFKA_BINDER_BROKERS={{kafkaBootStrapServers}} - tcp-sink: - image: springcloudstream/tcp-sink-kafka:{{stream.apps.version}} - environment: - - TCP_CONSUMER_HOST={{tcp.host}} - - TCP_PORT={{tcp.port}} - - TCP_CONSUMER_ENCODER=CRLF - - SPRING_CLOUD_STREAM_KAFKA_BINDER_BROKERS={{kafkaBootStrapServers}} - - SPRING_CLOUD_STREAM_BINDINGS_INPUT_DESTINATION=tcp - - SPRING_CLOUD_STREAM_BINDINGS_INPUT_GROUP=tcp-sink-tests - diff --git a/stream-applications-integration-tests/src/test/resources/source/geode-source-tests.yml b/stream-applications-integration-tests/src/test/resources/source/geode-source-tests.yml deleted file mode 100644 index 0fd9cc0..0000000 --- a/stream-applications-integration-tests/src/test/resources/source/geode-source-tests.yml +++ /dev/null @@ -1,20 +0,0 @@ -version: {{docker.compose.version}} -services: - - geode-source: - image: springcloudstream/geode-source-kafka:{{stream.apps.version}} - environment: - - GEODE_POOL_CONNECT_TYPE=server - - GEODE_REGION_REGION_NAME={{geode.region}} - - GEODE_POOL_HOST_ADDRESSES={{geode.host-addresses}} - - SPRING_CLOUD_STREAM_BINDINGS_OUTPUT_DESTINATION=log - - SPRING_CLOUD_STREAM_KAFKA_BINDER_BROKERS={{kafkaBootStrapServers}} - extra_hosts: - - geode:{{geodeHost}} - log-sink: - image: springcloudstream/log-sink-kafka:{{stream.apps.version}} - environment: - - SPRING_CLOUD_STREAM_KAFKA_BINDER_BROKERS={{kafkaBootStrapServers}} - - SPRING_CLOUD_STREAM_BINDINGS_INPUT_DESTINATION=log - - SPRING_CLOUD_STREAM_BINDINGS_INPUT_GROUP=geode-source-tests - diff --git a/stream-applications-integration-tests/src/test/resources/source/http-source-tests.yml b/stream-applications-integration-tests/src/test/resources/source/http-source-tests.yml deleted file mode 100644 index 9e144fb..0000000 --- a/stream-applications-integration-tests/src/test/resources/source/http-source-tests.yml +++ /dev/null @@ -1,18 +0,0 @@ -version: {{docker.compose.version}} -services: - - http-source: - image: springcloudstream/http-source-kafka:{{stream.apps.version}} - ports: - - "{{port}}:{{port}}" - environment: - - SERVER_PORT={{port}} - - SPRING_CLOUD_STREAM_BINDINGS_OUTPUT_DESTINATION=log - - SPRING_CLOUD_STREAM_KAFKA_BINDER_BROKERS={{kafkaBootStrapServers}} - log-sink: - image: springcloudstream/log-sink-kafka:{{stream.apps.version}} - environment: - - SPRING_CLOUD_STREAM_KAFKA_BINDER_BROKERS={{kafkaBootStrapServers}} - - SPRING_CLOUD_STREAM_BINDINGS_INPUT_DESTINATION=log - - SPRING_CLOUD_STREAM_BINDINGS_INPUT_GROUP=http-source-tests - diff --git a/stream-applications-integration-tests/src/test/resources/source/jdbc-source-tests.yml b/stream-applications-integration-tests/src/test/resources/source/jdbc-source-tests.yml deleted file mode 100644 index 53a047a..0000000 --- a/stream-applications-integration-tests/src/test/resources/source/jdbc-source-tests.yml +++ /dev/null @@ -1,33 +0,0 @@ -version: {{docker.compose.version}} -services: - mysql: - image: mysql:5.7 - command: --init-file /init.sql - volumes: - - {{init.sql}}:/init.sql - environment: - MYSQL_USER: root - MYSQL_ROOT_PASSWORD: secret - expose: - - 3306 - - jdbc-source: - image: springcloudstream/jdbc-source-kafka:{{stream.apps.version}} - depends_on: - - mysql - environment: - - JDBC_SUPPLIER_QUERY=SELECT * FROM People WHERE deleted='N' - - JDBC_SUPPLIER_UPDATE=UPDATE People SET deleted='Y' WHERE id=:id - - SPRING_DATASOURCE_PASSWORD=secret - - SPRING_DATASOURCE_USERNAME=root - - SPRING_DATASOURCE_DRIVER_CLASS_NAME=org.mariadb.jdbc.Driver - - SPRING_DATASOURCE_URL=jdbc:mysql://mysql:3306/test - - SPRING_CLOUD_STREAM_BINDINGS_OUTPUT_DESTINATION=log - - SPRING_CLOUD_STREAM_KAFKA_BINDER_BROKERS={{kafkaBootStrapServers}} - log-sink: - image: springcloudstream/log-sink-kafka:{{stream.apps.version}} - environment: - - SPRING_CLOUD_STREAM_KAFKA_BINDER_BROKERS={{kafkaBootStrapServers}} - - SPRING_CLOUD_STREAM_BINDINGS_INPUT_DESTINATION=log - - SPRING_CLOUD_STREAM_BINDINGS_INPUT_GROUP=jdbc-source-tests - diff --git a/stream-applications-integration-tests/src/test/resources/source/s3-source-tests.yml b/stream-applications-integration-tests/src/test/resources/source/s3-source-tests.yml deleted file mode 100644 index 9af7697..0000000 --- a/stream-applications-integration-tests/src/test/resources/source/s3-source-tests.yml +++ /dev/null @@ -1,28 +0,0 @@ -version: {{docker.compose.version}} -services: - s3-source: - image: springcloudstream/s3-source-kafka:{{stream.apps.version}} - environment: - - SPRING_CLOUD_FUNCTION_DEFINITION={{functionDefinition}} - - FILE_CONSUMER_MODE={{consumerMode}} - - S3_COMMON_ENDPOINT_URL={{s3.endpoint.url}} - - S3_COMMON_PATH_STYLE_ACCESS=true - - S3_SUPPLIER_REMOTE_DIR=bucket - - S3_SUPPLIER_LIST_ONLY={{listOnly}} - - CLOUD_AWS_STACK_AUTO=false - - CLOUD_AWS_CREDENTIALS_ACCESS_KEY=minio - - CLOUD_AWS_CREDENTIALS_SECRET_KEY=minio123 - - CLOUD_AWS_REGION_STATIC=us-east-1 - - SPRING_CLOUD_STREAM_BINDINGS_OUTPUT_DESTINATION=log - - SPRING_CLOUD_STREAM_KAFKA_BINDER_BROKERS={{kafkaBootStrapServers}} -# For Task Launch Request - - TASK_LAUNCH_REQUEST_ARG_EXPRESSIONS=filename=payload - - TASK_LAUNCH_REQUEST_TASK_NAME=myTask - extra_hosts: - - minio:{{minioHost}} - log-sink: - image: springcloudstream/log-sink-kafka:{{stream.apps.version}} - environment: - - SPRING_CLOUD_STREAM_KAFKA_BINDER_BROKERS={{kafkaBootStrapServers}} - - SPRING_CLOUD_STREAM_BINDINGS_INPUT_DESTINATION=log - - SPRING_CLOUD_STREAM_BINDINGS_INPUT_GROUP=s3-source-tests \ No newline at end of file diff --git a/stream-applications-integration-tests/src/test/resources/source/sftp-source-tests.yml b/stream-applications-integration-tests/src/test/resources/source/sftp-source-tests.yml deleted file mode 100644 index ad9a795..0000000 --- a/stream-applications-integration-tests/src/test/resources/source/sftp-source-tests.yml +++ /dev/null @@ -1,27 +0,0 @@ -version: {{docker.compose.version}} -services: - sftp-source: - image: springcloudstream/sftp-source-kafka:{{stream.apps.version}} - environment: - - SPRING_CLOUD_FUNCTION_DEFINITION={{functionDefinition}} - - FILE_CONSUMER_MODE={{consumerMode}} - - SFTP_SUPPLIER_FACTORY_ALLOW_UNKNOWN_KEYS=true - - SFTP_SUPPLIER_REMOTE_DIR=/remote - - SFTP_SUPPLIER_FACTORY_USERNAME=user - - SFTP_SUPPLIER_FACTORY_PASSWORD=pass - - SFTP_SUPPLIER_FACTORY_HOST=sftp - - SFTP_SUPPLIER_FACTORY_PORT={{sftpPort}} - - SFTP_SUPPLIER_LIST_ONLY={{listOnly}} - - SPRING_CLOUD_STREAM_BINDINGS_OUTPUT_DESTINATION=log - - SPRING_CLOUD_STREAM_KAFKA_BINDER_BROKERS={{kafkaBootStrapServers}} -# For Task Launch Request - - TASK_LAUNCH_REQUEST_ARG_EXPRESSIONS=filename=payload - - TASK_LAUNCH_REQUEST_TASK_NAME=myTask - extra_hosts: - - sftp:{{sftpHost}} - log-sink: - image: springcloudstream/log-sink-kafka:{{stream.apps.version}} - environment: - - SPRING_CLOUD_STREAM_KAFKA_BINDER_BROKERS={{kafkaBootStrapServers}} - - SPRING_CLOUD_STREAM_BINDINGS_INPUT_DESTINATION=log - - SPRING_CLOUD_STREAM_BINDINGS_INPUT_GROUP=sftp-source-tests \ No newline at end of file diff --git a/stream-applications-integration-tests/src/test/resources/test.properties b/stream-applications-integration-tests/src/test/resources/test.properties deleted file mode 100644 index aed3ec9..0000000 --- a/stream-applications-integration-tests/src/test/resources/test.properties +++ /dev/null @@ -1,2 +0,0 @@ -docker.compose.version='2.4' -stream.apps.version=3.0.0-SNAPSHOT diff --git a/stream-applications-integration-tests/src/test/resources/tick-tock-tests.yml b/stream-applications-integration-tests/src/test/resources/tick-tock-tests.yml deleted file mode 100644 index b0a080c..0000000 --- a/stream-applications-integration-tests/src/test/resources/tick-tock-tests.yml +++ /dev/null @@ -1,14 +0,0 @@ -version: {{docker.compose.version}} -services: - time-source: - image: springcloudstream/time-source-kafka:{{stream.apps.version}} - environment: - - SPRING_CLOUD_STREAM_BINDINGS_OUTPUT_DESTINATION=log - - SPRING_CLOUD_STREAM_KAFKA_BINDER_BROKERS={{kafkaBootStrapServers}} - log-sink: - image: springcloudstream/log-sink-kafka:{{stream.apps.version}} - environment: - - SPRING_CLOUD_STREAM_KAFKA_BINDER_BROKERS={{kafkaBootStrapServers}} - - SPRING_CLOUD_STREAM_BINDINGS_INPUT_DESTINATION=log - - SPRING_CLOUD_STREAM_BINDINGS_INPUT_GROUP=ticktock-tests -