Refactor to remove docker-compose

This commit is contained in:
David Turanski
2020-10-17 13:09:13 -04:00
parent 2f3e46d0d8
commit fc5e51342c
30 changed files with 423 additions and 890 deletions

View File

@@ -17,9 +17,10 @@
<properties>
<java.version>1.8</java.version>
<java-functions.version>1.0.0-SNAPSHOT</java-functions.version>
<stream-applications.version>3.0.0-SNAPSHOT</stream-applications.version>
<mariadb-client.version>2.6.2</mariadb-client.version>
<spring-cloud.version>Hoxton.SR8</spring-cloud.version>
<testcontainers.version>1.14.3</testcontainers.version>
<testcontainers.version>1.15.0-rc2</testcontainers.version>
<jmustache.version>1.15</jmustache.version>
<maven-checkstyle-plugin.version>3.1.0</maven-checkstyle-plugin.version>
<disable.checks>false</disable.checks>
@@ -48,6 +49,7 @@
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-webflux</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
@@ -60,11 +62,29 @@
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.testcontainers</groupId>
<artifactId>testcontainers</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.testcontainers</groupId>
<artifactId>junit-jupiter</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.testcontainers</groupId>
<artifactId>kafka</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.awaitility</groupId>
<artifactId>awaitility</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.testcontainers</groupId>
<artifactId>mariadb</artifactId>
@@ -75,6 +95,23 @@
<artifactId>mongodb</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.testcontainers</groupId>
<artifactId>mysql</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>5.1.49</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.springframework.cloud.stream.app</groupId>
<artifactId>stream-applications-test-support</artifactId>
<version>${stream-applications.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.springframework.cloud.fn</groupId>
<artifactId>function-test-support</artifactId>
@@ -100,6 +137,7 @@
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-mongodb</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.mariadb.jdbc</groupId>
@@ -113,22 +151,10 @@
<scope>test</scope>
</dependency>
<dependency>
<groupId>com.samskivert</groupId>
<artifactId>jmustache</artifactId>
<version>${jmustache.version}</version>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-webflux</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.awaitility</groupId>
<artifactId>awaitility</artifactId>
<scope>test</scope>
<exclusions>
<exclusion>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
</exclusion>
</exclusions>
</dependency>
</dependencies>
<build>
<plugins>

View File

@@ -0,0 +1,66 @@
/*
* Copyright 2020-2020 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.cloud.stream.apps.integration.test;
import java.time.Duration;
import java.util.regex.Pattern;
import org.junit.jupiter.api.Test;
import org.testcontainers.containers.GenericContainer;
import org.testcontainers.junit.jupiter.Container;
import org.testcontainers.utility.DockerImageName;
import org.springframework.cloud.stream.app.test.integration.LogMatcher;
import org.springframework.cloud.stream.apps.integration.test.support.KafkaStreamIntegrationTestSupport;
import static org.awaitility.Awaitility.await;
import static org.springframework.cloud.stream.app.test.integration.AppLog.appLog;
import static org.springframework.cloud.stream.app.test.integration.FluentMap.fluentMap;
public class TikTokBaselineTests extends KafkaStreamIntegrationTestSupport {
// "MM/dd/yy HH:mm:ss";
private final static Pattern pattern = Pattern.compile(".*\\d{2}/\\d{2}/\\d{2}\\s+\\d{2}:\\d{2}:\\d{2}");
private final static LogMatcher logMatcher = new LogMatcher();
@Container
static GenericContainer timeSource = new GenericContainer(
DockerImageName.parse("springcloudstream/time-source-kafka:3.0.0-SNAPSHOT"))
.withNetwork(kafka.getNetwork())
.withEnv(fluentMap().withEntry("SPRING_CLOUD_STREAM_BINDINGS_OUTPUT_DESTINATION", "TikTok")
.withEntry("SPRING_CLOUD_STREAM_KAFKA_BINDER_BROKERS",
kafka.getNetworkAliases().get(0) + ":9092"))
.dependsOn(kafka);
@Container
static GenericContainer logSink = new GenericContainer(
DockerImageName.parse("springcloudstream/log-sink-kafka:3.0.0-SNAPSHOT"))
.withNetwork(kafka.getNetwork())
.withEnv(fluentMap().withEntry("SPRING_CLOUD_STREAM_BINDINGS_INPUT_DESTINATION", "TikTok")
.withEntry("SPRING_CLOUD_STREAM_KAFKA_BINDER_BROKERS",
kafka.getNetworkAliases().get(0) + ":9092")
.withEntry("SPRING_CLOUD_STREAM_BINDINGS_INPUT_GROUP", "TikTok"))
.withLogConsumer(logMatcher)
.withLogConsumer(appLog("log-sink"))
.dependsOn(kafka);
@Test
void tiktok() {
await().atMost(Duration.ofMinutes(2)).until(logMatcher.verifies(log -> log.contains("Started LogSink")));
await().atMost(Duration.ofSeconds(30)).until(logMatcher.verifies(log -> log.matchesRegex(pattern.pattern())));
}
}

View File

@@ -20,30 +20,33 @@ import java.time.Duration;
import java.util.regex.Pattern;
import org.junit.jupiter.api.Test;
import org.testcontainers.containers.DockerComposeContainer;
import org.testcontainers.containers.GenericContainer;
import org.testcontainers.junit.jupiter.Container;
import org.springframework.cloud.stream.apps.integration.test.support.AbstractStreamApplicationTests;
import org.springframework.cloud.stream.apps.integration.test.support.LogMatcher;
import org.springframework.cloud.stream.app.test.integration.LogMatcher;
import org.springframework.cloud.stream.app.test.integration.StreamApps;
import org.springframework.cloud.stream.apps.integration.test.support.KafkaStreamIntegrationTestSupport;
import static org.awaitility.Awaitility.await;
import static org.springframework.cloud.stream.apps.integration.test.support.AbstractStreamApplicationTests.AppLog.appLog;
import static org.springframework.cloud.stream.app.test.integration.kafka.KafkaStreamApps.kafkaStreamApps;
public class TickTockTests extends AbstractStreamApplicationTests {
public class TikTokTests extends KafkaStreamIntegrationTestSupport {
// "MM/dd/yy HH:mm:ss";
private final Pattern pattern = Pattern.compile(".*\\d{2}/\\d{2}/\\d{2}\\s+\\d{2}:\\d{2}:\\d{2}");
private final static Pattern DATE_PATTERN = Pattern.compile(".*\\d{2}/\\d{2}/\\d{2}\\s+\\d{2}:\\d{2}:\\d{2}");
private final LogMatcher logMatcher = new LogMatcher();
private final static LogMatcher logMatcher = new LogMatcher();
@Container
private final DockerComposeContainer environment = new DockerComposeContainer(
templateProcessor("tick-tock-tests.yml").processTemplate())
.withLogConsumer("log-sink", logMatcher)
.withLogConsumer("log-sink", appLog("log-sink"));
static StreamApps streamApps = kafkaStreamApps("tikTok", kafka)
.withSourceContainer(new GenericContainer(defaultKafkaImageFor("time-source")))
.withSinkContainer(new GenericContainer(defaultKafkaImageFor("log-sink"))
.withLogConsumer(logMatcher))
.build();
@Test
void ticktock() {
void tiktok() {
await().atMost(Duration.ofMinutes(2)).until(logMatcher.verifies(log -> log.contains("Started LogSink")));
await().atMost(Duration.ofSeconds(30)).until(logMatcher.verifies(log -> log.matchesRegex(pattern.pattern())));
await().atMost(Duration.ofSeconds(30))
.until(logMatcher.verifies(log -> log.matchesRegex(DATE_PATTERN.pattern())));
}
}

View File

@@ -25,43 +25,45 @@ import okhttp3.mockwebserver.MockWebServer;
import okhttp3.mockwebserver.RecordedRequest;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import org.testcontainers.containers.DockerComposeContainer;
import org.testcontainers.containers.wait.strategy.Wait;
import org.testcontainers.containers.GenericContainer;
import org.testcontainers.junit.jupiter.Container;
import reactor.core.publisher.Mono;
import org.springframework.cloud.stream.apps.integration.test.support.AbstractStreamApplicationTests;
import org.springframework.cloud.stream.apps.integration.test.support.LogMatcher;
import org.springframework.cloud.stream.app.test.integration.LogMatcher;
import org.springframework.cloud.stream.app.test.integration.StreamApps;
import org.springframework.cloud.stream.apps.integration.test.support.KafkaStreamIntegrationTestSupport;
import org.springframework.http.HttpHeaders;
import org.springframework.http.HttpStatus;
import org.springframework.http.MediaType;
import org.springframework.web.reactive.function.client.ClientResponse;
import org.springframework.web.reactive.function.client.WebClient;
import static org.assertj.core.api.Assertions.assertThat;
import static org.awaitility.Awaitility.await;
import static org.springframework.cloud.stream.apps.integration.test.support.AbstractStreamApplicationTests.AppLog.appLog;
import static org.springframework.cloud.stream.apps.integration.test.support.FluentMap.fluentMap;
import static org.springframework.cloud.stream.app.test.integration.kafka.KafkaStreamApps.kafkaStreamApps;
public class HttpRequestProcessorTests extends AbstractStreamApplicationTests {
public class HttpRequestProcessorTests extends KafkaStreamIntegrationTestSupport {
private static MockWebServer server = new MockWebServer();
private static int serverPort = findAvailablePort();
private static String url = "http://" + localHostAddress() + ":" + serverPort;
private static int sourcePort = findAvailablePort();
private static WebClient webClient = WebClient.builder().build();
private static LogMatcher logMatcher = new LogMatcher();
private static int serverPort = findAvailablePort();
private static int sourcePort = findAvailablePort();
@Container
private static final DockerComposeContainer environment = new DockerComposeContainer(
templateProcessor("processor/http-request-processor-tests.yml", fluentMap()
.withEntry("port", sourcePort)
.withEntry("url", url)).processTemplate())
.withLogConsumer("log-sink", appLog("log-sink"))
.withLogConsumer("log-sink", logMatcher)
.withExposedService("http-source", sourcePort,
Wait.forListeningPort().withStartupTimeout(Duration.ofMinutes(2)));
private static final StreamApps streamApps = kafkaStreamApps(
HttpRequestProcessorTests.class.getSimpleName(), kafka)
.withSourceContainer(httpSource(sourcePort))
.withProcessorContainer(new GenericContainer(defaultKafkaImageFor("http-request-processor"))
.withEnv("HTTP_REQUEST_URL_EXPRESSION",
"'http://" + localHostAddress() + ":" + serverPort + "'")
.withEnv("HTTP_REQUEST_HTTP_METHOD_EXPRESSION", "'POST'"))
.withSinkContainer(
new GenericContainer(defaultKafkaImageFor("log-sink")).withLogConsumer(logMatcher))
.build();
@BeforeAll
static void startServer() throws Exception {
@@ -81,9 +83,9 @@ public class HttpRequestProcessorTests extends AbstractStreamApplicationTests {
await().atMost(Duration.ofSeconds(30))
.until(logMatcher.verifies(log -> log.when(() -> {
ClientResponse response = webClient()
ClientResponse response = webClient
.post()
.uri("http://localhost:" + sourcePort)
.uri("http://localhost:" + streamApps.sourceContainer().getMappedPort(sourcePort))
.contentType(MediaType.TEXT_PLAIN)
.body(Mono.just("ping"), String.class)
.exchange()
@@ -91,4 +93,5 @@ public class HttpRequestProcessorTests extends AbstractStreamApplicationTests {
assertThat(response.statusCode().is2xxSuccessful()).isTrue();
}).matchesRegex(".*\\{\"response\":\"ping\"\\}")));
}
}

View File

@@ -21,67 +21,75 @@ import java.time.Duration;
import com.zaxxer.hikari.HikariDataSource;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import org.testcontainers.containers.DockerComposeContainer;
import org.testcontainers.containers.MariaDBContainer;
import org.testcontainers.containers.BindMode;
import org.testcontainers.containers.GenericContainer;
import org.testcontainers.containers.MySQLContainer;
import org.testcontainers.containers.wait.strategy.Wait;
import org.testcontainers.junit.jupiter.Container;
import org.testcontainers.utility.DockerImageName;
import reactor.core.publisher.Mono;
import org.springframework.cloud.stream.apps.integration.test.support.AbstractStreamApplicationTests;
import org.springframework.cloud.stream.app.test.integration.StreamApps;
import org.springframework.cloud.stream.apps.integration.test.support.KafkaStreamIntegrationTestSupport;
import org.springframework.http.MediaType;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.web.reactive.function.client.ClientResponse;
import org.springframework.web.reactive.function.client.WebClient;
import static org.assertj.core.api.Assertions.assertThat;
import static org.awaitility.Awaitility.await;
import static org.springframework.cloud.stream.apps.integration.test.support.AbstractStreamApplicationTests.AppLog.appLog;
import static org.springframework.cloud.stream.apps.integration.test.support.FluentMap.fluentMap;
import static org.springframework.cloud.stream.app.test.integration.kafka.KafkaStreamApps.kafkaStreamApps;
public class JdbcSinkTests extends AbstractStreamApplicationTests {
public class JdbcSinkTests extends KafkaStreamIntegrationTestSupport {
private static int port = findAvailablePort();
private static int serverPort = findAvailablePort();
private static JdbcTemplate jdbcTemplate;
@Container
private static MariaDBContainer mariadbContainer = (MariaDBContainer) new MariaDBContainer()
.withDatabaseName("test")
.withPassword("password")
.withUsername("user")
.withInitScript("init.sql")
.withExposedPorts(3306)
.waitingFor(Wait.forListeningPort().withStartupTimeout(Duration.ofMinutes(2)));
private static WebClient webClient = WebClient.builder().build();
@Container
private DockerComposeContainer environment = new DockerComposeContainer(
templateProcessor("sink/jdbc-sink-tests.yml", fluentMap()
.withEntry("jdbc.url",
mariadbContainer.getJdbcUrl().replace("localhost",
localHostAddress()))
.withEntry("user", mariadbContainer.getUsername())
.withEntry("password", mariadbContainer.getPassword())
.withEntry("port", port)).processTemplate())
.withLogConsumer("jdbc-sink", appLog("jdbc-sink"))
.withExposedService("http-source", port,
Wait.forListeningPort().withStartupTimeout(Duration.ofMinutes(2)));
private static MySQLContainer mySQL = new MySQLContainer<>(DockerImageName.parse("mysql:5.7"))
.withUsername("test")
.withPassword("secret")
.withExposedPorts(3306)
.withNetwork(kafka.getNetwork())
.withClasspathResourceMapping("init.sql", "/init.sql", BindMode.READ_ONLY)
.withCommand("--init-file", "/init.sql");
@Container
private static StreamApps streamApps = kafkaStreamApps(JdbcSinkTests.class.getSimpleName(), kafka)
.withSourceContainer(new GenericContainer(defaultKafkaImageFor("http-source"))
.withEnv("SERVER_PORT", String.valueOf(serverPort))
.withExposedPorts(serverPort)
.waitingFor(Wait.forListeningPort().withStartupTimeout(Duration.ofMinutes(2))))
.withSinkContainer(new GenericContainer(defaultKafkaImageFor("jdbc-sink"))
.withEnv("JDBC_CONSUMER_COLUMNS", "name,city:address.city,street:address.street")
.withEnv("JDBC_CONSUMER_TABLE_NAME", "People")
.withEnv("SPRING_DATASOURCE_USERNAME", "test")
.withEnv("SPRING_DATASOURCE_PASSWORD", "secret")
.withEnv("SPRING_DATASOURCE_DRIVER_CLASS_NAME", "org.mariadb.jdbc.Driver")
.withEnv("SPRING_DATASOURCE_URL",
"jdbc:mysql://" + mySQL.getNetworkAliases().get(0) + ":3306/test"))
.build();
@BeforeAll
static void buildJdbcTemplate() {
static void startStreamApps() {
HikariDataSource dataSource = new HikariDataSource();
dataSource.setDriverClassName(mariadbContainer.getDriverClassName());
dataSource.setUsername(mariadbContainer.getUsername());
dataSource.setPassword(mariadbContainer.getPassword());
dataSource.setJdbcUrl(mariadbContainer.getJdbcUrl());
dataSource.setDriverClassName("org.mariadb.jdbc.Driver");
dataSource.setUsername(mySQL.getUsername());
dataSource.setPassword(mySQL.getPassword());
dataSource.setJdbcUrl("jdbc:mysql://localhost:" + mySQL.getMappedPort(3306) + "/test");
jdbcTemplate = new JdbcTemplate(dataSource);
jdbcTemplate.execute("DELETE FROM People");
}
@Test
void postData() {
String json = "{\"name\":\"My Name\",\"address\":{ \"city\": \"Big City\", \"street\": \"Narrow Alley\"}}";
ClientResponse response = webClient()
String json = "{\"name\":\"My Name\",\"address\":{ \"city\": \"Big City\", \"street\":\"Narrow Alley\"}}";
ClientResponse response = webClient
.post()
.uri("http://localhost:" + port)
.uri("http://localhost:" + streamApps.sourceContainer().getMappedPort(serverPort))
.contentType(MediaType.APPLICATION_JSON)
.body(Mono.just(json), String.class)
.exchange()
@@ -90,8 +98,10 @@ public class JdbcSinkTests extends AbstractStreamApplicationTests {
await().atMost(Duration.ofSeconds(30))
.untilAsserted(
() -> assertThat(jdbcTemplate.queryForObject("SELECT COUNT(*) from People", Integer.class))
.isOne());
assertThat(jdbcTemplate.queryForObject("SELECT name from People", String.class)).isEqualTo("My Name");
() -> assertThat(jdbcTemplate.queryForObject("SELECT COUNT(*) from People",
Integer.class))
.isOne());
assertThat(jdbcTemplate.queryForObject("SELECT name from People",
String.class)).isEqualTo("My Name");
}
}

View File

@@ -22,31 +22,34 @@ import java.util.List;
import org.bson.Document;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import org.testcontainers.containers.DockerComposeContainer;
import org.testcontainers.containers.GenericContainer;
import org.testcontainers.containers.MongoDBContainer;
import org.testcontainers.containers.wait.strategy.Wait;
import org.testcontainers.junit.jupiter.Container;
import org.testcontainers.utility.DockerImageName;
import reactor.core.publisher.Mono;
import org.springframework.cloud.stream.apps.integration.test.support.AbstractStreamApplicationTests;
import org.springframework.cloud.stream.app.test.integration.StreamApps;
import org.springframework.cloud.stream.apps.integration.test.support.KafkaStreamIntegrationTestSupport;
import org.springframework.data.mongodb.MongoDatabaseFactory;
import org.springframework.data.mongodb.core.MongoTemplate;
import org.springframework.data.mongodb.core.SimpleMongoClientDatabaseFactory;
import org.springframework.http.MediaType;
import org.springframework.web.reactive.function.client.ClientResponse;
import org.springframework.web.reactive.function.client.WebClient;
import static org.assertj.core.api.Assertions.assertThat;
import static org.springframework.cloud.stream.apps.integration.test.support.AbstractStreamApplicationTests.AppLog.appLog;
import static org.springframework.cloud.stream.apps.integration.test.support.FluentMap.fluentMap;
import static org.springframework.cloud.stream.app.test.integration.kafka.KafkaStreamApps.kafkaStreamApps;
public class MongoDBSinkTests extends AbstractStreamApplicationTests {
public class MongoDBSinkTests extends KafkaStreamIntegrationTestSupport {
private static int port = findAvailablePort();
private static int serverPort = findAvailablePort();
private static MongoTemplate mongoTemplate;
private static WebClient webClient = WebClient.builder().build();
@Container
private static MongoDBContainer mongoDBContainer = new MongoDBContainer()
private static MongoDBContainer mongoDBContainer = new MongoDBContainer(DockerImageName.parse("mongo:4.0.10"))
.withExposedPorts(27017)
.withStartupTimeout(Duration.ofMinutes(2));
@@ -54,29 +57,27 @@ public class MongoDBSinkTests extends AbstractStreamApplicationTests {
return String.format("mongodb://%s:%s/%s", localHostAddress(), mongoDBContainer.getMappedPort(27017), "test");
}
@Container
private StreamApps streamApps = kafkaStreamApps(MongoDBSinkTests.class.getSimpleName(), kafka)
.withSourceContainer(httpSource(serverPort))
.withSinkContainer(new GenericContainer(defaultKafkaImageFor("mongodb-sink"))
.withEnv("MONGO_DB_CONSUMER_COLLECTION", "test")
.withEnv("SPRING_DATA_MONGODB_URL", mongoConnectionString()))
.build();
@BeforeAll
private static void buildMongoTemplate() {
static void buildMongoTemplate() {
MongoDatabaseFactory mongoDatabaseFactory = new SimpleMongoClientDatabaseFactory(
mongoConnectionString());
mongoTemplate = new MongoTemplate(mongoDatabaseFactory);
}
@Container
private DockerComposeContainer environment = new DockerComposeContainer(
templateProcessor("sink/mongodb-sink-tests.yml", fluentMap()
.withEntry("mongodb.url", mongoConnectionString())
.withEntry("port", port)).processTemplate())
.withLogConsumer("jdbc-sink",
appLog("jdbc-sink"))
.withExposedService("http-source", port,
Wait.forListeningPort().withStartupTimeout(Duration.ofMinutes(2)));
@Test
void postData() {
String json = "{\"name\":\"My Name\",\"address\":{ \"city\": \"Big City\", \"street\": \"Narrow Alley\"}}";
ClientResponse response = webClient()
ClientResponse response = webClient
.post()
.uri("http://localhost:" + port)
.uri("http://localhost:" + streamApps.sourceContainer().getMappedPort(serverPort))
.contentType(MediaType.APPLICATION_JSON)
.body(Mono.just(json), String.class)
.exchange()

View File

@@ -27,21 +27,21 @@ import java.util.concurrent.atomic.AtomicBoolean;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import org.testcontainers.containers.DockerComposeContainer;
import org.testcontainers.containers.wait.strategy.Wait;
import org.testcontainers.containers.GenericContainer;
import org.testcontainers.junit.jupiter.Container;
import reactor.core.publisher.Mono;
import org.springframework.cloud.stream.apps.integration.test.support.AbstractStreamApplicationTests;
import org.springframework.cloud.stream.app.test.integration.StreamApps;
import org.springframework.cloud.stream.apps.integration.test.support.KafkaStreamIntegrationTestSupport;
import org.springframework.http.MediaType;
import org.springframework.web.reactive.function.client.ClientResponse;
import org.springframework.web.reactive.function.client.WebClient;
import static org.assertj.core.api.Assertions.assertThat;
import static org.awaitility.Awaitility.await;
import static org.springframework.cloud.stream.apps.integration.test.support.AbstractStreamApplicationTests.AppLog.appLog;
import static org.springframework.cloud.stream.apps.integration.test.support.FluentMap.fluentMap;
import static org.springframework.cloud.stream.app.test.integration.kafka.KafkaStreamApps.kafkaStreamApps;
public class TcpSinkTests extends AbstractStreamApplicationTests {
public class TcpSinkTests extends KafkaStreamIntegrationTestSupport {
private static final int port = findAvailablePort();
@@ -51,15 +51,16 @@ public class TcpSinkTests extends AbstractStreamApplicationTests {
private static final AtomicBoolean socketReady = new AtomicBoolean();
private static WebClient webClient = WebClient.builder().build();
@Container
private static final DockerComposeContainer environment = new DockerComposeContainer(
templateProcessor("sink/tcp-sink-tests.yml", fluentMap()
.withEntry("port", port)
.withEntry("tcp.port", tcpPort)
.withEntry("tcp.host", localHostAddress())).processTemplate())
.withLogConsumer("tcp-sink", appLog("tcp-sink"))
.withExposedService("http-source", port,
Wait.forListeningPort().withStartupTimeout(Duration.ofMinutes(2)));
private static StreamApps streamApps = kafkaStreamApps(TcpSinkTests.class.getSimpleName(), kafka)
.withSourceContainer(httpSource(port))
.withSinkContainer(new GenericContainer(defaultKafkaImageFor("tcp-sink"))
.withEnv("TCP_CONSUMER_HOST", localHostAddress())
.withEnv("TCP_PORT", String.valueOf(tcpPort))
.withEnv("TCP_CONSUMER_ENCODER", "CRLF"))
.build();
@BeforeAll
static void startTcpServer() {
@@ -77,9 +78,9 @@ public class TcpSinkTests extends AbstractStreamApplicationTests {
@Test
void postData() throws IOException {
String text = "Hello, world!";
ClientResponse response = webClient()
ClientResponse response = webClient
.post()
.uri("http://localhost:" + port)
.uri("http://localhost:" + streamApps.sourceContainer().getMappedPort(port))
.contentType(MediaType.TEXT_PLAIN)
.body(Mono.just(text), String.class)
.exchange()

View File

@@ -31,19 +31,19 @@ import org.apache.geode.cache.client.ClientRegionShortcut;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import org.testcontainers.containers.DockerComposeContainer;
import org.testcontainers.containers.GenericContainer;
import org.testcontainers.images.builder.ImageFromDockerfile;
import org.testcontainers.junit.jupiter.Container;
import org.springframework.cloud.fn.test.support.geode.GeodeContainer;
import org.springframework.cloud.stream.apps.integration.test.support.AbstractStreamApplicationTests;
import org.springframework.cloud.stream.apps.integration.test.support.LogMatcher;
import org.springframework.cloud.stream.app.test.integration.LogMatcher;
import org.springframework.cloud.stream.app.test.integration.StreamApps;
import org.springframework.cloud.stream.apps.integration.test.support.KafkaStreamIntegrationTestSupport;
import static org.awaitility.Awaitility.await;
import static org.springframework.cloud.stream.apps.integration.test.support.AbstractStreamApplicationTests.AppLog.appLog;
import static org.springframework.cloud.stream.apps.integration.test.support.FluentMap.fluentMap;
import static org.springframework.cloud.stream.app.test.integration.kafka.KafkaStreamApps.kafkaStreamApps;
public class GeodeSourceTests extends AbstractStreamApplicationTests {
public class GeodeSourceTests extends KafkaStreamIntegrationTestSupport {
private static final LogMatcher logMatcher = new LogMatcher();
@@ -91,15 +91,15 @@ public class GeodeSourceTests extends AbstractStreamApplicationTests {
}
@Container
private final DockerComposeContainer environment = new DockerComposeContainer(
templateProcessor("source/geode-source-tests.yml", fluentMap()
.withEntry("geode.host-addresses", "geode:" + cacheServerPort)
.withEntry("geodeHost", localHostAddress())
.withEntry("geode.region", "myRegion")).processTemplate())
.withLogConsumer("log-sink", appLog("log-sink"))
.withLogConsumer("geode-source", geodeLogMatcher)
.withLogConsumer("log-sink", logMatcher)
.withLocalCompose(true);
private final StreamApps streamApps = kafkaStreamApps(this.getClass().getSimpleName(), kafka)
.withSourceContainer(new GenericContainer(defaultKafkaImageFor("geode-source"))
.withEnv("GEODE_POOL_CONNECT_TYPE", "server")
.withEnv("GEODE_REGION_REGION_NAME", "myRegion")
.withEnv("GEODE_POOL_HOST_ADDRESSES", localHostAddress() + ":" + cacheServerPort)
.withLogConsumer(geodeLogMatcher))
.withSinkContainer(new GenericContainer(defaultKafkaImageFor("log-sink"))
.withLogConsumer(logMatcher))
.build();
@Test
void test() {

View File

@@ -17,44 +17,44 @@
package org.springframework.cloud.stream.apps.integration.test.source;
import java.time.Duration;
import java.util.Collections;
import org.junit.jupiter.api.Test;
import org.testcontainers.containers.DockerComposeContainer;
import org.testcontainers.containers.wait.strategy.Wait;
import org.testcontainers.containers.GenericContainer;
import org.testcontainers.junit.jupiter.Container;
import reactor.core.publisher.Mono;
import org.springframework.cloud.stream.apps.integration.test.support.AbstractStreamApplicationTests;
import org.springframework.cloud.stream.apps.integration.test.support.LogMatcher;
import org.springframework.cloud.stream.app.test.integration.LogMatcher;
import org.springframework.cloud.stream.app.test.integration.StreamApps;
import org.springframework.cloud.stream.apps.integration.test.support.KafkaStreamIntegrationTestSupport;
import org.springframework.http.MediaType;
import org.springframework.web.reactive.function.client.ClientResponse;
import org.springframework.web.reactive.function.client.WebClient;
import static org.assertj.core.api.Assertions.assertThat;
import static org.awaitility.Awaitility.await;
import static org.springframework.cloud.stream.apps.integration.test.support.AbstractStreamApplicationTests.AppLog.appLog;
import static org.springframework.cloud.stream.app.test.integration.kafka.KafkaStreamApps.kafkaStreamApps;
public class HttpSourceTests extends AbstractStreamApplicationTests {
public class HttpSourceTests extends KafkaStreamIntegrationTestSupport {
private static int port = findAvailablePort();
private static int serverPort = findAvailablePort();
private static WebClient webClient = WebClient.builder().build();
private static LogMatcher logMatcher = new LogMatcher();
@Container
private static final DockerComposeContainer environment = new DockerComposeContainer(
templateProcessor("source/http-source-tests.yml", Collections.singletonMap("port", port)).processTemplate())
.withLogConsumer("log-sink", appLog("log-sink"))
.withLogConsumer("log-sink", logMatcher)
.withExposedService("http-source", port,
Wait.forListeningPort().withStartupTimeout(Duration.ofMinutes(2)));
private static final StreamApps streamApps = kafkaStreamApps(HttpSourceTests.class.getSimpleName(), kafka)
.withSourceContainer(httpSource(serverPort))
.withSinkContainer(new GenericContainer(defaultKafkaImageFor("log-sink")).withLogConsumer(logMatcher))
.build();
@Test
void plaintext() {
await().atMost(Duration.ofSeconds(30))
.until(logMatcher.verifies(log -> log.when(() -> {
ClientResponse response = webClient()
ClientResponse response = webClient
.post()
.uri("http://localhost:" + port)
.uri("http://localhost:" + streamApps.sourceContainer().getMappedPort(serverPort))
.contentType(MediaType.TEXT_PLAIN)
.body(Mono.just("Hello"), String.class)
.exchange()
@@ -67,9 +67,9 @@ public class HttpSourceTests extends AbstractStreamApplicationTests {
void json() {
await().atMost(Duration.ofSeconds(30))
.until(logMatcher.verifies(log -> log.when(() -> {
ClientResponse response = webClient()
ClientResponse response = webClient
.post()
.uri("http://localhost:" + port)
.uri("http://localhost:" + streamApps.sourceContainer().getMappedPort(serverPort))
.contentType(MediaType.APPLICATION_JSON)
.body(Mono.just("{\"Hello\":\"world\"}"), String.class)
.exchange()
@@ -77,4 +77,5 @@ public class HttpSourceTests extends AbstractStreamApplicationTests {
assertThat(response.statusCode().is2xxSuccessful()).isTrue();
}).matchesRegex(".*\\{\"Hello\":\"world\"\\}")));
}
}

View File

@@ -17,29 +17,47 @@
package org.springframework.cloud.stream.apps.integration.test.source;
import java.time.Duration;
import java.util.Collections;
import org.junit.jupiter.api.Test;
import org.testcontainers.containers.DockerComposeContainer;
import org.testcontainers.containers.wait.strategy.Wait;
import org.testcontainers.containers.BindMode;
import org.testcontainers.containers.GenericContainer;
import org.testcontainers.containers.MySQLContainer;
import org.testcontainers.junit.jupiter.Container;
import org.testcontainers.utility.DockerImageName;
import org.springframework.cloud.stream.apps.integration.test.support.AbstractStreamApplicationTests;
import org.springframework.cloud.stream.apps.integration.test.support.LogMatcher;
import org.springframework.cloud.stream.app.test.integration.LogMatcher;
import org.springframework.cloud.stream.app.test.integration.StreamApps;
import org.springframework.cloud.stream.apps.integration.test.support.KafkaStreamIntegrationTestSupport;
import static org.awaitility.Awaitility.await;
import static org.springframework.cloud.stream.app.test.integration.kafka.KafkaStreamApps.kafkaStreamApps;
public class JdbcSourceTests extends AbstractStreamApplicationTests {
public class JdbcSourceTests extends KafkaStreamIntegrationTestSupport {
private static LogMatcher logMatcher = new LogMatcher();
@Container
private static final DockerComposeContainer environment = new DockerComposeContainer(
templateProcessor("source/jdbc-source-tests.yml",
Collections.singletonMap("init.sql", resourceAsFile("init.sql"))).processTemplate())
.withLogConsumer("log-sink", logMatcher)
.waitingFor("jdbc-source", Wait.forLogMessage(".*Started JdbcSource.*", 1)
.withStartupTimeout(Duration.ofMinutes(2)));
public static MySQLContainer mySQL = new MySQLContainer<>(DockerImageName.parse("mysql:5.7"))
.withUsername("test")
.withPassword("secret")
.withExposedPorts(3306)
.withNetwork(kafka.getNetwork())
.withClasspathResourceMapping("init.sql", "/init.sql", BindMode.READ_ONLY)
.withCommand("--init-file", "/init.sql");
@Container
private static final StreamApps streamApps = kafkaStreamApps(JdbcSourceTests.class.getSimpleName(), kafka)
.withSourceContainer(new GenericContainer(defaultKafkaImageFor("jdbc-source"))
.withEnv("JDBC_SUPPLIER_QUERY", "SELECT * FROM People WHERE deleted='N'")
.withEnv("JDBC_SUPPLIER_UPDATE", "UPDATE People SET deleted='Y' WHERE id=:id")
.withEnv("SPRING_DATASOURCE_USERNAME", "test")
.withEnv("SPRING_DATASOURCE_PASSWORD", "secret")
.withEnv("SPRING_DATASOURCE_DRIVER_CLASS_NAME", "org.mariadb.jdbc.Driver")
.withEnv("SPRING_DATASOURCE_URL",
"jdbc:mysql://" + mySQL.getNetworkAliases().get(0) + ":3306/test"))
.withSinkContainer(new GenericContainer(defaultKafkaImageFor("log-sink"))
.withLogConsumer(logMatcher))
.build();
@Test
void test() {

View File

@@ -17,6 +17,7 @@
package org.springframework.cloud.stream.apps.integration.test.source;
import java.time.Duration;
import java.util.Map;
import java.util.function.Consumer;
import com.amazonaws.ClientConfiguration;
@@ -32,35 +33,38 @@ import com.github.dockerjava.api.command.CreateContainerCmd;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import org.testcontainers.containers.DockerComposeContainer;
import org.testcontainers.containers.GenericContainer;
import org.testcontainers.containers.wait.strategy.Wait;
import org.testcontainers.junit.jupiter.Container;
import org.testcontainers.utility.DockerImageName;
import org.springframework.cloud.stream.apps.integration.test.support.AbstractStreamApplicationTests;
import org.springframework.cloud.stream.apps.integration.test.support.LogMatcher;
import org.springframework.cloud.stream.apps.integration.test.support.TemplateProcessor;
import org.springframework.cloud.stream.app.test.integration.LogMatcher;
import org.springframework.cloud.stream.app.test.integration.StreamApps;
import org.springframework.cloud.stream.apps.integration.test.support.KafkaStreamIntegrationTestSupport;
import static org.awaitility.Awaitility.await;
import static org.springframework.cloud.stream.apps.integration.test.support.AbstractStreamApplicationTests.AppLog.appLog;
import static org.springframework.cloud.stream.apps.integration.test.support.FluentMap.fluentMap;
import static org.springframework.cloud.stream.app.test.integration.AppLog.appLog;
import static org.springframework.cloud.stream.app.test.integration.FluentMap.fluentMap;
import static org.springframework.cloud.stream.app.test.integration.kafka.KafkaStreamApps.kafkaStreamApps;
public class S3SourceTests extends AbstractStreamApplicationTests {
public class S3SourceTests extends KafkaStreamIntegrationTestSupport {
private static AmazonS3 s3Client;
private static LogMatcher logMatcher = new LogMatcher();
@Container
private static final GenericContainer minio = new GenericContainer("minio/minio:RELEASE.2020-09-05T07-14-49Z")
.withExposedPorts(9000)
.withEnv("MINIO_ACCESS_KEY", "minio")
.withEnv("MINIO_SECRET_KEY", "minio123")
.waitingFor(Wait.forHttp("/minio/health/live"))
.withCreateContainerCmdModifier(
(Consumer<CreateContainerCmd>) createContainerCmd -> createContainerCmd.withHostName("minio"))
.withLogConsumer(appLog("minio"))
.withCommand("minio", "server", "/data");
private static final GenericContainer minio = new GenericContainer(
DockerImageName.parse("minio/minio:RELEASE.2020-09-05T07-14-49Z"))
.withExposedPorts(9000)
.withEnv("MINIO_ACCESS_KEY", "minio")
.withEnv("MINIO_SECRET_KEY", "minio123")
.waitingFor(Wait.forHttp("/minio/health/live"))
.withCreateContainerCmdModifier(
(Consumer<CreateContainerCmd>) createContainerCmd -> createContainerCmd
.withHostName("minio"))
.withLogConsumer(appLog("minio"))
.withCommand("minio", "server", "/data");
@BeforeAll
static void init() {
@@ -78,44 +82,46 @@ public class S3SourceTests extends AbstractStreamApplicationTests {
}
private DockerComposeContainer environment;
private StreamApps streamApps = kafkaStreamApps(S3SourceTests.class.getSimpleName(), kafka)
.withSourceContainer(new GenericContainer(defaultKafkaImageFor("s3-source"))
.withEnv("S3_SUPPLIER_REMOTE_DIR", "bucket")
.withEnv("S3_COMMON_ENDPOINT_URL", "http://" + localHostAddress() + ":" + minio.getMappedPort(9000))
.withEnv("S3_COMMON_PATH_STYLE_ACCESS", "true")
.withEnv("CLOUD_AWS_STACK_AUTO", "false")
.withEnv("CLOUD_AWS_CREDENTIALS_ACCESS_KEY", "minio")
.withEnv("CLOUD_AWS_CREDENTIALS_SECRET_KEY", "minio123")
.withEnv("CLOUD_AWS_REGION_STATIC", "us-east-1")
.withLogConsumer(logMatcher))
.withSinkContainer(new GenericContainer(defaultKafkaImageFor("log-sink")).withLogConsumer(logMatcher))
.build();
@Test
void testLines() {
startContainer(
templateProcessor("source/s3-source-tests.yml",
fluentMap().withEntry("s3.local.dir", resourceAsFile("minio"))
.withEntry("s3.endpoint.url",
"http://minio:" + minio.getMappedPort(9000))
.withEntry("functionDefinition", "s3Supplier")
.withEntry("consumerMode", "lines")
.withEntry("listOnly", false)
.withEntry("minioHost", localHostAddress())));
fluentMap().withEntry("FILE_CONSUMER_MODE", "lines"));
await().atMost(Duration.ofMinutes(2)).until(logMatcher.verifies(log -> log.contains("Started S3Source")));
await().atMost(Duration.ofSeconds(30)).until(logMatcher.verifies(log -> log.when(() -> {
s3Client.createBucket("bucket");
s3Client.putObject(new PutObjectRequest("bucket", "test", resourceAsFile("minio/data")));
s3Client.putObject(new PutObjectRequest("bucket", "test",
resourceAsFile("minio/data")));
}).contains("Bart Simpson")));
}
@Test
void testTaskLaunchRequest() {
startContainer(templateProcessor("source/s3-source-tests.yml",
fluentMap().withEntry("s3.local.dir", resourceAsFile("minio"))
.withEntry("s3.endpoint.url",
"http://minio:" + minio.getMappedPort(9000))
.withEntry("functionDefinition", "s3Supplier|taskLaunchRequestFunction")
.withEntry("consumerMode", "ref")
.withEntry("listOnly", false)
.withEntry("minioHost", localHostAddress())));
environment.start();
startContainer(fluentMap()
.withEntry("SPRING_CLOUD_FUNCTION_DEFINITION", "s3Supplier|taskLaunchRequestFunction")
.withEntry("TASK_LAUNCH_REQUEST_ARG_EXPRESSIONS", "filename=payload")
.withEntry("TASK_LAUNCH_REQUEST_TASK_NAME", "myTask")
.withEntry("FILE_CONSUMER_MODE", "ref"));
await().atMost(Duration.ofMinutes(2)).until(logMatcher.verifies(log -> log.contains("Started S3Source")));
await().atMost(Duration.ofSeconds(30)).until(logMatcher.verifies(log -> log.when(() -> {
s3Client.createBucket("bucket");
s3Client.putObject(new PutObjectRequest("bucket", "test", resourceAsFile("minio/data")));
s3Client.putObject(new PutObjectRequest("bucket", "test",
resourceAsFile("minio/data")));
}).endsWith(
"\\{\"args\":\\[\"filename=/tmp/s3-supplier/test\"\\],\"deploymentProps\":\\{\\},\"name\":\"myTask\"\\}")));
}
@@ -123,30 +129,22 @@ public class S3SourceTests extends AbstractStreamApplicationTests {
@Test
void testListOnly() {
startContainer(templateProcessor("source/s3-source-tests.yml",
fluentMap().withEntry("s3.local.dir", resourceAsFile("minio"))
.withEntry("s3.endpoint.url",
"http://minio:" + minio.getMappedPort(9000))
.withEntry("functionDefinition", "s3Supplier")
.withEntry("consumerMode", "ref")
.withEntry("listOnly", true)
.withEntry("minioHost", localHostAddress())));
environment.start();
startContainer(
fluentMap()
.withEntry("FILE_CONSUMER_MODE", "ref")
.withEntry("S3_SUPPLIER_LIST_ONLY", "true"));
await().atMost(Duration.ofMinutes(2)).until(logMatcher.verifies(log -> log.contains("Started S3Source")));
await().atMost(Duration.ofSeconds(30)).until(logMatcher.verifies(log -> log.when(() -> {
s3Client.createBucket("bucket");
s3Client.putObject(new PutObjectRequest("bucket", "test", resourceAsFile("minio/data")));
s3Client.putObject(new PutObjectRequest("bucket", "test",
resourceAsFile("minio/data")));
}).contains("\"bucketName\":\"bucket\",\"key\":\"test\"")));
}
private void startContainer(TemplateProcessor templateProcessor) {
environment = new DockerComposeContainer(
templateProcessor.processTemplate())
.withLogConsumer("log-sink", logMatcher)
.withLogConsumer("s3-source", logMatcher)
.withLogConsumer("log-sink", appLog("log-sink"));
environment.start();
private void startContainer(Map<String, String> environment) {
streamApps.sourceContainer().withEnv(environment);
streamApps.start();
}
@AfterEach
@@ -155,6 +153,6 @@ public class S3SourceTests extends AbstractStreamApplicationTests {
s3Client.deleteObject("bucket", "test");
s3Client.deleteBucket("bucket");
}
environment.stop();
streamApps.stop();
}
}

View File

@@ -17,58 +17,62 @@
package org.springframework.cloud.stream.apps.integration.test.source;
import java.time.Duration;
import java.util.Map;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Test;
import org.testcontainers.containers.BindMode;
import org.testcontainers.containers.DockerComposeContainer;
import org.testcontainers.containers.GenericContainer;
import org.testcontainers.junit.jupiter.Container;
import org.testcontainers.utility.DockerImageName;
import org.springframework.cloud.stream.apps.integration.test.support.AbstractStreamApplicationTests;
import org.springframework.cloud.stream.apps.integration.test.support.LogMatcher;
import org.springframework.cloud.stream.apps.integration.test.support.TemplateProcessor;
import org.springframework.cloud.stream.app.test.integration.LogMatcher;
import org.springframework.cloud.stream.app.test.integration.StreamApps;
import org.springframework.cloud.stream.apps.integration.test.support.KafkaStreamIntegrationTestSupport;
import static org.awaitility.Awaitility.await;
import static org.springframework.cloud.stream.apps.integration.test.support.AbstractStreamApplicationTests.AppLog.appLog;
import static org.springframework.cloud.stream.apps.integration.test.support.FluentMap.fluentMap;
import static org.springframework.cloud.stream.app.test.integration.FluentMap.fluentMap;
import static org.springframework.cloud.stream.app.test.integration.kafka.KafkaStreamApps.kafkaStreamApps;
public class SftpSourceTests extends AbstractStreamApplicationTests {
public class SftpSourceTests extends KafkaStreamIntegrationTestSupport {
private static LogMatcher logMatcher = new LogMatcher();
@Container
private static final GenericContainer sftp = (GenericContainer) new GenericContainer("atmoz/sftp")
private static final GenericContainer sftp = new GenericContainer(DockerImageName.parse("atmoz/sftp"))
.withExposedPorts(22)
.withCommand("user:pass:::remote")
.withClasspathResourceMapping("sftp", "/home/user/remote", BindMode.READ_ONLY)
.withStartupTimeout(Duration.ofMinutes(1));
private DockerComposeContainer environment;
private StreamApps streamApps = kafkaStreamApps(SftpSourceTests.class.getSimpleName(), kafka)
.withSourceContainer(new GenericContainer(defaultKafkaImageFor("sftp-source"))
.withEnv("SFTP_SUPPLIER_FACTORY_ALLOW_UNKNOWN_KEYS", "true")
.withEnv("SFTP_SUPPLIER_REMOTE_DIR", "/remote")
.withEnv("SFTP_SUPPLIER_FACTORY_USERNAME", "user")
.withEnv("SFTP_SUPPLIER_FACTORY_PASSWORD", "pass")
.withEnv("SFTP_SUPPLIER_FACTORY_PORT", String.valueOf(sftp.getMappedPort(22)))
.withEnv("SFTP_SUPPLIER_FACTORY_HOST", localHostAddress()))
.withSinkContainer(new GenericContainer(defaultKafkaImageFor("log-sink")).withLogConsumer(logMatcher))
.build();
// TODO: This fixture supports additional tests with different modes, etc.
@Test
void test() {
startContainer(templateProcessor("source/sftp-source-tests.yml",
fluentMap().withEntry("sftpPort", sftp.getMappedPort(22))
.withEntry("functionDefinition", "sftpSupplier")
.withEntry("consumerMode", "ref")
.withEntry("listOnly", false)
.withEntry("sftpHost", localHostAddress())));
startContainer(
fluentMap().withEntry("FILE_CONSUMER_MODE", "ref"));
await().atMost(Duration.ofSeconds(30))
.until(logMatcher.verifies(logListener -> logListener.endsWith("\"/tmp/sftp-supplier/data.txt\"")));
}
private void startContainer(TemplateProcessor templateProcessor) {
environment = new DockerComposeContainer(
templateProcessor.processTemplate())
.withLogConsumer("log-sink", logMatcher)
.withLogConsumer("sftp-source", logMatcher)
.withLogConsumer("log-sink", appLog("log-sink"));
environment.start();
private void startContainer(Map<String, String> environment) {
streamApps.sourceContainer().withEnv(environment);
streamApps.start();
}
@AfterEach
void stop() {
environment.stop();
streamApps.stop();
}
}

View File

@@ -1,149 +0,0 @@
/*
* Copyright 2020-2020 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.cloud.stream.apps.integration.test.support;
import java.io.File;
import java.io.IOException;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.Map;
import java.util.Properties;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import org.slf4j.LoggerFactory;
import org.testcontainers.containers.DockerComposeContainer;
import org.testcontainers.containers.output.Slf4jLogConsumer;
import org.testcontainers.containers.wait.strategy.Wait;
import org.testcontainers.junit.jupiter.Testcontainers;
import org.springframework.core.io.ClassPathResource;
import org.springframework.util.SocketUtils;
import org.springframework.web.reactive.function.client.WebClient;
import static org.springframework.cloud.stream.apps.integration.test.support.FluentMap.fluentMap;
@Testcontainers
public abstract class AbstractStreamApplicationTests {
private static int kafkaBrokerPort;
private static TemplateProcessor.Builder templateProcessor;
static {
kafkaBrokerPort = findAvailablePort();
templateProcessor = TemplateProcessor.withGlobalProperties(loadGlobalProperties("test.properties"))
.withOutputDirectory(initializeTempDir());
startKafkaContainer();
}
protected static TemplateProcessor templateProcessor(String path) {
return templateProcessor.withTemplate(new ClassPathResource(path)).create();
}
protected static TemplateProcessor templateProcessor(String path, Map<?, ?> templateProperties) {
return templateProcessor.withTemplate(new ClassPathResource(path)).withTemplateProperties(templateProperties)
.create();
}
protected static String localHostAddress() {
try {
return InetAddress.getLocalHost().getHostAddress();
}
catch (UnknownHostException e) {
throw new IllegalStateException(e.getMessage(), e);
}
}
private static WebClient webClient = WebClient.builder().build();
protected static WebClient webClient() {
return webClient;
}
protected static File resourceAsFile(String path) {
try {
return new ClassPathResource(path).getFile();
}
catch (IOException e) {
throw new IllegalStateException("Unable to access resource " + path);
}
}
// Junit TempDir does not work with DockerComposeContainer unless you mount it.
// Also doesn't work as @BeforeAll in this case.
static Path initializeTempDir() {
Path tempDir;
try {
Path tempRoot = Paths.get(new ClassPathResource("/").getFile().getAbsolutePath());
tempDir = Files.createTempDirectory(tempRoot, UUID.randomUUID().toString());
tempDir.toFile().deleteOnExit();
}
catch (Exception e) {
throw new IllegalStateException(e.getMessage(), e);
}
return tempDir;
}
protected static int findAvailablePort() {
return SocketUtils.findAvailableTcpPort(10000, 20000);
}
private static Properties loadGlobalProperties(String path) {
Properties globalProperties = new Properties();
try {
globalProperties.load(new ClassPathResource(path).getInputStream());
}
catch (IOException exception) {
throw new IllegalStateException(exception.getMessage(), exception);
}
globalProperties.put("kafkaBootStrapServers", localHostAddress() + ":" + kafkaBrokerPort);
return globalProperties;
}
private static void startKafkaContainer() {
DockerComposeContainer kafkaContainer = new DockerComposeContainer(
templateProcessor.withTemplateProperties(fluentMap()
.withEntry("kafkaBrokerPort", kafkaBrokerPort)
.withEntry("hostAddress", localHostAddress()))
.withTemplate(new ClassPathResource("compose-kafka-external.yml"))
.create().processTemplate());
kafkaContainer
.waitingFor("kafka", Wait.forListeningPort())
.start();
}
public static class AppLog extends Slf4jLogConsumer {
private static final Map<String, AppLog> appLogs = new ConcurrentHashMap<>();
public static AppLog appLog(String appName) {
if (!appLogs.containsKey(appName)) {
appLogs.put(appName, new AppLog(appName));
}
return appLogs.get(appName);
}
AppLog(String appName) {
super(LoggerFactory.getLogger(appName));
}
}
}

View File

@@ -1,30 +0,0 @@
/*
* Copyright 2020-2020 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.cloud.stream.apps.integration.test.support;
import java.util.LinkedHashMap;
public class FluentMap<K, V> extends LinkedHashMap<K, V> {
public static FluentMap fluentMap() {
return new FluentMap<>();
}
public FluentMap<K, V> withEntry(K key, V value) {
put(key, value);
return this;
}
}

View File

@@ -0,0 +1,43 @@
/*
* Copyright 2020-2020 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.cloud.stream.apps.integration.test.support;
import java.time.Duration;
import org.testcontainers.containers.GenericContainer;
import org.testcontainers.containers.wait.strategy.Wait;
import org.testcontainers.utility.DockerImageName;
import org.springframework.cloud.stream.app.test.integration.kafka.AbstractKafkaStreamApplicationIntegrationTests;
public abstract class KafkaStreamIntegrationTestSupport extends AbstractKafkaStreamApplicationIntegrationTests {
protected static final String VERSION = "3.0.0-SNAPSHOT";
protected static final String DOCKER_ORG = "springcloudstream";
protected static DockerImageName defaultKafkaImageFor(String appName) {
return DockerImageName.parse(DOCKER_ORG + "/" + appName + "-kafka:" + VERSION);
}
protected static GenericContainer httpSource(int serverPort) {
return new GenericContainer(defaultKafkaImageFor("http-source"))
.withEnv("SERVER_PORT", String.valueOf(serverPort))
.withExposedPorts(serverPort)
.waitingFor(Wait.forListeningPort().withStartupTimeout(Duration.ofMinutes(2)));
}
}

View File

@@ -1,88 +0,0 @@
/*
* Copyright 2020-2020 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.cloud.stream.apps.integration.test.support;
import java.util.LinkedList;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.Callable;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;
import java.util.regex.Pattern;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testcontainers.containers.output.OutputFrame;
public class LogMatcher implements Consumer<OutputFrame> {
private static Logger logger = LoggerFactory.getLogger(LogMatcher.class);
private List<Consumer<String>> listeners = new LinkedList<>();
public Callable<Boolean> verifies(Consumer<LogListener> consumer) {
LogListener logListener = new LogListener();
consumer.accept(logListener);
logListener.runnable.ifPresent(runnable -> runnable.run());
listeners.add(logListener);
return () -> logListener.matches().get();
}
@Override
public void accept(OutputFrame outputFrame) {
listeners.forEach(m -> m.accept(outputFrame.getUtf8String()));
}
public class LogListener implements Consumer<String> {
private AtomicBoolean matched = new AtomicBoolean();
private Optional<Runnable> runnable = Optional.empty();
private Pattern pattern;
@Override
public void accept(String s) {
logger.trace(this + "matching " + s.trim() + " using pattern " + pattern.pattern());
if (pattern.matcher(s.trim()).matches()) {
logger.debug(" MATCHED " + s.trim());
matched.set(true);
listeners.remove(this);
}
}
public LogListener contains(String string) {
return matchesRegex(".*" + string + ".*");
}
public LogListener endsWith(String string) {
return matchesRegex(".*" + string);
}
public LogListener matchesRegex(String regex) {
this.pattern = Pattern.compile(regex);
return this;
}
public LogListener when(Runnable runnable) {
this.runnable = Optional.of(runnable);
return this;
}
public AtomicBoolean matches() {
return matched;
}
}
}

View File

@@ -1,118 +0,0 @@
/*
* Copyright 2020-2020 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.cloud.stream.apps.integration.test.support;
import java.io.File;
import java.io.IOException;
import java.io.InputStreamReader;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import com.samskivert.mustache.Mustache;
import com.samskivert.mustache.Template;
import org.apache.shiro.util.Assert;
import org.springframework.core.io.Resource;
public final class TemplateProcessor {
private final Path outputDirectory;
private final Resource template;
private final Map<?, ?> templateProperties;
private TemplateProcessor(Builder builder) {
outputDirectory = builder.outputDirectory;
template = builder.template;
templateProperties = builder.templateProperties;
}
private static Map<?, ?> globalProperties;
public File processTemplate() {
try {
try (InputStreamReader resourcesTemplateReader = new InputStreamReader(
Objects.requireNonNull(template.getInputStream()))) {
Template resourceTemplate = Mustache.compiler().escapeHTML(false).compile(resourcesTemplateReader);
Path temporaryFile = outputDirectory.resolve(Paths.get(template.getFilename()));
if (Files.exists(temporaryFile)) {
Files.delete(temporaryFile);
}
Files.createFile(temporaryFile);
Files.write(temporaryFile,
resourceTemplate.execute(addGlobalProperties(templateProperties)).getBytes()).toFile();
temporaryFile.toFile().deleteOnExit();
return temporaryFile.toFile();
}
}
catch (IOException e) {
throw new IllegalStateException(e.getMessage(), e);
}
}
private Map<?, ?> addGlobalProperties(Map<?, ?> templateProperties) {
Map<Object, Object> enriched = new HashMap<>();
globalProperties.forEach((key, value) -> enriched.put(key.toString(), value.toString()));
enriched.putAll(templateProperties);
return enriched;
}
public static TemplateProcessor.Builder builder() {
return TemplateProcessor.withGlobalProperties(Collections.EMPTY_MAP);
}
public static TemplateProcessor.Builder withGlobalProperties(Map<?, ?> globalProperties) {
TemplateProcessor.globalProperties = globalProperties;
return new TemplateProcessor.Builder();
}
public static class Builder {
private Path outputDirectory;
private Resource template;
private Map<?, ?> templateProperties;
public Builder withTemplateProperties(Map<?, ?> templateProperties) {
this.templateProperties = templateProperties;
return this;
}
public Builder withTemplate(Resource template) {
this.template = template;
return this;
}
public Builder withOutputDirectory(Path outputDirectory) {
this.outputDirectory = outputDirectory;
return this;
}
public TemplateProcessor create() {
Assert.notNull(this.outputDirectory, "outputDirectory is required");
Assert.notNull(this.template, "template is required");
return new TemplateProcessor(this);
}
}
}

View File

@@ -1,23 +0,0 @@
version: {{docker.compose.version}}
services:
kafka:
image: confluentinc/cp-kafka:5.5.1
hostname: kafka
ports:
- "{{kafkaBrokerPort}}:{{kafkaBrokerPort}}"
environment:
- KAFKA_ADVERTISED_LISTENERS=SHARED://{{hostAddress}}:{{kafkaBrokerPort}},LOCAL://kafka:9092
- KAFKA_ADVERTISED_HOST_NAME=kafka
- KAFKA_LISTENER_SECURITY_PROTOCOL_MAP=SHARED:PLAINTEXT,LOCAL:PLAINTEXT
- KAFKA_INTER_BROKER_LISTENER_NAME=LOCAL
- KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181
- KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR=1
depends_on:
- zookeeper
zookeeper:
image: confluentinc/cp-zookeeper:5.5.1
expose:
- "2181"
environment:
- ZOOKEEPER_CLIENT_PORT=2181

View File

@@ -11,5 +11,5 @@
<logger name="org.testcontainers" level="INFO"/>
<logger name="com.github.dockerjava" level="WARN"/>
<logger name="org.springframework.cloud.stream.apps.integration.test" level="DEBUG"/>
<!-- <logger name="org.springframework.cloud.stream.app.test.integration" level="DEBUG"/>-->
</configuration>

View File

@@ -1,27 +0,0 @@
version: {{docker.compose.version}}
services:
http-source:
image: springcloudstream/http-source-kafka:{{stream.apps.version}}
ports:
- "{{port}}:{{port}}"
environment:
- SERVER_PORT={{port}}
- SPRING_CLOUD_STREAM_BINDINGS_OUTPUT_DESTINATION=processor
- SPRING_CLOUD_STREAM_KAFKA_BINDER_BROKERS={{kafkaBootStrapServers}}
http-request-processor:
image: springcloudstream/http-request-processor-kafka:{{stream.apps.version}}
environment:
- HTTP_REQUEST_URL_EXPRESSION='{{url}}'
- HTTP_REQUEST_HTTP_METHOD_EXPRESSION='POST'
- SPRING_CLOUD_STREAM_BINDINGS_INPUT_DESTINATION=processor
- SPRING_CLOUD_STREAM_BINDINGS_OUTPUT_DESTINATION=log
- SPRING_CLOUD_STREAM_KAFKA_BINDER_BROKERS={{kafkaBootStrapServers}}
- SPRING_CLOUD_STREAM_BINDINGS_INPUT_GROUP=http-request-processor-tests
log-sink:
image: springcloudstream/log-sink-kafka:{{stream.apps.version}}
environment:
- SPRING_CLOUD_STREAM_KAFKA_BINDER_BROKERS={{kafkaBootStrapServers}}
- SPRING_CLOUD_STREAM_BINDINGS_INPUT_DESTINATION=log
- SPRING_CLOUD_STREAM_BINDINGS_INPUT_GROUP=http-request-processor-tests

View File

@@ -1,24 +0,0 @@
version: {{docker.compose.version}}
services:
http-source:
image: springcloudstream/http-source-kafka:{{stream.apps.version}}
ports:
- "{{port}}:{{port}}"
environment:
- SERVER_PORT={{port}}
- SPRING_CLOUD_STREAM_BINDINGS_OUTPUT_DESTINATION=jdbc
- SPRING_CLOUD_STREAM_KAFKA_BINDER_BROKERS={{kafkaBootStrapServers}}
jdbc-sink:
image: springcloudstream/jdbc-sink-kafka:{{stream.apps.version}}
environment:
- JDBC_CONSUMER_COLUMNS=name,city:address.city,street:address.street
- JDBC_CONSUMER_TABLE_NAME=People
- SPRING_DATASOURCE_PASSWORD={{password}}
- SPRING_DATASOURCE_USERNAME={{user}}
- SPRING_DATASOURCE_DRIVER_CLASS_NAME=org.mariadb.jdbc.Driver
- SPRING_DATASOURCE_URL={{jdbc.url}}
- SPRING_CLOUD_STREAM_KAFKA_BINDER_BROKERS={{kafkaBootStrapServers}}
- SPRING_CLOUD_STREAM_BINDINGS_INPUT_DESTINATION=jdbc
- SPRING_CLOUD_STREAM_BINDINGS_INPUT_GROUP=jdbc-sink-tests

View File

@@ -1,19 +0,0 @@
version: {{docker.compose.version}}
services:
http-source:
image: springcloudstream/http-source-kafka:{{stream.apps.version}}
ports:
- "{{port}}:{{port}}"
environment:
- SERVER_PORT={{port}}
- SPRING_CLOUD_STREAM_BINDINGS_OUTPUT_DESTINATION=mongodb
- SPRING_CLOUD_STREAM_KAFKA_BINDER_BROKERS={{kafkaBootStrapServers}}
mongodb-sink:
image: springcloudstream/mongodb-sink-kafka:{{stream.apps.version}}
environment:
- MONGO_DB_CONSUMER_COLLECTION=test
- SPRING_DATA_MONGODB_URL={{mongodb.url}}
- SPRING_CLOUD_STREAM_KAFKA_BINDER_BROKERS={{kafkaBootStrapServers}}
- SPRING_CLOUD_STREAM_BINDINGS_INPUT_DESTINATION=mongodb
- SPRING_CLOUD_STREAM_BINDINGS_INPUT_GROUP=mongodb-sink-tests

View File

@@ -1,21 +0,0 @@
version: {{docker.compose.version}}
services:
http-source:
image: springcloudstream/http-source-kafka:{{stream.apps.version}}
ports:
- "{{port}}:{{port}}"
environment:
- SERVER_PORT={{port}}
- SPRING_CLOUD_STREAM_BINDINGS_OUTPUT_DESTINATION=tcp
- SPRING_CLOUD_STREAM_KAFKA_BINDER_BROKERS={{kafkaBootStrapServers}}
tcp-sink:
image: springcloudstream/tcp-sink-kafka:{{stream.apps.version}}
environment:
- TCP_CONSUMER_HOST={{tcp.host}}
- TCP_PORT={{tcp.port}}
- TCP_CONSUMER_ENCODER=CRLF
- SPRING_CLOUD_STREAM_KAFKA_BINDER_BROKERS={{kafkaBootStrapServers}}
- SPRING_CLOUD_STREAM_BINDINGS_INPUT_DESTINATION=tcp
- SPRING_CLOUD_STREAM_BINDINGS_INPUT_GROUP=tcp-sink-tests

View File

@@ -1,20 +0,0 @@
version: {{docker.compose.version}}
services:
geode-source:
image: springcloudstream/geode-source-kafka:{{stream.apps.version}}
environment:
- GEODE_POOL_CONNECT_TYPE=server
- GEODE_REGION_REGION_NAME={{geode.region}}
- GEODE_POOL_HOST_ADDRESSES={{geode.host-addresses}}
- SPRING_CLOUD_STREAM_BINDINGS_OUTPUT_DESTINATION=log
- SPRING_CLOUD_STREAM_KAFKA_BINDER_BROKERS={{kafkaBootStrapServers}}
extra_hosts:
- geode:{{geodeHost}}
log-sink:
image: springcloudstream/log-sink-kafka:{{stream.apps.version}}
environment:
- SPRING_CLOUD_STREAM_KAFKA_BINDER_BROKERS={{kafkaBootStrapServers}}
- SPRING_CLOUD_STREAM_BINDINGS_INPUT_DESTINATION=log
- SPRING_CLOUD_STREAM_BINDINGS_INPUT_GROUP=geode-source-tests

View File

@@ -1,18 +0,0 @@
version: {{docker.compose.version}}
services:
http-source:
image: springcloudstream/http-source-kafka:{{stream.apps.version}}
ports:
- "{{port}}:{{port}}"
environment:
- SERVER_PORT={{port}}
- SPRING_CLOUD_STREAM_BINDINGS_OUTPUT_DESTINATION=log
- SPRING_CLOUD_STREAM_KAFKA_BINDER_BROKERS={{kafkaBootStrapServers}}
log-sink:
image: springcloudstream/log-sink-kafka:{{stream.apps.version}}
environment:
- SPRING_CLOUD_STREAM_KAFKA_BINDER_BROKERS={{kafkaBootStrapServers}}
- SPRING_CLOUD_STREAM_BINDINGS_INPUT_DESTINATION=log
- SPRING_CLOUD_STREAM_BINDINGS_INPUT_GROUP=http-source-tests

View File

@@ -1,33 +0,0 @@
version: {{docker.compose.version}}
services:
mysql:
image: mysql:5.7
command: --init-file /init.sql
volumes:
- {{init.sql}}:/init.sql
environment:
MYSQL_USER: root
MYSQL_ROOT_PASSWORD: secret
expose:
- 3306
jdbc-source:
image: springcloudstream/jdbc-source-kafka:{{stream.apps.version}}
depends_on:
- mysql
environment:
- JDBC_SUPPLIER_QUERY=SELECT * FROM People WHERE deleted='N'
- JDBC_SUPPLIER_UPDATE=UPDATE People SET deleted='Y' WHERE id=:id
- SPRING_DATASOURCE_PASSWORD=secret
- SPRING_DATASOURCE_USERNAME=root
- SPRING_DATASOURCE_DRIVER_CLASS_NAME=org.mariadb.jdbc.Driver
- SPRING_DATASOURCE_URL=jdbc:mysql://mysql:3306/test
- SPRING_CLOUD_STREAM_BINDINGS_OUTPUT_DESTINATION=log
- SPRING_CLOUD_STREAM_KAFKA_BINDER_BROKERS={{kafkaBootStrapServers}}
log-sink:
image: springcloudstream/log-sink-kafka:{{stream.apps.version}}
environment:
- SPRING_CLOUD_STREAM_KAFKA_BINDER_BROKERS={{kafkaBootStrapServers}}
- SPRING_CLOUD_STREAM_BINDINGS_INPUT_DESTINATION=log
- SPRING_CLOUD_STREAM_BINDINGS_INPUT_GROUP=jdbc-source-tests

View File

@@ -1,28 +0,0 @@
version: {{docker.compose.version}}
services:
s3-source:
image: springcloudstream/s3-source-kafka:{{stream.apps.version}}
environment:
- SPRING_CLOUD_FUNCTION_DEFINITION={{functionDefinition}}
- FILE_CONSUMER_MODE={{consumerMode}}
- S3_COMMON_ENDPOINT_URL={{s3.endpoint.url}}
- S3_COMMON_PATH_STYLE_ACCESS=true
- S3_SUPPLIER_REMOTE_DIR=bucket
- S3_SUPPLIER_LIST_ONLY={{listOnly}}
- CLOUD_AWS_STACK_AUTO=false
- CLOUD_AWS_CREDENTIALS_ACCESS_KEY=minio
- CLOUD_AWS_CREDENTIALS_SECRET_KEY=minio123
- CLOUD_AWS_REGION_STATIC=us-east-1
- SPRING_CLOUD_STREAM_BINDINGS_OUTPUT_DESTINATION=log
- SPRING_CLOUD_STREAM_KAFKA_BINDER_BROKERS={{kafkaBootStrapServers}}
# For Task Launch Request
- TASK_LAUNCH_REQUEST_ARG_EXPRESSIONS=filename=payload
- TASK_LAUNCH_REQUEST_TASK_NAME=myTask
extra_hosts:
- minio:{{minioHost}}
log-sink:
image: springcloudstream/log-sink-kafka:{{stream.apps.version}}
environment:
- SPRING_CLOUD_STREAM_KAFKA_BINDER_BROKERS={{kafkaBootStrapServers}}
- SPRING_CLOUD_STREAM_BINDINGS_INPUT_DESTINATION=log
- SPRING_CLOUD_STREAM_BINDINGS_INPUT_GROUP=s3-source-tests

View File

@@ -1,27 +0,0 @@
version: {{docker.compose.version}}
services:
sftp-source:
image: springcloudstream/sftp-source-kafka:{{stream.apps.version}}
environment:
- SPRING_CLOUD_FUNCTION_DEFINITION={{functionDefinition}}
- FILE_CONSUMER_MODE={{consumerMode}}
- SFTP_SUPPLIER_FACTORY_ALLOW_UNKNOWN_KEYS=true
- SFTP_SUPPLIER_REMOTE_DIR=/remote
- SFTP_SUPPLIER_FACTORY_USERNAME=user
- SFTP_SUPPLIER_FACTORY_PASSWORD=pass
- SFTP_SUPPLIER_FACTORY_HOST=sftp
- SFTP_SUPPLIER_FACTORY_PORT={{sftpPort}}
- SFTP_SUPPLIER_LIST_ONLY={{listOnly}}
- SPRING_CLOUD_STREAM_BINDINGS_OUTPUT_DESTINATION=log
- SPRING_CLOUD_STREAM_KAFKA_BINDER_BROKERS={{kafkaBootStrapServers}}
# For Task Launch Request
- TASK_LAUNCH_REQUEST_ARG_EXPRESSIONS=filename=payload
- TASK_LAUNCH_REQUEST_TASK_NAME=myTask
extra_hosts:
- sftp:{{sftpHost}}
log-sink:
image: springcloudstream/log-sink-kafka:{{stream.apps.version}}
environment:
- SPRING_CLOUD_STREAM_KAFKA_BINDER_BROKERS={{kafkaBootStrapServers}}
- SPRING_CLOUD_STREAM_BINDINGS_INPUT_DESTINATION=log
- SPRING_CLOUD_STREAM_BINDINGS_INPUT_GROUP=sftp-source-tests

View File

@@ -1,2 +0,0 @@
docker.compose.version='2.4'
stream.apps.version=3.0.0-SNAPSHOT

View File

@@ -1,14 +0,0 @@
version: {{docker.compose.version}}
services:
time-source:
image: springcloudstream/time-source-kafka:{{stream.apps.version}}
environment:
- SPRING_CLOUD_STREAM_BINDINGS_OUTPUT_DESTINATION=log
- SPRING_CLOUD_STREAM_KAFKA_BINDER_BROKERS={{kafkaBootStrapServers}}
log-sink:
image: springcloudstream/log-sink-kafka:{{stream.apps.version}}
environment:
- SPRING_CLOUD_STREAM_KAFKA_BINDER_BROKERS={{kafkaBootStrapServers}}
- SPRING_CLOUD_STREAM_BINDINGS_INPUT_DESTINATION=log
- SPRING_CLOUD_STREAM_BINDINGS_INPUT_GROUP=ticktock-tests