Refactor to use updated test framework from spring-cloud/stream-applications/stream-applications-test-support

This commit is contained in:
David Turanski
2020-10-19 18:16:38 -04:00
parent 8ec343dc61
commit 14d847b1b2
23 changed files with 708 additions and 616 deletions

View File

@@ -3,39 +3,39 @@
This contains integration tests for pre-packaged stream-applications for Docker using https://www.testcontainers.org/[TestContainers].
These are end-to-end integration tests running apps and required resources, using docker-compose.
The goal is to have an end-to-end integration test for each pre-packaged application.
We don't aim to test different configuration options, as this is the responsibility of the stream applications.
We don't aim to test all different configuration options, as this is the responsibility of the stream application and function components.
One of the major benefits is to verify the built Docker images run correctly, especially when we introduce global changes,
such as upgrading the base JDK image, or other pervasive changes.
such as upgrading the base JDK image, the maven plugins, or other pervasive changes.
== Test Strategy
Each test uses a docker-compose file to configure a simple pipeline including the source, sink, or processor app under test.
Usually, a single message is sufficient.
See https://github.com/spring-cloud/stream-applications/tree/master/applications/stream-applications-core/common/stream-applications-test-support[] for a full description.
The tests use following patterns:
=== Source
To test a source, we use a log sink and add a LogConsumer that returns a boolean value, eventually evaluating to `true`
when an expected regex match is detected. Then we create an event to trigger the source.
== Source
To test a source, we may require some application specific setup or event to trigger the source.
For example, the jdbc source needs some data in the database to which it is listening.
Then use an `outputPayloadVerifier` or `outputMessageVerifier` to verify the output.
=== Sink
To test a sink, we use the http source and post a message using WebClient.
== Sink
To test a sink, we need to publish a message to its input. Simply use the provided KafkaTemplate or RabbitTemplate beans.
Then we need to verify the result by checking the sink's external resource.
=== Processor
To test a processor, use an http source and a log sink.
== Processor
To test a processor we publish a message and use an `outputPayloadVerifier` or `outputMessageVerifier` to verify the output.
== Configuration
See link:src/test/java/org/springframework/cloud/stream/apps/integration/test/common/Configuration.java[Configuration] for configuration
options. These tests use Spring but not boot currently.
The most important setting is the image versions to test.
By default, we use `latest`.
To override it, set the System property, e.g.,
./mvnw clean test -Dspring.cloud.stream.applications.version=3.0.0-M3
Typically, it is convenient to run the required external resource in a separate container, since it must be accessed by localhost(to provide data to a source or
verify data created by a sink), and by the containerized applications.
=== Organization and Conventions
`processor`, `source`, and `sink` are leaf packages.
The docker-compose YAML corresponding each test are test resources, similarly organized.
So `.../source/JdbcSinkTests` corresponds to `source/jdbc-source-tests.yml`.
=== Templating
The docker-compose files are Mustache templates to allow configuration of runtime values, such as ports.
The Docker tag is required configuration, so the templates must always be processed.
The result is copied to a temporary file.

View File

@@ -5,7 +5,7 @@
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.3.3.RELEASE</version>
<version>2.3.4.RELEASE</version>
<relativePath/> <!-- lookup parent from repository -->
</parent>
<groupId>org.springframework.cloud.stream.apps</groupId>
@@ -19,9 +19,7 @@
<java-functions.version>1.0.0-SNAPSHOT</java-functions.version>
<stream-applications.version>3.0.0-SNAPSHOT</stream-applications.version>
<mariadb-client.version>2.6.2</mariadb-client.version>
<spring-cloud.version>Hoxton.SR8</spring-cloud.version>
<testcontainers.version>1.15.0-rc2</testcontainers.version>
<jmustache.version>1.15</jmustache.version>
<test-containers.version>1.15.0-rc2</test-containers.version>
<maven-checkstyle-plugin.version>3.1.0</maven-checkstyle-plugin.version>
<disable.checks>false</disable.checks>
<maven-checkstyle-plugin.failsOnError>true</maven-checkstyle-plugin.failsOnError>
@@ -44,6 +42,7 @@
<wiremock.version>2.27.1</wiremock.version>
<aws.version>1.11.415</aws.version>
<mysql-connector-java.version>8.0.16</mysql-connector-java.version>
<junit-platform-runner.version>1.6.2</junit-platform-runner.version>
</properties>
<dependencies>
@@ -80,22 +79,9 @@
<artifactId>kafka</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.awaitility</groupId>
<artifactId>awaitility</artifactId>
<exclusions>
<exclusion>
<groupId>org.junit.vintage</groupId>
<artifactId>junit-vintage-engine</artifactId>
</exclusion>
</exclusions>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.testcontainers</groupId>
<artifactId>mariadb</artifactId>
<scope>test</scope>
<artifactId>rabbitmq</artifactId>
</dependency>
<dependency>
<groupId>org.testcontainers</groupId>
@@ -158,13 +144,22 @@
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-webflux</artifactId>
<groupId>org.junit.platform</groupId>
<artifactId>junit-platform-runner</artifactId>
<version>${junit-platform-runner.version}</version>
<scope>test</scope>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-surefire-plugin</artifactId>
<configuration>
<forkCount>1C</forkCount>
<parallel>all</parallel>
</configuration>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-checkstyle-plugin</artifactId>
@@ -239,7 +234,7 @@
<dependency>
<groupId>org.testcontainers</groupId>
<artifactId>testcontainers-bom</artifactId>
<version>${testcontainers.version}</version>
<version>${test-containers.version}</version>
<type>pom</type>
<scope>import</scope>
</dependency>

View File

@@ -1,66 +0,0 @@
/*
* Copyright 2020-2020 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.cloud.stream.apps.integration.test;
import java.time.Duration;
import java.util.regex.Pattern;
import org.junit.jupiter.api.Test;
import org.testcontainers.containers.GenericContainer;
import org.testcontainers.junit.jupiter.Container;
import org.testcontainers.utility.DockerImageName;
import org.springframework.cloud.stream.app.test.integration.LogMatcher;
import org.springframework.cloud.stream.apps.integration.test.support.KafkaStreamIntegrationTestSupport;
import static org.awaitility.Awaitility.await;
import static org.springframework.cloud.stream.app.test.integration.AppLog.appLog;
import static org.springframework.cloud.stream.app.test.integration.FluentMap.fluentMap;
public class TikTokBaselineTests extends KafkaStreamIntegrationTestSupport {
// "MM/dd/yy HH:mm:ss";
private final static Pattern pattern = Pattern.compile(".*\\d{2}/\\d{2}/\\d{2}\\s+\\d{2}:\\d{2}:\\d{2}");
private final static LogMatcher logMatcher = new LogMatcher();
@Container
static GenericContainer timeSource = new GenericContainer(
DockerImageName.parse("springcloudstream/time-source-kafka:3.0.0-SNAPSHOT"))
.withNetwork(kafka.getNetwork())
.withEnv(fluentMap().withEntry("SPRING_CLOUD_STREAM_BINDINGS_OUTPUT_DESTINATION", "TikTok")
.withEntry("SPRING_CLOUD_STREAM_KAFKA_BINDER_BROKERS",
kafka.getNetworkAliases().get(0) + ":9092"))
.dependsOn(kafka);
@Container
static GenericContainer logSink = new GenericContainer(
DockerImageName.parse("springcloudstream/log-sink-kafka:3.0.0-SNAPSHOT"))
.withNetwork(kafka.getNetwork())
.withEnv(fluentMap().withEntry("SPRING_CLOUD_STREAM_BINDINGS_INPUT_DESTINATION", "TikTok")
.withEntry("SPRING_CLOUD_STREAM_KAFKA_BINDER_BROKERS",
kafka.getNetworkAliases().get(0) + ":9092")
.withEntry("SPRING_CLOUD_STREAM_BINDINGS_INPUT_GROUP", "TikTok"))
.withLogConsumer(logMatcher)
.withLogConsumer(appLog("log-sink"))
.dependsOn(kafka);
@Test
void tiktok() {
await().atMost(Duration.ofMinutes(2)).until(logMatcher.verifies(log -> log.contains("Started LogSink")));
await().atMost(Duration.ofSeconds(30)).until(logMatcher.verifies(log -> log.matchesRegex(pattern.pattern())));
}
}

View File

@@ -0,0 +1,33 @@
/*
* Copyright 2020-2020 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.cloud.stream.apps.integration.test.common;
import java.time.Duration;
public abstract class Configuration {
public static String VERSION;
public static final Duration DEFAULT_DURATION = Duration.ofMinutes(1);
private static final String SPRING_CLOUD_STREAM_APPLICATIONS_VERSION = "spring.cloud.stream.applications.version";
static {
VERSION = System.getProperty(SPRING_CLOUD_STREAM_APPLICATIONS_VERSION, "latest");
}
}

View File

@@ -0,0 +1,80 @@
/*
* Copyright 2020-2020 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.cloud.stream.apps.integration.test.kafka.processor;
import java.net.InetAddress;
import okhttp3.mockwebserver.Dispatcher;
import okhttp3.mockwebserver.MockResponse;
import okhttp3.mockwebserver.MockWebServer;
import okhttp3.mockwebserver.RecordedRequest;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import org.testcontainers.junit.jupiter.Container;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.cloud.stream.app.test.integration.StreamAppContainer;
import org.springframework.cloud.stream.app.test.integration.kafka.KafkaStreamApplicationIntegrationTestSupport;
import org.springframework.http.HttpHeaders;
import org.springframework.http.HttpStatus;
import org.springframework.http.MediaType;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.messaging.MessageHeaders;
import static org.awaitility.Awaitility.await;
import static org.springframework.cloud.stream.app.test.integration.AppLog.appLog;
import static org.springframework.cloud.stream.apps.integration.test.common.Configuration.DEFAULT_DURATION;
import static org.springframework.cloud.stream.apps.integration.test.common.Configuration.VERSION;
public class KafkaHttpRequestProcessorTests extends KafkaStreamApplicationIntegrationTestSupport {
private static MockWebServer server = new MockWebServer();
private static int serverPort = findAvailablePort();
@Autowired
private KafkaTemplate kafkaTemplate;
@Container
private static StreamAppContainer processor = prepackagedKafkaContainerFor("http-request-processor", VERSION)
.withLogConsumer(appLog("http-request-processor"))
.withEnv("HTTP_REQUEST_URL_EXPRESSION", "'http://" + localHostAddress() + ":" + serverPort + "'")
.withEnv("HTTP_REQUEST_HTTP_METHOD_EXPRESSION", "'POST'");
@BeforeAll
static void startServer() throws Exception {
server.start(InetAddress.getLocalHost(), serverPort);
}
@Test
void get() {
server.setDispatcher(new Dispatcher() {
@Override
public MockResponse dispatch(RecordedRequest recordedRequest) {
return new MockResponse().setHeader(HttpHeaders.CONTENT_TYPE,
MediaType.APPLICATION_JSON_VALUE)
.setBody("{\"response\":\"" + recordedRequest.getBody().readUtf8() + "\"}")
.setResponseCode(HttpStatus.OK.value());
}
});
kafkaTemplate.send(processor.getInputDestination(), "ping");
await().atMost(DEFAULT_DURATION)
.until(verifyOutputMessage(message -> message.getPayload().equals("{\"response\":\"ping\"}")
&& message.getHeaders().get(MessageHeaders.CONTENT_TYPE)
.equals(MediaType.APPLICATION_JSON_VALUE)));
}
}

View File

@@ -14,38 +14,35 @@
* limitations under the License.
*/
package org.springframework.cloud.stream.apps.integration.test.sink;
import java.time.Duration;
package org.springframework.cloud.stream.apps.integration.test.kafka.sink;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.zaxxer.hikari.HikariDataSource;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import org.testcontainers.containers.BindMode;
import org.testcontainers.containers.MySQLContainer;
import org.testcontainers.containers.wait.strategy.Wait;
import org.testcontainers.junit.jupiter.Container;
import org.testcontainers.utility.DockerImageName;
import reactor.core.publisher.Mono;
import org.springframework.cloud.stream.app.test.integration.StreamApps;
import org.springframework.cloud.stream.apps.integration.test.support.KafkaStreamIntegrationTestSupport;
import org.springframework.http.MediaType;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.cloud.stream.app.test.integration.StreamAppContainer;
import org.springframework.cloud.stream.app.test.integration.kafka.KafkaStreamApplicationIntegrationTestSupport;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.web.reactive.function.client.ClientResponse;
import org.springframework.web.reactive.function.client.WebClient;
import org.springframework.kafka.core.KafkaTemplate;
import static org.assertj.core.api.Assertions.assertThat;
import static org.awaitility.Awaitility.await;
import static org.springframework.cloud.stream.app.test.integration.kafka.KafkaStreamApps.kafkaStreamApps;
import static org.springframework.cloud.stream.app.test.integration.AppLog.appLog;
import static org.springframework.cloud.stream.apps.integration.test.common.Configuration.DEFAULT_DURATION;
import static org.springframework.cloud.stream.apps.integration.test.common.Configuration.VERSION;
public class JdbcSinkTests extends KafkaStreamIntegrationTestSupport {
private static int serverPort = findAvailablePort();
public class KafkaJdbcSinkTests extends KafkaStreamApplicationIntegrationTestSupport {
private static JdbcTemplate jdbcTemplate;
private static WebClient webClient = WebClient.builder().build();
@Autowired
private KafkaTemplate kafkaTemplate;
@Container
private static MySQLContainer mySQL = new MySQLContainer<>(DockerImageName.parse("mysql:5.7"))
@@ -53,27 +50,28 @@ public class JdbcSinkTests extends KafkaStreamIntegrationTestSupport {
.withPassword("secret")
.withExposedPorts(3306)
.withNetwork(kafka.getNetwork())
.withNetworkAliases("mysql-for-sink")
.withClasspathResourceMapping("init.sql", "/init.sql", BindMode.READ_ONLY)
.withLogConsumer(appLog("mysql-for-sink"))
.withCommand("--init-file", "/init.sql");
@Container
private static StreamApps streamApps = kafkaStreamApps(JdbcSinkTests.class.getSimpleName(), kafka)
.withSourceContainer(defaultKafkaContainerFor("http-source")
.withEnv("SERVER_PORT", String.valueOf(serverPort))
.withExposedPorts(serverPort)
.waitingFor(Wait.forListeningPort().withStartupTimeout(Duration.ofMinutes(2))))
.withSinkContainer(defaultKafkaContainerFor("jdbc-sink")
.withEnv("JDBC_CONSUMER_COLUMNS", "name,city:address.city,street:address.street")
.withEnv("JDBC_CONSUMER_TABLE_NAME", "People")
.withEnv("SPRING_DATASOURCE_USERNAME", "test")
.withEnv("SPRING_DATASOURCE_PASSWORD", "secret")
.withEnv("SPRING_DATASOURCE_DRIVER_CLASS_NAME", "org.mariadb.jdbc.Driver")
.withEnv("SPRING_DATASOURCE_URL",
"jdbc:mysql://" + mySQL.getNetworkAliases().get(0) + ":3306/test"))
.build();
static StreamAppContainer sink = prepackagedKafkaContainerFor("jdbc-sink", VERSION)
.dependsOn(mySQL)
.withEnv("JDBC_CONSUMER_COLUMNS", "name,city:address.city,street:address.street")
.withEnv("JDBC_CONSUMER_TABLE_NAME", "People")
.withEnv("SPRING_DATASOURCE_USERNAME", "test")
.withEnv("SPRING_DATASOURCE_PASSWORD", "secret")
.withEnv("SPRING_DATASOURCE_DRIVER_CLASS_NAME", "org.mariadb.jdbc.Driver")
// .withEnv("LOGGING_LEVEL_ORG_SPRINGFRAMEWORK_INTEGRATION", "DEBUG")
// .withEnv("LOGGING_LEVEL_ORG_SPRINGFRAMEWORK_JDBC", "DEBUG")
// .withEnv("LOGGING_LEVEL_ORG_MARIADB_JDBC", "DEBUG")
// .log()
.withEnv("SPRING_DATASOURCE_URL",
"jdbc:mariadb://mysql-for-sink:3306/test");
@BeforeAll
static void startStreamApps() {
HikariDataSource dataSource = new HikariDataSource();
dataSource.setDriverClassName("org.mariadb.jdbc.Driver");
dataSource.setUsername(mySQL.getUsername());
@@ -81,25 +79,21 @@ public class JdbcSinkTests extends KafkaStreamIntegrationTestSupport {
dataSource.setJdbcUrl("jdbc:mysql://localhost:" + mySQL.getMappedPort(3306) + "/test");
jdbcTemplate = new JdbcTemplate(dataSource);
jdbcTemplate.execute("DELETE FROM People");
await().atMost(DEFAULT_DURATION)
.until(() -> jdbcTemplate.queryForObject("SELECT COUNT(*) from People", Integer.class)
.intValue() == 0);
sink.start();
}
@Test
void postData() {
String json = "{\"name\":\"My Name\",\"address\":{ \"city\": \"Big City\", \"street\":\"Narrow Alley\"}}";
ClientResponse response = webClient
.post()
.uri("http://localhost:" + streamApps.sourceContainer().getMappedPort(serverPort))
.contentType(MediaType.APPLICATION_JSON)
.body(Mono.just(json), String.class)
.exchange()
.block();
assertThat(response.statusCode().is2xxSuccessful()).isTrue();
void test() throws JsonProcessingException {
String json = "{\"name\":\"My Name\",\"address\":{ \"city\": \"Big City\",\"street\":\"Narrow Alley\"}}";
kafkaTemplate.send(sink.getInputDestination(), json);
await().atMost(Duration.ofSeconds(30))
.untilAsserted(
() -> assertThat(jdbcTemplate.queryForObject("SELECT COUNT(*) from People",
Integer.class))
.isOne());
await().atMost(DEFAULT_DURATION)
.until(
() -> jdbcTemplate.queryForObject("SELECT COUNT(*) from People", Integer.class)
.intValue() == 1);
assertThat(jdbcTemplate.queryForObject("SELECT name from People",
String.class)).isEqualTo("My Name");
}

View File

@@ -14,38 +14,38 @@
* limitations under the License.
*/
package org.springframework.cloud.stream.apps.integration.test.sink;
package org.springframework.cloud.stream.apps.integration.test.kafka.sink;
import java.time.Duration;
import java.util.List;
import org.bson.Document;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import org.testcontainers.containers.MongoDBContainer;
import org.testcontainers.junit.jupiter.Container;
import org.testcontainers.utility.DockerImageName;
import reactor.core.publisher.Mono;
import org.springframework.cloud.stream.app.test.integration.StreamApps;
import org.springframework.cloud.stream.apps.integration.test.support.KafkaStreamIntegrationTestSupport;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.cloud.stream.app.test.integration.StreamAppContainer;
import org.springframework.cloud.stream.app.test.integration.kafka.KafkaStreamApplicationIntegrationTestSupport;
import org.springframework.data.mongodb.MongoDatabaseFactory;
import org.springframework.data.mongodb.core.MongoTemplate;
import org.springframework.data.mongodb.core.SimpleMongoClientDatabaseFactory;
import org.springframework.http.MediaType;
import org.springframework.web.reactive.function.client.ClientResponse;
import org.springframework.web.reactive.function.client.WebClient;
import org.springframework.kafka.core.KafkaTemplate;
import static org.assertj.core.api.Assertions.assertThat;
import static org.springframework.cloud.stream.app.test.integration.kafka.KafkaStreamApps.kafkaStreamApps;
import static org.awaitility.Awaitility.await;
import static org.springframework.cloud.stream.apps.integration.test.common.Configuration.DEFAULT_DURATION;
import static org.springframework.cloud.stream.apps.integration.test.common.Configuration.VERSION;
public class MongoDBSinkTests extends KafkaStreamIntegrationTestSupport {
private static int serverPort = findAvailablePort();
public class KafkaMongoDBSinkTests extends KafkaStreamApplicationIntegrationTestSupport {
private static MongoTemplate mongoTemplate;
private static WebClient webClient = WebClient.builder().build();
@Autowired
private KafkaTemplate kafkaTemplate;
@Container
private static MongoDBContainer mongoDBContainer = new MongoDBContainer(DockerImageName.parse("mongo:4.0.10"))
@@ -53,16 +53,14 @@ public class MongoDBSinkTests extends KafkaStreamIntegrationTestSupport {
.withStartupTimeout(Duration.ofMinutes(2));
private static String mongoConnectionString() {
return String.format("mongodb://%s:%s/%s", localHostAddress(), mongoDBContainer.getMappedPort(27017), "test");
return String.format("mongodb://%s:%s/%s", localHostAddress(),
mongoDBContainer.getMappedPort(27017), "test");
}
@Container
private StreamApps streamApps = kafkaStreamApps(MongoDBSinkTests.class.getSimpleName(), kafka)
.withSourceContainer(httpSource(serverPort))
.withSinkContainer(defaultKafkaContainerFor("mongodb-sink")
.withEnv("MONGO_DB_CONSUMER_COLLECTION", "test")
.withEnv("SPRING_DATA_MONGODB_URL", mongoConnectionString()))
.build();
private StreamAppContainer sink = prepackagedKafkaContainerFor("mongodb-sink", VERSION)
.withEnv("MONGO_DB_CONSUMER_COLLECTION", "test")
.withEnv("SPRING_DATA_MONGODB_URL", mongoConnectionString());
@BeforeAll
static void buildMongoTemplate() {
@@ -73,16 +71,17 @@ public class MongoDBSinkTests extends KafkaStreamIntegrationTestSupport {
@Test
void postData() {
String json = "{\"name\":\"My Name\",\"address\":{ \"city\": \"Big City\", \"street\": \"Narrow Alley\"}}";
ClientResponse response = webClient
.post()
.uri("http://localhost:" + streamApps.sourceContainer().getMappedPort(serverPort))
.contentType(MediaType.APPLICATION_JSON)
.body(Mono.just(json), String.class)
.exchange()
.block(Duration.ofSeconds(30));
assertThat(response.statusCode().is2xxSuccessful()).isTrue();
List<Document> docs = mongoTemplate.findAll(Document.class, "test");
assertThat(docs).allMatch(document -> document.get("name", String.class).equals("My Name"));
String json = "{\"name\":\"My Name\",\"address\":{ \"city\": \"Big City\", \"street\":\"Narrow Alley\"}}";
kafkaTemplate.send(sink.getInputDestination(), json);
await().atMost(DEFAULT_DURATION).untilAsserted(() -> {
List<Document> docs = mongoTemplate.findAll(Document.class, "test");
assertThat(docs).allMatch(document -> document.get("name", String.class).equals("My Name"));
});
}
@AfterAll
static void cleanUp() {
mongoDBContainer.close();
}
}

View File

@@ -14,7 +14,7 @@
* limitations under the License.
*/
package org.springframework.cloud.stream.apps.integration.test.sink;
package org.springframework.cloud.stream.apps.integration.test.kafka.sink;
import java.io.BufferedReader;
import java.io.IOException;
@@ -28,21 +28,17 @@ import java.util.concurrent.atomic.AtomicBoolean;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import org.testcontainers.junit.jupiter.Container;
import reactor.core.publisher.Mono;
import org.springframework.cloud.stream.app.test.integration.StreamApps;
import org.springframework.cloud.stream.apps.integration.test.support.KafkaStreamIntegrationTestSupport;
import org.springframework.http.MediaType;
import org.springframework.web.reactive.function.client.ClientResponse;
import org.springframework.web.reactive.function.client.WebClient;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.cloud.stream.app.test.integration.StreamAppContainer;
import org.springframework.cloud.stream.app.test.integration.kafka.KafkaStreamApplicationIntegrationTestSupport;
import org.springframework.kafka.core.KafkaTemplate;
import static org.assertj.core.api.Assertions.assertThat;
import static org.awaitility.Awaitility.await;
import static org.springframework.cloud.stream.app.test.integration.kafka.KafkaStreamApps.kafkaStreamApps;
import static org.springframework.cloud.stream.apps.integration.test.common.Configuration.DEFAULT_DURATION;
import static org.springframework.cloud.stream.apps.integration.test.common.Configuration.VERSION;
public class TcpSinkTests extends KafkaStreamIntegrationTestSupport {
private static final int port = findAvailablePort();
public class KafkaTcpSinkTests extends KafkaStreamApplicationIntegrationTestSupport {
private static final int tcpPort = findAvailablePort();
@@ -50,16 +46,14 @@ public class TcpSinkTests extends KafkaStreamIntegrationTestSupport {
private static final AtomicBoolean socketReady = new AtomicBoolean();
private static WebClient webClient = WebClient.builder().build();
@Autowired
private KafkaTemplate kafkaTemplate;
@Container
private static StreamApps streamApps = kafkaStreamApps(TcpSinkTests.class.getSimpleName(), kafka)
.withSourceContainer(httpSource(port))
.withSinkContainer(defaultKafkaContainerFor("tcp-sink")
.withEnv("TCP_CONSUMER_HOST", localHostAddress())
.withEnv("TCP_PORT", String.valueOf(tcpPort))
.withEnv("TCP_CONSUMER_ENCODER", "CRLF"))
.build();
private static StreamAppContainer sink = prepackagedKafkaContainerFor("tcp-sink", VERSION)
.withEnv("TCP_CONSUMER_HOST", localHostAddress())
.withEnv("TCP_PORT", String.valueOf(tcpPort))
.withEnv("TCP_CONSUMER_ENCODER", "CRLF");
@BeforeAll
static void startTcpServer() {
@@ -68,25 +62,20 @@ public class TcpSinkTests extends KafkaStreamIntegrationTestSupport {
socket = new ServerSocket(tcpPort, 50, InetAddress.getLocalHost()).accept();
socketReady.set(true);
}
catch (IOException exception) {
exception.printStackTrace();
catch (IOException e) {
throw new RuntimeException(e.getMessage(), e);
}
}).start();
}
@Test
void postData() throws IOException {
// Sink will not connect until it receives a message.
String text = "Hello, world!";
ClientResponse response = webClient
.post()
.uri("http://localhost:" + streamApps.sourceContainer().getMappedPort(port))
.contentType(MediaType.TEXT_PLAIN)
.body(Mono.just(text), String.class)
.exchange()
.block();
assertThat(response.statusCode().is2xxSuccessful()).isTrue();
await().atMost(Duration.ofSeconds(10)).untilTrue(socketReady);
kafkaTemplate.send(sink.getInputDestination(), text);
await().atMost(DEFAULT_DURATION).untilTrue(socketReady);
BufferedReader reader = new BufferedReader(new InputStreamReader(socket.getInputStream()));
await().atMost(Duration.ofSeconds(10)).until(() -> reader.readLine().equals("Hello, world!"));
await().atMost(Duration.ofSeconds(10)).until(() -> reader.readLine().equals(text));
}
}

View File

@@ -14,7 +14,7 @@
* limitations under the License.
*/
package org.springframework.cloud.stream.apps.integration.test.source;
package org.springframework.cloud.stream.apps.integration.test.kafka.source;
import java.time.Duration;
import java.util.function.Consumer;
@@ -36,17 +36,14 @@ import org.testcontainers.junit.jupiter.Container;
import org.springframework.cloud.fn.test.support.geode.GeodeContainer;
import org.springframework.cloud.stream.app.test.integration.LogMatcher;
import org.springframework.cloud.stream.app.test.integration.StreamApps;
import org.springframework.cloud.stream.apps.integration.test.support.KafkaStreamIntegrationTestSupport;
import org.springframework.cloud.stream.app.test.integration.StreamAppContainer;
import org.springframework.cloud.stream.app.test.integration.kafka.KafkaStreamApplicationIntegrationTestSupport;
import static org.awaitility.Awaitility.await;
import static org.springframework.cloud.stream.app.test.integration.kafka.KafkaStreamApps.kafkaStreamApps;
import static org.springframework.cloud.stream.apps.integration.test.common.Configuration.DEFAULT_DURATION;
import static org.springframework.cloud.stream.apps.integration.test.common.Configuration.VERSION;
public class GeodeSourceTests extends KafkaStreamIntegrationTestSupport {
private static final LogMatcher logMatcher = new LogMatcher();
private static final LogMatcher geodeLogMatcher = new LogMatcher();
public class KafkaGeodeSourceTests extends KafkaStreamApplicationIntegrationTestSupport {
private static final int locatorPort = findAvailablePort();
@@ -56,6 +53,8 @@ public class GeodeSourceTests extends KafkaStreamIntegrationTestSupport {
private static ClientCache clientCache;
private static LogMatcher logMatcher = LogMatcher.contains("Started GeodeSource");
@Container
private static final GeodeContainer geode = (GeodeContainer) new GeodeContainer(new ImageFromDockerfile()
.withFileFromClasspath("Dockerfile", "geode/Dockerfile")
@@ -64,24 +63,20 @@ public class GeodeSourceTests extends KafkaStreamIntegrationTestSupport {
locatorPort, cacheServerPort)
.withCreateContainerCmdModifier(
(Consumer<CreateContainerCmd>) createContainerCmd -> createContainerCmd
.withName("apache_geode")
.withHostName("geode").withHostConfig(new HostConfig().withPortBindings(
new PortBinding(Ports.Binding.bindPort(cacheServerPort),
new ExposedPort(cacheServerPort)),
new PortBinding(Ports.Binding.bindPort(locatorPort),
new ExposedPort(locatorPort)))))
.withCommand("tail", "-f", "/dev/null")
.withStartupTimeout(Duration.ofMinutes(2));
.withStartupTimeout(DEFAULT_DURATION.multipliedBy(2));
@Container
private final StreamApps streamApps = kafkaStreamApps(this.getClass().getSimpleName(), kafka)
.withSourceContainer(defaultKafkaContainerFor("geode-source")
.withEnv("GEODE_POOL_CONNECT_TYPE", "server")
.withEnv("GEODE_REGION_REGION_NAME", "myRegion")
.withEnv("GEODE_POOL_HOST_ADDRESSES", localHostAddress() + ":" + cacheServerPort)
.withLogConsumer(geodeLogMatcher))
.withSinkContainer(defaultKafkaContainerFor("log-sink")
.withLogConsumer(logMatcher))
.build();
private static final StreamAppContainer geodeSource = prepackagedKafkaContainerFor("geode-source", VERSION)
.withEnv("GEODE_POOL_CONNECT_TYPE", "server")
.withEnv("GEODE_REGION_REGION_NAME", "myRegion")
.withEnv("GEODE_POOL_HOST_ADDRESSES", localHostAddress() + ":" + cacheServerPort)
.withLogConsumer(logMatcher);
@BeforeAll
static void init() {
@@ -98,16 +93,15 @@ public class GeodeSourceTests extends KafkaStreamIntegrationTestSupport {
clientRegion = clientCache
.createClientRegionFactory(ClientRegionShortcut.PROXY)
.create("myRegion");
geodeSource.start();
}
@Test
void test() {
await().atMost(Duration.ofMinutes(2))
.until(geodeLogMatcher.verifies(log -> log.contains("Started GeodeSource")));
await().atMost(Duration.ofSeconds(30))
.until(logMatcher.verifies(log -> log
.when(() -> clientRegion.put("hello", "world"))
.contains("world")));
await().atMost(Duration.ofMinutes(2)).until(logMatcher.matches());
clientRegion.put("hello", "world");
await().atMost(DEFAULT_DURATION)
.until(verifyOutputPayload((String s) -> s.contains("world")));
}
@AfterAll

View File

@@ -0,0 +1,101 @@
/*
* Copyright 2020-2020 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.cloud.stream.apps.integration.test.kafka.source;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Test;
import org.testcontainers.containers.wait.strategy.Wait;
import org.testcontainers.junit.jupiter.Container;
import reactor.core.publisher.Mono;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.cloud.stream.app.test.integration.StreamAppContainer;
import org.springframework.cloud.stream.app.test.integration.TestTopicListener;
import org.springframework.cloud.stream.app.test.integration.kafka.KafkaStreamApplicationIntegrationTestSupport;
import org.springframework.http.HttpStatus;
import org.springframework.http.MediaType;
import org.springframework.web.reactive.function.client.WebClient;
import static org.assertj.core.api.Assertions.assertThat;
import static org.awaitility.Awaitility.await;
import static org.springframework.cloud.stream.apps.integration.test.common.Configuration.DEFAULT_DURATION;
import static org.springframework.cloud.stream.apps.integration.test.common.Configuration.VERSION;
public class KafkaHttpSourceTests extends KafkaStreamApplicationIntegrationTestSupport {
@Autowired
TestTopicListener testTopicListener;
private static int serverPort = findAvailablePort();
private static WebClient webClient = WebClient.builder().build();
@Container
private final StreamAppContainer source = prepackagedKafkaContainerFor("http-source", VERSION)
.withEnv("SERVER_PORT", String.valueOf(serverPort))
.withExposedPorts(serverPort)
.waitingFor(Wait.forListeningPort().withStartupTimeout(DEFAULT_DURATION));
@AfterEach
void reset() {
testTopicListener.clearOutputVerifiers();
}
@Test
void plaintext() throws InterruptedException {
CountDownLatch countDownLatch = new CountDownLatch(1);
AtomicReference<HttpStatus> httpStatus = new AtomicReference<>();
webClient
.post()
.uri("http://localhost:" + source.getMappedPort(serverPort))
.contentType(MediaType.TEXT_PLAIN)
.body(Mono.just("Hello"), String.class)
.exchange()
.subscribe(r -> {
httpStatus.set(r.statusCode());
countDownLatch.countDown();
});
countDownLatch.await(30, TimeUnit.SECONDS);
assertThat(httpStatus.get().is2xxSuccessful()).isTrue();
await().atMost(DEFAULT_DURATION)
.until(verifyOutputPayload(s -> s.equals("Hello")));
}
@Test
void json() throws InterruptedException {
CountDownLatch countDownLatch = new CountDownLatch(1);
webClient
.post()
.uri("http://localhost:" + source.getMappedPort(serverPort))
.contentType(MediaType.APPLICATION_JSON)
.body(Mono.just("{\"Hello\":\"world\"}"), String.class)
.exchange()
.subscribe(r -> {
countDownLatch.countDown();
assertThat(r.statusCode().is2xxSuccessful()).isTrue();
});
countDownLatch.await(30, TimeUnit.SECONDS);
await().atMost(DEFAULT_DURATION)
.until(verifyOutputPayload(s -> s.equals("{\"Hello\":\"world\"}")));
}
}

View File

@@ -0,0 +1,66 @@
/*
* Copyright 2020-2020 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.cloud.stream.apps.integration.test.kafka.source;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import org.testcontainers.containers.BindMode;
import org.testcontainers.containers.MySQLContainer;
import org.testcontainers.junit.jupiter.Container;
import org.testcontainers.utility.DockerImageName;
import org.springframework.cloud.stream.app.test.integration.StreamAppContainer;
import org.springframework.cloud.stream.app.test.integration.kafka.KafkaStreamApplicationIntegrationTestSupport;
import static org.awaitility.Awaitility.await;
import static org.springframework.cloud.stream.app.test.integration.AppLog.appLog;
import static org.springframework.cloud.stream.apps.integration.test.common.Configuration.DEFAULT_DURATION;
import static org.springframework.cloud.stream.apps.integration.test.common.Configuration.VERSION;
public class KafkaJdbcSourceTests extends KafkaStreamApplicationIntegrationTestSupport {
@Container
public static MySQLContainer mySQL = new MySQLContainer<>(DockerImageName.parse("mysql:5.7"))
.withUsername("test")
.withPassword("secret")
.withExposedPorts(3306)
.withNetwork(kafka.getNetwork())
.withNetworkAliases("mysql-for-source")
.withLogConsumer(appLog("mysql-for-source"))
.withClasspathResourceMapping("init.sql", "/init.sql", BindMode.READ_ONLY)
.withCommand("--init-file", "/init.sql");
private static final StreamAppContainer source = prepackagedKafkaContainerFor("jdbc-source", VERSION)
.withEnv("JDBC_SUPPLIER_QUERY", "SELECT * FROM People WHERE deleted='N'")
.withEnv("JDBC_SUPPLIER_UPDATE", "UPDATE People SET deleted='Y' WHERE id=:id")
.withEnv("SPRING_DATASOURCE_USERNAME", "test")
.withEnv("SPRING_DATASOURCE_PASSWORD", "secret")
.withEnv("SPRING_DATASOURCE_DRIVER_CLASS_NAME", "org.mariadb.jdbc.Driver")
.withEnv("SPRING_DATASOURCE_URL", "jdbc:mariadb://mysql-for-source:3306/test");
@BeforeAll
static void startSource() {
await().atMost(DEFAULT_DURATION).until(() -> mySQL.isRunning());
source.start();
}
@Test
void test() {
await().atMost(DEFAULT_DURATION)
.until(verifyOutputPayload((String s) -> s.contains("Bart Simpson")));
}
}

View File

@@ -14,9 +14,8 @@
* limitations under the License.
*/
package org.springframework.cloud.stream.apps.integration.test.source;
package org.springframework.cloud.stream.apps.integration.test.kafka.source;
import java.time.Duration;
import java.util.Map;
import java.util.function.Consumer;
@@ -38,24 +37,27 @@ import org.testcontainers.containers.wait.strategy.Wait;
import org.testcontainers.junit.jupiter.Container;
import org.testcontainers.utility.DockerImageName;
import org.springframework.cloud.stream.app.test.integration.LogMatcher;
import org.springframework.cloud.stream.app.test.integration.StreamApps;
import org.springframework.cloud.stream.apps.integration.test.support.KafkaStreamIntegrationTestSupport;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.cloud.stream.app.test.integration.StreamAppContainer;
import org.springframework.cloud.stream.app.test.integration.TestTopicListener;
import org.springframework.cloud.stream.app.test.integration.kafka.KafkaStreamApplicationIntegrationTestSupport;
import static org.awaitility.Awaitility.await;
import static org.springframework.cloud.stream.app.test.integration.AppLog.appLog;
import static org.springframework.cloud.stream.app.test.integration.FluentMap.fluentMap;
import static org.springframework.cloud.stream.app.test.integration.kafka.KafkaStreamApps.kafkaStreamApps;
import static org.springframework.cloud.stream.apps.integration.test.common.Configuration.DEFAULT_DURATION;
import static org.springframework.cloud.stream.apps.integration.test.common.Configuration.VERSION;
public class S3SourceTests extends KafkaStreamIntegrationTestSupport {
public class KafkaS3SourceTests extends KafkaStreamApplicationIntegrationTestSupport {
private static AmazonS3 s3Client;
private static LogMatcher logMatcher = new LogMatcher();
@Autowired
TestTopicListener testTopicListener;
@Container
private static final GenericContainer minio = new GenericContainer(
DockerImageName.parse("minio/minio:RELEASE.2020-09-05T07-14-49Z"))
DockerImageName.parse("minio/minio:RELEASE.2020-10-18T21-54-12Z"))
.withExposedPorts(9000)
.withEnv("MINIO_ACCESS_KEY", "minio")
.withEnv("MINIO_SECRET_KEY", "minio123")
@@ -82,30 +84,27 @@ public class S3SourceTests extends KafkaStreamIntegrationTestSupport {
}
private StreamApps streamApps = kafkaStreamApps(S3SourceTests.class.getSimpleName(), kafka)
.withSourceContainer(defaultKafkaContainerFor("s3-source")
.withEnv("S3_SUPPLIER_REMOTE_DIR", "bucket")
.withEnv("S3_COMMON_ENDPOINT_URL", "http://" + localHostAddress() + ":" + minio.getMappedPort(9000))
.withEnv("S3_COMMON_PATH_STYLE_ACCESS", "true")
.withEnv("CLOUD_AWS_STACK_AUTO", "false")
.withEnv("CLOUD_AWS_CREDENTIALS_ACCESS_KEY", "minio")
.withEnv("CLOUD_AWS_CREDENTIALS_SECRET_KEY", "minio123")
.withEnv("CLOUD_AWS_REGION_STATIC", "us-east-1")
.withLogConsumer(logMatcher))
.withSinkContainer(defaultKafkaContainerFor("log-sink").withLogConsumer(logMatcher))
.build();
private StreamAppContainer source = prepackagedKafkaContainerFor("s3-source", VERSION)
.withEnv("S3_SUPPLIER_REMOTE_DIR", "bucket")
.withEnv("S3_COMMON_ENDPOINT_URL", "http://" + localHostAddress() + ":" + minio.getMappedPort(9000))
.withEnv("S3_COMMON_PATH_STYLE_ACCESS", "true")
.withEnv("CLOUD_AWS_STACK_AUTO", "false")
.withEnv("CLOUD_AWS_CREDENTIALS_ACCESS_KEY", "minio")
.withEnv("CLOUD_AWS_CREDENTIALS_SECRET_KEY", "minio123")
.withEnv("LOGGING_LEVEL_ORG_SPRINGFRAMEWORK_INTEGRATION", "DEBUG")
.withEnv("CLOUD_AWS_REGION_STATIC", "us-east-1").log();
//
@Test
void testLines() {
startContainer(
fluentMap().withEntry("FILE_CONSUMER_MODE", "lines"));
s3Client.createBucket("bucket");
s3Client.putObject(new PutObjectRequest("bucket", "test",
resourceAsFile("minio/data")));
await().atMost(DEFAULT_DURATION).until(verifyOutputPayload((String s) -> s.contains("Bart Simpson")));
await().atMost(Duration.ofMinutes(2)).until(logMatcher.verifies(log -> log.contains("Started S3Source")));
await().atMost(Duration.ofSeconds(30)).until(logMatcher.verifies(log -> log.when(() -> {
s3Client.createBucket("bucket");
s3Client.putObject(new PutObjectRequest("bucket", "test",
resourceAsFile("minio/data")));
}).contains("Bart Simpson")));
}
@Test
@@ -117,13 +116,11 @@ public class S3SourceTests extends KafkaStreamIntegrationTestSupport {
.withEntry("TASK_LAUNCH_REQUEST_TASK_NAME", "myTask")
.withEntry("FILE_CONSUMER_MODE", "ref"));
await().atMost(Duration.ofMinutes(2)).until(logMatcher.verifies(log -> log.contains("Started S3Source")));
await().atMost(Duration.ofSeconds(30)).until(logMatcher.verifies(log -> log.when(() -> {
s3Client.createBucket("bucket");
s3Client.putObject(new PutObjectRequest("bucket", "test",
resourceAsFile("minio/data")));
}).endsWith(
"\\{\"args\":\\[\"filename=/tmp/s3-supplier/test\"\\],\"deploymentProps\":\\{\\},\"name\":\"myTask\"\\}")));
s3Client.createBucket("bucket");
s3Client.putObject(new PutObjectRequest("bucket", "test",
resourceAsFile("minio/data")));
await().atMost(DEFAULT_DURATION).until(verifyOutputPayload(s -> s.equals(
"{\"args\":[\"filename=/tmp/s3-supplier/test\"],\"deploymentProps\":{},\"name\":\"myTask\"}")));
}
@Test
@@ -134,17 +131,16 @@ public class S3SourceTests extends KafkaStreamIntegrationTestSupport {
.withEntry("FILE_CONSUMER_MODE", "ref")
.withEntry("S3_SUPPLIER_LIST_ONLY", "true"));
await().atMost(Duration.ofMinutes(2)).until(logMatcher.verifies(log -> log.contains("Started S3Source")));
await().atMost(Duration.ofSeconds(30)).until(logMatcher.verifies(log -> log.when(() -> {
s3Client.createBucket("bucket");
s3Client.putObject(new PutObjectRequest("bucket", "test",
resourceAsFile("minio/data")));
}).contains("\"bucketName\":\"bucket\",\"key\":\"test\"")));
s3Client.createBucket("bucket");
s3Client.putObject(new PutObjectRequest("bucket", "test",
resourceAsFile("minio/data")));
await().atMost(DEFAULT_DURATION)
.until(verifyOutputPayload((String s) -> s.contains("\"bucketName\":\"bucket\",\"key\":\"test\"")));
}
private void startContainer(Map<String, String> environment) {
streamApps.sourceContainer().withEnv(environment);
streamApps.start();
source.withEnv(environment);
source.start();
}
@AfterEach
@@ -153,6 +149,7 @@ public class S3SourceTests extends KafkaStreamIntegrationTestSupport {
s3Client.deleteObject("bucket", "test");
s3Client.deleteBucket("bucket");
}
streamApps.stop();
source.stop();
testTopicListener.clearOutputVerifiers();
}
}

View File

@@ -0,0 +1,66 @@
/*
* Copyright 2020-2020 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.cloud.stream.apps.integration.test.kafka.source;
import java.util.Collections;
import java.util.Map;
import org.junit.jupiter.api.Test;
import org.testcontainers.containers.BindMode;
import org.testcontainers.containers.GenericContainer;
import org.testcontainers.junit.jupiter.Container;
import org.testcontainers.utility.DockerImageName;
import org.springframework.cloud.stream.app.test.integration.StreamAppContainer;
import org.springframework.cloud.stream.app.test.integration.kafka.KafkaStreamApplicationIntegrationTestSupport;
import static org.awaitility.Awaitility.await;
import static org.springframework.cloud.stream.apps.integration.test.common.Configuration.DEFAULT_DURATION;
import static org.springframework.cloud.stream.apps.integration.test.common.Configuration.VERSION;
public class KafkaSftpSourceTests extends KafkaStreamApplicationIntegrationTestSupport {
@Container
private static final GenericContainer sftp = new GenericContainer(DockerImageName.parse("atmoz/sftp"))
.withExposedPorts(22)
.withCommand("user:pass:::remote")
.withClasspathResourceMapping("sftp", "/home/user/remote", BindMode.READ_ONLY)
.withStartupTimeout(DEFAULT_DURATION);
private StreamAppContainer source = prepackagedKafkaContainerFor("sftp-source", VERSION)
.withEnv("SFTP_SUPPLIER_FACTORY_ALLOW_UNKNOWN_KEYS", "true")
.withEnv("SFTP_SUPPLIER_REMOTE_DIR", "/remote")
.withEnv("SFTP_SUPPLIER_FACTORY_USERNAME", "user")
.withEnv("SFTP_SUPPLIER_FACTORY_PASSWORD", "pass")
.withEnv("SFTP_SUPPLIER_FACTORY_PORT", String.valueOf(sftp.getMappedPort(22)))
.withEnv("SFTP_SUPPLIER_FACTORY_HOST", localHostAddress());
// TODO: This fixture supports additional tests with different modes, etc.
@Test
void test() {
startContainer(Collections.singletonMap("FILE_CONSUMER_MODE", "ref"));
await().atMost(DEFAULT_DURATION)
.until(verifyOutputPayload((String s) -> s.equals("\"/tmp/sftp-supplier/data.txt\"")));
}
private void startContainer(Map<String, String> environment) {
source.withEnv(environment);
source.start();
}
}

View File

@@ -0,0 +1,49 @@
/*
* Copyright 2020-2020 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.cloud.stream.apps.integration.test.kafka.source;
import java.util.regex.Pattern;
import org.junit.jupiter.api.Test;
import org.testcontainers.junit.jupiter.Container;
import org.testcontainers.junit.jupiter.Testcontainers;
import org.springframework.cloud.stream.app.test.integration.LogMatcher;
import org.springframework.cloud.stream.app.test.integration.StreamAppContainer;
import org.springframework.cloud.stream.app.test.integration.kafka.KafkaStreamApplicationIntegrationTestSupport;
import static org.awaitility.Awaitility.await;
import static org.springframework.cloud.stream.apps.integration.test.common.Configuration.DEFAULT_DURATION;
import static org.springframework.cloud.stream.apps.integration.test.common.Configuration.VERSION;
@Testcontainers
public class KafkaTimeSourceTests extends KafkaStreamApplicationIntegrationTestSupport {
// "MM/dd/yy HH:mm:ss";
private final static Pattern pattern = Pattern.compile(".*\\d{2}/\\d{2}/\\d{2}\\s+\\d{2}:\\d{2}:\\d{2}");
static LogMatcher logMatcher = LogMatcher.contains("Started TimeSource");
@Container
static StreamAppContainer timeSource = prepackagedKafkaContainerFor("time-source", VERSION)
.withLogConsumer(logMatcher);
@Test
void test() {
await().atMost(DEFAULT_DURATION).until(logMatcher.matches());
await().atMost(DEFAULT_DURATION).until(verifyOutputPayload((String s) -> pattern.matcher(s).matches()));
}
}

View File

@@ -14,9 +14,7 @@
* limitations under the License.
*/
package org.springframework.cloud.stream.apps.integration.test.source;
import java.time.Duration;
package org.springframework.cloud.stream.apps.integration.test.kafka.stream;
import org.junit.jupiter.api.Test;
import org.testcontainers.containers.BindMode;
@@ -26,14 +24,17 @@ import org.testcontainers.utility.DockerImageName;
import org.springframework.cloud.stream.app.test.integration.LogMatcher;
import org.springframework.cloud.stream.app.test.integration.StreamApps;
import org.springframework.cloud.stream.apps.integration.test.support.KafkaStreamIntegrationTestSupport;
import org.springframework.cloud.stream.app.test.integration.kafka.KafkaStreamApplicationIntegrationTestSupport;
import static org.awaitility.Awaitility.await;
import static org.springframework.cloud.stream.app.test.integration.AppLog.appLog;
import static org.springframework.cloud.stream.app.test.integration.kafka.KafkaStreamApps.kafkaStreamApps;
import static org.springframework.cloud.stream.apps.integration.test.common.Configuration.DEFAULT_DURATION;
import static org.springframework.cloud.stream.apps.integration.test.common.Configuration.VERSION;
public class JdbcSourceTests extends KafkaStreamIntegrationTestSupport {
public class KafkaJdbcLogStreamTests extends KafkaStreamApplicationIntegrationTestSupport {
private static LogMatcher logMatcher = new LogMatcher();
private static LogMatcher logMatcher = LogMatcher.contains("Bart Simpson");
@Container
public static MySQLContainer mySQL = new MySQLContainer<>(DockerImageName.parse("mysql:5.7"))
@@ -41,25 +42,26 @@ public class JdbcSourceTests extends KafkaStreamIntegrationTestSupport {
.withPassword("secret")
.withExposedPorts(3306)
.withNetwork(kafka.getNetwork())
.withNetworkAliases("mysql-for-stream")
.withLogConsumer(appLog("mySQL"))
.withClasspathResourceMapping("init.sql", "/init.sql", BindMode.READ_ONLY)
.withCommand("--init-file", "/init.sql");
@Container
private static final StreamApps streamApps = kafkaStreamApps(JdbcSourceTests.class.getSimpleName(), kafka)
.withSourceContainer(defaultKafkaContainerFor("jdbc-source")
private static final StreamApps streamApp = kafkaStreamApps(KafkaJdbcLogStreamTests.class.getSimpleName(), kafka)
.withSourceContainer(prepackagedKafkaContainerFor("jdbc-source", VERSION)
.withEnv("JDBC_SUPPLIER_QUERY", "SELECT * FROM People WHERE deleted='N'")
.withEnv("JDBC_SUPPLIER_UPDATE", "UPDATE People SET deleted='Y' WHERE id=:id")
.withEnv("SPRING_DATASOURCE_USERNAME", "test")
.withEnv("SPRING_DATASOURCE_PASSWORD", "secret")
.withEnv("SPRING_DATASOURCE_USERNAME", "test")
.withEnv("SPRING_DATASOURCE_DRIVER_CLASS_NAME", "org.mariadb.jdbc.Driver")
.withEnv("SPRING_DATASOURCE_URL",
"jdbc:mysql://" + mySQL.getNetworkAliases().get(0) + ":3306/test"))
.withSinkContainer(defaultKafkaContainerFor("log-sink")
.withLogConsumer(logMatcher))
"jdbc:mariadb://mysql-for-stream:3306/test"))
.withSinkContainer(prepackagedKafkaContainerFor("log-sink", VERSION).withLogConsumer(logMatcher))
.build();
@Test
void test() {
await().atMost(Duration.ofSeconds(30)).until(logMatcher.verifies(log -> log.contains("Bart Simpson")));
await().atMost(DEFAULT_DURATION).until(logMatcher.matches());
}
}

View File

@@ -14,38 +14,33 @@
* limitations under the License.
*/
package org.springframework.cloud.stream.apps.integration.test;
import java.time.Duration;
import java.util.regex.Pattern;
package org.springframework.cloud.stream.apps.integration.test.kafka.stream;
import org.junit.jupiter.api.Test;
import org.testcontainers.junit.jupiter.Container;
import org.springframework.cloud.stream.app.test.integration.LogMatcher;
import org.springframework.cloud.stream.app.test.integration.StreamApps;
import org.springframework.cloud.stream.apps.integration.test.support.KafkaStreamIntegrationTestSupport;
import org.springframework.cloud.stream.app.test.integration.kafka.KafkaStreamApplicationIntegrationTestSupport;
import static org.awaitility.Awaitility.await;
import static org.springframework.cloud.stream.app.test.integration.kafka.KafkaStreamApps.kafkaStreamApps;
import static org.springframework.cloud.stream.apps.integration.test.common.Configuration.DEFAULT_DURATION;
import static org.springframework.cloud.stream.apps.integration.test.common.Configuration.VERSION;
public class TikTokTests extends KafkaStreamIntegrationTestSupport {
// "MM/dd/yy HH:mm:ss";
private final static Pattern DATE_PATTERN = Pattern.compile(".*\\d{2}/\\d{2}/\\d{2}\\s+\\d{2}:\\d{2}:\\d{2}");
public class KafkaTikTokTests extends KafkaStreamApplicationIntegrationTestSupport {
private final static LogMatcher logMatcher = new LogMatcher();
private static LogMatcher logMatcher = LogMatcher.matchesRegex(".*\\d{2}/\\d{2}/\\d{2}\\s+\\d{2}:\\d{2}:\\d{2}")
.times(3);
@Container
static StreamApps streamApps = kafkaStreamApps("tikTok", kafka)
.withSourceContainer(defaultKafkaContainerFor("time-source"))
.withSinkContainer(defaultKafkaContainerFor("log-sink")
.withLogConsumer(logMatcher))
private static final StreamApps streamApp = kafkaStreamApps(KafkaTikTokTests.class.getSimpleName(), kafka)
.withSourceContainer(prepackagedKafkaContainerFor("time-source", VERSION))
.withSinkContainer(prepackagedKafkaContainerFor("log-sink", VERSION).withLogConsumer(logMatcher))
.build();
@Test
void tiktok() {
await().atMost(Duration.ofMinutes(2)).until(logMatcher.verifies(log -> log.contains("Started LogSink")));
await().atMost(Duration.ofSeconds(30))
.until(logMatcher.verifies(log -> log.matchesRegex(DATE_PATTERN.pattern())));
void test() {
await().atMost(DEFAULT_DURATION).until(logMatcher.matches());
}
}

View File

@@ -1,96 +0,0 @@
/*
* Copyright 2020-2020 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.cloud.stream.apps.integration.test.processor;
import java.net.InetAddress;
import java.time.Duration;
import okhttp3.mockwebserver.Dispatcher;
import okhttp3.mockwebserver.MockResponse;
import okhttp3.mockwebserver.MockWebServer;
import okhttp3.mockwebserver.RecordedRequest;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import org.testcontainers.junit.jupiter.Container;
import reactor.core.publisher.Mono;
import org.springframework.cloud.stream.app.test.integration.LogMatcher;
import org.springframework.cloud.stream.app.test.integration.StreamApps;
import org.springframework.cloud.stream.apps.integration.test.support.KafkaStreamIntegrationTestSupport;
import org.springframework.http.HttpHeaders;
import org.springframework.http.HttpStatus;
import org.springframework.http.MediaType;
import org.springframework.web.reactive.function.client.ClientResponse;
import org.springframework.web.reactive.function.client.WebClient;
import static org.assertj.core.api.Assertions.assertThat;
import static org.awaitility.Awaitility.await;
import static org.springframework.cloud.stream.app.test.integration.kafka.KafkaStreamApps.kafkaStreamApps;
public class HttpRequestProcessorTests extends KafkaStreamIntegrationTestSupport {
private static MockWebServer server = new MockWebServer();
private static WebClient webClient = WebClient.builder().build();
private static LogMatcher logMatcher = new LogMatcher();
private static int serverPort = findAvailablePort();
private static int sourcePort = findAvailablePort();
@Container
private static final StreamApps streamApps = kafkaStreamApps(
HttpRequestProcessorTests.class.getSimpleName(), kafka)
.withSourceContainer(httpSource(sourcePort))
.withProcessorContainer(defaultKafkaContainerFor("http-request-processor")
.withEnv("HTTP_REQUEST_URL_EXPRESSION",
"'http://" + localHostAddress() + ":" + serverPort + "'")
.withEnv("HTTP_REQUEST_HTTP_METHOD_EXPRESSION", "'POST'"))
.withSinkContainer(
defaultKafkaContainerFor("log-sink").withLogConsumer(logMatcher))
.build();
@BeforeAll
static void startServer() throws Exception {
server.start(InetAddress.getLocalHost(), serverPort);
}
@Test
void get() {
server.setDispatcher(new Dispatcher() {
@Override
public MockResponse dispatch(RecordedRequest recordedRequest) {
return new MockResponse().setHeader(HttpHeaders.CONTENT_TYPE, MediaType.APPLICATION_JSON_VALUE)
.setBody("{\"response\":\"" + recordedRequest.getBody().readUtf8() + "\"}")
.setResponseCode(HttpStatus.OK.value());
}
});
await().atMost(Duration.ofSeconds(30))
.until(logMatcher.verifies(log -> log.when(() -> {
ClientResponse response = webClient
.post()
.uri("http://localhost:" + streamApps.sourceContainer().getMappedPort(sourcePort))
.contentType(MediaType.TEXT_PLAIN)
.body(Mono.just("ping"), String.class)
.exchange()
.block();
assertThat(response.statusCode().is2xxSuccessful()).isTrue();
}).matchesRegex(".*\\{\"response\":\"ping\"\\}")));
}
}

View File

@@ -0,0 +1,51 @@
/*
* Copyright 2020-2020 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.cloud.stream.apps.integration.test.rabbitmq.source;
import java.time.Duration;
import java.util.regex.Pattern;
import org.junit.jupiter.api.Test;
import org.testcontainers.junit.jupiter.Container;
import org.testcontainers.junit.jupiter.Testcontainers;
import org.springframework.cloud.stream.app.test.integration.LogMatcher;
import org.springframework.cloud.stream.app.test.integration.StreamAppContainer;
import org.springframework.cloud.stream.app.test.integration.rabbitmq.RabbitMQStreamApplicationIntegrationTestSupport;
import static org.awaitility.Awaitility.await;
import static org.springframework.cloud.stream.apps.integration.test.common.Configuration.DEFAULT_DURATION;
import static org.springframework.cloud.stream.apps.integration.test.common.Configuration.VERSION;
@Testcontainers
public class RabbitMQTimeSourceTests extends RabbitMQStreamApplicationIntegrationTestSupport {
// "MM/dd/yy HH:mm:ss";
private final static Pattern pattern = Pattern.compile(".*\\d{2}/\\d{2}/\\d{2}\\s+\\d{2}:\\d{2}:\\d{2}");
static LogMatcher logMatcher = LogMatcher.contains("Started TimeSource");
@Container
static StreamAppContainer timeSource = prepackagedRabbitMQContainerFor("time-source", VERSION)
.withLogConsumer(logMatcher);
@Test
void test() {
await().atMost(DEFAULT_DURATION).until(logMatcher.matches());
await().atMost(DEFAULT_DURATION).pollInterval(Duration.ofSeconds(1)).until(verifyOutputPayload((String s) -> pattern.matcher(s).matches()));
}
}

View File

@@ -0,0 +1,47 @@
/*
* Copyright 2020-2020 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.cloud.stream.apps.integration.test.rabbitmq.stream;
import org.junit.jupiter.api.Test;
import org.testcontainers.junit.jupiter.Container;
import org.springframework.cloud.stream.app.test.integration.LogMatcher;
import org.springframework.cloud.stream.app.test.integration.StreamApps;
import org.springframework.cloud.stream.app.test.integration.rabbitmq.RabbitMQStreamApplicationIntegrationTestSupport;
import static org.awaitility.Awaitility.await;
import static org.springframework.cloud.stream.app.test.integration.rabbitmq.RabbitMQStreamApps.rabbitMQStreamApps;
import static org.springframework.cloud.stream.apps.integration.test.common.Configuration.DEFAULT_DURATION;
import static org.springframework.cloud.stream.apps.integration.test.common.Configuration.VERSION;
public class RabbitMQTikTokTests extends RabbitMQStreamApplicationIntegrationTestSupport {
private static LogMatcher logMatcher = LogMatcher.matchesRegex(".*\\d{2}/\\d{2}/\\d{2}\\s+\\d{2}:\\d{2}:\\d{2}")
.times(3);
@Container
private static final StreamApps streamApp = rabbitMQStreamApps(RabbitMQTikTokTests.class.getSimpleName(), rabbitmq)
.withSourceContainer(prepackagedRabbitMQContainerFor("time-source", VERSION))
.withSinkContainer(prepackagedRabbitMQContainerFor("log-sink", VERSION)
.withLogConsumer(logMatcher))
.build();
@Test
void test() {
await().atMost(DEFAULT_DURATION).until(logMatcher.matches());
}
}

View File

@@ -1,80 +0,0 @@
/*
* Copyright 2020-2020 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.cloud.stream.apps.integration.test.source;
import java.time.Duration;
import org.junit.jupiter.api.Test;
import org.testcontainers.junit.jupiter.Container;
import reactor.core.publisher.Mono;
import org.springframework.cloud.stream.app.test.integration.LogMatcher;
import org.springframework.cloud.stream.app.test.integration.StreamApps;
import org.springframework.cloud.stream.apps.integration.test.support.KafkaStreamIntegrationTestSupport;
import org.springframework.http.MediaType;
import org.springframework.web.reactive.function.client.ClientResponse;
import org.springframework.web.reactive.function.client.WebClient;
import static org.assertj.core.api.Assertions.assertThat;
import static org.awaitility.Awaitility.await;
import static org.springframework.cloud.stream.app.test.integration.kafka.KafkaStreamApps.kafkaStreamApps;
public class HttpSourceTests extends KafkaStreamIntegrationTestSupport {
private static int serverPort = findAvailablePort();
private static WebClient webClient = WebClient.builder().build();
private static LogMatcher logMatcher = new LogMatcher();
@Container
private static final StreamApps streamApps = kafkaStreamApps(HttpSourceTests.class.getSimpleName(), kafka)
.withSourceContainer(httpSource(serverPort))
.withSinkContainer(defaultKafkaContainerFor("log-sink").withLogConsumer(logMatcher))
.build();
@Test
void plaintext() {
await().atMost(Duration.ofSeconds(30))
.until(logMatcher.verifies(log -> log.when(() -> {
ClientResponse response = webClient
.post()
.uri("http://localhost:" + streamApps.sourceContainer().getMappedPort(serverPort))
.contentType(MediaType.TEXT_PLAIN)
.body(Mono.just("Hello"), String.class)
.exchange()
.block();
assertThat(response.statusCode().is2xxSuccessful()).isTrue();
}).endsWith("Hello")));
}
@Test
void json() {
await().atMost(Duration.ofSeconds(30))
.until(logMatcher.verifies(log -> log.when(() -> {
ClientResponse response = webClient
.post()
.uri("http://localhost:" + streamApps.sourceContainer().getMappedPort(serverPort))
.contentType(MediaType.APPLICATION_JSON)
.body(Mono.just("{\"Hello\":\"world\"}"), String.class)
.exchange()
.block();
assertThat(response.statusCode().is2xxSuccessful()).isTrue();
}).matchesRegex(".*\\{\"Hello\":\"world\"\\}")));
}
}

View File

@@ -1,78 +0,0 @@
/*
* Copyright 2020-2020 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.cloud.stream.apps.integration.test.source;
import java.time.Duration;
import java.util.Map;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Test;
import org.testcontainers.containers.BindMode;
import org.testcontainers.containers.GenericContainer;
import org.testcontainers.junit.jupiter.Container;
import org.testcontainers.utility.DockerImageName;
import org.springframework.cloud.stream.app.test.integration.LogMatcher;
import org.springframework.cloud.stream.app.test.integration.StreamApps;
import org.springframework.cloud.stream.apps.integration.test.support.KafkaStreamIntegrationTestSupport;
import static org.awaitility.Awaitility.await;
import static org.springframework.cloud.stream.app.test.integration.FluentMap.fluentMap;
import static org.springframework.cloud.stream.app.test.integration.kafka.KafkaStreamApps.kafkaStreamApps;
public class SftpSourceTests extends KafkaStreamIntegrationTestSupport {
private static LogMatcher logMatcher = new LogMatcher();
@Container
private static final GenericContainer sftp = new GenericContainer(DockerImageName.parse("atmoz/sftp"))
.withExposedPorts(22)
.withCommand("user:pass:::remote")
.withClasspathResourceMapping("sftp", "/home/user/remote", BindMode.READ_ONLY)
.withStartupTimeout(Duration.ofMinutes(1));
private StreamApps streamApps = kafkaStreamApps(SftpSourceTests.class.getSimpleName(), kafka)
.withSourceContainer(defaultKafkaContainerFor("sftp-source")
.withEnv("SFTP_SUPPLIER_FACTORY_ALLOW_UNKNOWN_KEYS", "true")
.withEnv("SFTP_SUPPLIER_REMOTE_DIR", "/remote")
.withEnv("SFTP_SUPPLIER_FACTORY_USERNAME", "user")
.withEnv("SFTP_SUPPLIER_FACTORY_PASSWORD", "pass")
.withEnv("SFTP_SUPPLIER_FACTORY_PORT", String.valueOf(sftp.getMappedPort(22)))
.withEnv("SFTP_SUPPLIER_FACTORY_HOST", localHostAddress()))
.withSinkContainer(defaultKafkaContainerFor("log-sink").withLogConsumer(logMatcher))
.build();
// TODO: This fixture supports additional tests with different modes, etc.
@Test
void test() {
startContainer(
fluentMap().withEntry("FILE_CONSUMER_MODE", "ref"));
await().atMost(Duration.ofSeconds(30))
.until(logMatcher.verifies(logListener -> logListener.endsWith("\"/tmp/sftp-supplier/data.txt\"")));
}
private void startContainer(Map<String, String> environment) {
streamApps.sourceContainer().withEnv(environment);
streamApps.start();
}
@AfterEach
void stop() {
streamApps.stop();
}
}

View File

@@ -1,47 +0,0 @@
/*
* Copyright 2020-2020 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.cloud.stream.apps.integration.test.support;
import java.time.Duration;
import org.testcontainers.containers.GenericContainer;
import org.testcontainers.containers.wait.strategy.Wait;
import org.testcontainers.utility.DockerImageName;
import org.springframework.cloud.stream.app.test.integration.kafka.AbstractKafkaStreamApplicationIntegrationTests;
public abstract class KafkaStreamIntegrationTestSupport extends AbstractKafkaStreamApplicationIntegrationTests {
protected static final String VERSION = "3.0.0-SNAPSHOT";
protected static final String DOCKER_ORG = "springcloudstream";
private static DockerImageName defaultKafkaImageFor(String appName) {
return DockerImageName.parse(DOCKER_ORG + "/" + appName + "-kafka:" + VERSION);
}
protected static GenericContainer defaultKafkaContainerFor(String appName) {
return new GenericContainer(defaultKafkaImageFor(appName));
}
protected static GenericContainer httpSource(int serverPort) {
return new GenericContainer(defaultKafkaImageFor("http-source"))
.withEnv("SERVER_PORT", String.valueOf(serverPort))
.withExposedPorts(serverPort)
.waitingFor(Wait.forListeningPort().withStartupTimeout(Duration.ofMinutes(2)));
}
}

View File

@@ -11,5 +11,6 @@
<logger name="org.testcontainers" level="INFO"/>
<logger name="com.github.dockerjava" level="WARN"/>
<!-- <logger name="org.springframework.kafka" level="INFO"/>-->
<!-- <logger name="org.springframework.cloud.stream.app.test.integration" level="DEBUG"/>-->
</configuration>