Tests Pass except RabbitMQGeodeSourceTests

This commit is contained in:
David Turanski
2020-11-01 16:59:43 -05:00
parent 719d6eb226
commit 92d37334f6
37 changed files with 1013 additions and 300 deletions

View File

@@ -119,6 +119,12 @@
<dependency>
<groupId>org.springframework.data</groupId>
<artifactId>spring-data-geode</artifactId>
<exclusions>
<exclusion>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j</artifactId>
</exclusion>
</exclusions>
<scope>test</scope>
</dependency>
<dependency>

View File

@@ -27,7 +27,7 @@ public abstract class Configuration {
private static final String SPRING_CLOUD_STREAM_APPLICATIONS_VERSION = "spring.cloud.stream.applications.version";
static {
VERSION = System.getProperty(SPRING_CLOUD_STREAM_APPLICATIONS_VERSION, "latest");
VERSION = System.getProperty(SPRING_CLOUD_STREAM_APPLICATIONS_VERSION, "3.0.0-SNAPSHOT");
}
}

View File

@@ -14,48 +14,59 @@
* limitations under the License.
*/
package org.springframework.cloud.stream.apps.integration.test.kafka.processor;
package org.springframework.cloud.stream.apps.integration.test.processor.httprequest;
import java.io.IOException;
import java.net.InetAddress;
import okhttp3.mockwebserver.Dispatcher;
import okhttp3.mockwebserver.MockResponse;
import okhttp3.mockwebserver.MockWebServer;
import okhttp3.mockwebserver.RecordedRequest;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import org.testcontainers.junit.jupiter.Container;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.cloud.stream.app.test.integration.OutputMatcher;
import org.springframework.cloud.stream.app.test.integration.StreamAppContainer;
import org.springframework.cloud.stream.app.test.integration.kafka.KafkaStreamApplicationIntegrationTestSupport;
import org.springframework.cloud.stream.app.test.integration.StreamAppContainerTestUtils;
import org.springframework.cloud.stream.app.test.integration.TestTopicSender;
import org.springframework.http.HttpHeaders;
import org.springframework.http.HttpStatus;
import org.springframework.http.MediaType;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.messaging.MessageHeaders;
import static org.awaitility.Awaitility.await;
import static org.springframework.cloud.stream.app.test.integration.AppLog.appLog;
import static org.springframework.cloud.stream.apps.integration.test.common.Configuration.DEFAULT_DURATION;
import static org.springframework.cloud.stream.apps.integration.test.common.Configuration.VERSION;
public class KafkaHttpRequestProcessorTests extends KafkaStreamApplicationIntegrationTestSupport {
private static MockWebServer server = new MockWebServer();
abstract class HttpRequestProcessorTests {
private static int serverPort = findAvailablePort();
private static MockWebServer server;
private static int serverPort;
@Autowired
private KafkaTemplate kafkaTemplate;
private TestTopicSender testTopicSender;
@Container
private static StreamAppContainer processor = prepackagedKafkaContainerFor("http-request-processor", VERSION)
.withLogConsumer(appLog("http-request-processor"))
.withEnv("HTTP_REQUEST_URL_EXPRESSION", "'http://" + localHostAddress() + ":" + serverPort + "'")
.withEnv("HTTP_REQUEST_HTTP_METHOD_EXPRESSION", "'POST'");
@Autowired
private OutputMatcher outputMatcher;
private static StreamAppContainer processor;
protected static StreamAppContainer configureProcessor(StreamAppContainer baseContainer) {
serverPort = StreamAppContainerTestUtils.findAvailablePort();
processor = baseContainer.withLogConsumer(appLog("http-request-processor"))
.withEnv("HTTP_REQUEST_URL_EXPRESSION",
"'http://" + StreamAppContainerTestUtils.localHostAddress() + ":" + serverPort + "'")
.withEnv("HTTP_REQUEST_HTTP_METHOD_EXPRESSION", "'POST'");
return processor;
}
@BeforeAll
static void startServer() throws Exception {
server = new MockWebServer();
server.start(InetAddress.getLocalHost(), serverPort);
}
@@ -70,11 +81,16 @@ public class KafkaHttpRequestProcessorTests extends KafkaStreamApplicationIntegr
.setResponseCode(HttpStatus.OK.value());
}
});
kafkaTemplate.send(processor.getInputDestination(), "ping");
testTopicSender.send(processor.getInputDestination(), "ping");
await().atMost(DEFAULT_DURATION)
.until(messageMatches(message -> message.getPayload().equals("{\"response\":\"ping\"}")
.until(outputMatcher.messageMatches(message -> message.getPayload().equals("{\"response\":\"ping\"}")
&& message.getHeaders().get(MessageHeaders.CONTENT_TYPE)
.equals(MediaType.APPLICATION_JSON_VALUE)));
}
@AfterAll
static void cleanUp() throws IOException {
server.shutdown();
}
}

View File

@@ -0,0 +1,33 @@
/*
* Copyright 2020-2020 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.cloud.stream.apps.integration.test.processor.httprequest;
import org.testcontainers.junit.jupiter.Container;
import org.springframework.cloud.stream.app.test.integration.StreamAppContainer;
import org.springframework.cloud.stream.app.test.integration.kafka.KafkaConfig;
import org.springframework.cloud.stream.app.test.integration.kafka.KafkaStreamAppTest;
import static org.springframework.cloud.stream.apps.integration.test.common.Configuration.VERSION;
@KafkaStreamAppTest
class KafkaHttpRequestProcessorTests extends HttpRequestProcessorTests {
@Container
private static StreamAppContainer container = configureProcessor(KafkaConfig
.prepackagedContainerFor("http-request-processor", VERSION));
}

View File

@@ -0,0 +1,33 @@
/*
* Copyright 2020-2020 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.cloud.stream.apps.integration.test.processor.httprequest;
import org.testcontainers.junit.jupiter.Container;
import org.springframework.cloud.stream.app.test.integration.StreamAppContainer;
import org.springframework.cloud.stream.app.test.integration.rabbitmq.RabbitMQConfig;
import org.springframework.cloud.stream.app.test.integration.rabbitmq.RabbitMQStreamAppTest;
import static org.springframework.cloud.stream.apps.integration.test.common.Configuration.VERSION;
@RabbitMQStreamAppTest
class RabbitMQHttpRequestProcessorTests extends HttpRequestProcessorTests {
@Container
private static StreamAppContainer container = configureProcessor(RabbitMQConfig
.prepackagedContainerFor("http-request-processor", VERSION));
}

View File

@@ -1,52 +0,0 @@
/*
* Copyright 2020-2020 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.cloud.stream.apps.integration.test.rabbitmq.source;
import java.time.Duration;
import java.util.regex.Pattern;
import org.junit.jupiter.api.Test;
import org.testcontainers.junit.jupiter.Container;
import org.testcontainers.junit.jupiter.Testcontainers;
import org.springframework.cloud.stream.app.test.integration.LogMatcher;
import org.springframework.cloud.stream.app.test.integration.StreamAppContainer;
import org.springframework.cloud.stream.app.test.integration.rabbitmq.RabbitMQStreamApplicationIntegrationTestSupport;
import static org.awaitility.Awaitility.await;
import static org.springframework.cloud.stream.apps.integration.test.common.Configuration.DEFAULT_DURATION;
import static org.springframework.cloud.stream.apps.integration.test.common.Configuration.VERSION;
@Testcontainers
public class RabbitMQTimeSourceTests extends RabbitMQStreamApplicationIntegrationTestSupport {
// "MM/dd/yy HH:mm:ss";
private final static Pattern pattern = Pattern.compile(".*\\d{2}/\\d{2}/\\d{2}\\s+\\d{2}:\\d{2}:\\d{2}");
static LogMatcher logMatcher = LogMatcher.contains("Started TimeSource");
@Container
static StreamAppContainer timeSource = prepackagedRabbitMQContainerFor("time-source", VERSION)
.withLogConsumer(logMatcher);
@Test
void test() {
await().atMost(DEFAULT_DURATION).until(logMatcher.matches());
await().atMost(DEFAULT_DURATION).pollInterval(Duration.ofSeconds(1))
.until(payloadMatches((String s) -> pattern.matcher(s).matches()));
}
}

View File

@@ -1,46 +0,0 @@
/*
* Copyright 2020-2020 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.cloud.stream.apps.integration.test.rabbitmq.stream;
import org.junit.jupiter.api.Test;
import org.testcontainers.junit.jupiter.Container;
import org.springframework.cloud.stream.app.test.integration.LogMatcher;
import org.springframework.cloud.stream.app.test.integration.StreamApps;
import org.springframework.cloud.stream.app.test.integration.rabbitmq.RabbitMQStreamApplicationIntegrationTestSupport;
import static org.awaitility.Awaitility.await;
import static org.springframework.cloud.stream.app.test.integration.rabbitmq.RabbitMQStreamApps.rabbitMQStreamApps;
import static org.springframework.cloud.stream.apps.integration.test.common.Configuration.DEFAULT_DURATION;
import static org.springframework.cloud.stream.apps.integration.test.common.Configuration.VERSION;
public class RabbitMQTikTokTests extends RabbitMQStreamApplicationIntegrationTestSupport {
private static LogMatcher logMatcher = LogMatcher.matchesRegex(".*\\d{2}/\\d{2}/\\d{2}\\s+\\d{2}:\\d{2}:\\d{2}")
.times(3);
@Container
private static final StreamApps streamApp = rabbitMQStreamApps(RabbitMQTikTokTests.class.getSimpleName(), rabbitmq)
.withSourceContainer(prepackagedRabbitMQContainerFor("time-source", VERSION))
.withSinkContainer(prepackagedRabbitMQContainerFor("log-sink", VERSION).withLogConsumer(logMatcher).log())
.build();
@Test
void test() {
await().atMost(DEFAULT_DURATION).until(logMatcher.matches());
}
}

View File

@@ -14,11 +14,10 @@
* limitations under the License.
*/
package org.springframework.cloud.stream.apps.integration.test.kafka.sink;
package org.springframework.cloud.stream.apps.integration.test.sink.jdbc;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.zaxxer.hikari.HikariDataSource;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.Test;
import org.testcontainers.containers.BindMode;
import org.testcontainers.containers.MySQLContainer;
@@ -26,51 +25,58 @@ import org.testcontainers.junit.jupiter.Container;
import org.testcontainers.utility.DockerImageName;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.cloud.stream.app.test.integration.LogMatcher;
import org.springframework.cloud.stream.app.test.integration.StreamAppContainer;
import org.springframework.cloud.stream.app.test.integration.kafka.KafkaStreamApplicationIntegrationTestSupport;
import org.springframework.cloud.stream.app.test.integration.TestTopicSender;
import org.springframework.cloud.stream.app.test.integration.kafka.KafkaConfig;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.kafka.core.KafkaTemplate;
import static org.assertj.core.api.Assertions.assertThat;
import static org.awaitility.Awaitility.await;
import static org.springframework.cloud.stream.app.test.integration.AppLog.appLog;
import static org.springframework.cloud.stream.apps.integration.test.common.Configuration.DEFAULT_DURATION;
import static org.springframework.cloud.stream.apps.integration.test.common.Configuration.VERSION;
public class KafkaJdbcSinkTests extends KafkaStreamApplicationIntegrationTestSupport {
public abstract class JdbcSinkTests {
private static JdbcTemplate jdbcTemplate;
private static StreamAppContainer sink;
private static LogMatcher logMatcher = LogMatcher.contains("Started JdbcSink");
@Autowired
private KafkaTemplate kafkaTemplate;
private TestTopicSender testTopicSender;
@Container
private static MySQLContainer mySQL = new MySQLContainer<>(DockerImageName.parse("mysql:5.7"))
.withUsername("test")
.withPassword("secret")
.withExposedPorts(3306)
.withNetwork(kafka.getNetwork())
.withNetwork(KafkaConfig.kafka.getNetwork())
.withNetworkAliases("mysql-for-sink")
.withClasspathResourceMapping("init.sql", "/init.sql", BindMode.READ_ONLY)
.withLogConsumer(appLog("mysql-for-sink"))
.withCommand("--init-file", "/init.sql");
static StreamAppContainer sink = prepackagedKafkaContainerFor("jdbc-sink", VERSION)
.dependsOn(mySQL)
.withEnv("JDBC_CONSUMER_COLUMNS", "name,city:address.city,street:address.street")
.withEnv("JDBC_CONSUMER_TABLE_NAME", "People")
.withEnv("SPRING_DATASOURCE_USERNAME", "test")
.withEnv("SPRING_DATASOURCE_PASSWORD", "secret")
.withEnv("SPRING_DATASOURCE_DRIVER_CLASS_NAME", "org.mariadb.jdbc.Driver")
// .withEnv("LOGGING_LEVEL_ORG_SPRINGFRAMEWORK_INTEGRATION", "DEBUG")
// .withEnv("LOGGING_LEVEL_ORG_SPRINGFRAMEWORK_JDBC", "DEBUG")
// .withEnv("LOGGING_LEVEL_ORG_MARIADB_JDBC", "DEBUG")
// .log()
.withEnv("SPRING_DATASOURCE_URL",
"jdbc:mariadb://mysql-for-sink:3306/test");
protected static void configureSink(StreamAppContainer baseContainer) {
sink = baseContainer
.dependsOn(mySQL)
.withEnv("JDBC_CONSUMER_COLUMNS", "name,city:address.city,street:address.street")
.withEnv("JDBC_CONSUMER_TABLE_NAME", "People")
.withEnv("SPRING_DATASOURCE_USERNAME", "test")
.withEnv("SPRING_DATASOURCE_PASSWORD", "secret")
.withEnv("SPRING_DATASOURCE_DRIVER_CLASS_NAME", "org.mariadb.jdbc.Driver")
// .withEnv("LOGGING_LEVEL_ORG_SPRINGFRAMEWORK_INTEGRATION", "DEBUG")
// .withEnv("LOGGING_LEVEL_ORG_SPRINGFRAMEWORK_JDBC", "DEBUG")
// .withEnv("LOGGING_LEVEL_ORG_MARIADB_JDBC", "DEBUG")
// .log()
.withEnv("SPRING_DATASOURCE_URL",
"jdbc:mariadb://mysql-for-sink:3306/test")
.withLogConsumer(logMatcher);
startSink();
}
@BeforeAll
static void startStreamApps() {
static void startSink() {
HikariDataSource dataSource = new HikariDataSource();
dataSource.setDriverClassName("org.mariadb.jdbc.Driver");
@@ -83,18 +89,27 @@ public class KafkaJdbcSinkTests extends KafkaStreamApplicationIntegrationTestSup
.until(() -> jdbcTemplate.queryForObject("SELECT COUNT(*) from People", Integer.class)
.intValue() == 0);
sink.start();
await().atMost(DEFAULT_DURATION).until(logMatcher.matches());
}
@Test
void test() throws JsonProcessingException {
void test() {
String json = "{\"name\":\"My Name\",\"address\":{ \"city\": \"Big City\",\"street\":\"Narrow Alley\"}}";
kafkaTemplate.send(sink.getInputDestination(), json);
testTopicSender.send(sink.getInputDestination(), json);
await().atMost(DEFAULT_DURATION)
.until(
() -> jdbcTemplate.queryForObject("SELECT COUNT(*) from People", Integer.class)
.intValue() == 1);
.untilAsserted(
() -> assertThat(
jdbcTemplate.queryForObject("SELECT COUNT(*) from People", Integer.class).intValue())
.isOne());
assertThat(jdbcTemplate.queryForObject("SELECT name from People",
String.class)).isEqualTo("My Name");
}
@AfterAll
static void cleanUp() {
sink.stop();
}
}

View File

@@ -0,0 +1,34 @@
/*
* Copyright 2020-2020 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.cloud.stream.apps.integration.test.sink.jdbc;
import org.junit.jupiter.api.BeforeAll;
import org.springframework.cloud.stream.app.test.integration.kafka.KafkaConfig;
import org.springframework.cloud.stream.app.test.integration.kafka.KafkaStreamAppTest;
import static org.springframework.cloud.stream.apps.integration.test.common.Configuration.VERSION;
@KafkaStreamAppTest
class KafkaJdbcSinkTests extends JdbcSinkTests {
@BeforeAll
static void init() {
configureSink(KafkaConfig
.prepackagedContainerFor("jdbc-sink", VERSION));
}
}

View File

@@ -0,0 +1,34 @@
/*
* Copyright 2020-2020 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.cloud.stream.apps.integration.test.sink.jdbc;
import org.junit.jupiter.api.BeforeAll;
import org.springframework.cloud.stream.app.test.integration.rabbitmq.RabbitMQConfig;
import org.springframework.cloud.stream.app.test.integration.rabbitmq.RabbitMQStreamAppTest;
import static org.springframework.cloud.stream.apps.integration.test.common.Configuration.VERSION;
@RabbitMQStreamAppTest
class RabbitMQJdbcSinkTests extends JdbcSinkTests {
@BeforeAll
static void init() {
configureSink(RabbitMQConfig
.prepackagedContainerFor("jdbc-sink", VERSION));
}
}

View File

@@ -0,0 +1,34 @@
/*
* Copyright 2020-2020 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.cloud.stream.apps.integration.test.sink.mongodb;
import org.junit.jupiter.api.BeforeAll;
import org.springframework.cloud.stream.app.test.integration.kafka.KafkaConfig;
import org.springframework.cloud.stream.app.test.integration.kafka.KafkaStreamAppTest;
import static org.springframework.cloud.stream.apps.integration.test.common.Configuration.VERSION;
@KafkaStreamAppTest
class KafkaMongoDBSinkTests extends MongoDBSinkTests {
@BeforeAll
static void init() {
configureSink(KafkaConfig
.prepackagedContainerFor("mongodb-sink", VERSION));
}
}

View File

@@ -14,56 +14,58 @@
* limitations under the License.
*/
package org.springframework.cloud.stream.apps.integration.test.kafka.sink;
package org.springframework.cloud.stream.apps.integration.test.sink.mongodb;
import java.time.Duration;
import java.util.List;
import org.bson.Document;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import org.testcontainers.containers.MongoDBContainer;
import org.testcontainers.junit.jupiter.Container;
import org.testcontainers.utility.DockerImageName;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.cloud.stream.app.test.integration.StreamAppContainer;
import org.springframework.cloud.stream.app.test.integration.kafka.KafkaStreamApplicationIntegrationTestSupport;
import org.springframework.cloud.stream.app.test.integration.StreamAppContainerTestUtils;
import org.springframework.cloud.stream.app.test.integration.TestTopicSender;
import org.springframework.data.mongodb.MongoDatabaseFactory;
import org.springframework.data.mongodb.core.MongoTemplate;
import org.springframework.data.mongodb.core.SimpleMongoClientDatabaseFactory;
import org.springframework.kafka.core.KafkaTemplate;
import static org.assertj.core.api.Assertions.assertThat;
import static org.awaitility.Awaitility.await;
import static org.springframework.cloud.stream.apps.integration.test.common.Configuration.DEFAULT_DURATION;
import static org.springframework.cloud.stream.apps.integration.test.common.Configuration.VERSION;
public class KafkaMongoDBSinkTests extends KafkaStreamApplicationIntegrationTestSupport {
abstract class MongoDBSinkTests {
private static MongoTemplate mongoTemplate;
@Autowired
private KafkaTemplate kafkaTemplate;
private TestTopicSender testTopicSender;
@Container
private static MongoDBContainer mongoDBContainer = new MongoDBContainer(DockerImageName.parse("mongo:4.0.10"))
.withExposedPorts(27017)
.withStartupTimeout(Duration.ofMinutes(2));
private static String mongoConnectionString() {
return String.format("mongodb://%s:%s/%s", localHostAddress(),
return String.format("mongodb://%s:%s/%s", StreamAppContainerTestUtils.localHostAddress(),
mongoDBContainer.getMappedPort(27017), "test");
}
@Container
private StreamAppContainer sink = prepackagedKafkaContainerFor("mongodb-sink", VERSION)
.withEnv("MONGO_DB_CONSUMER_COLLECTION", "test")
.withEnv("SPRING_DATA_MONGODB_URL", mongoConnectionString());
private static StreamAppContainer sink;
protected static void configureSink(StreamAppContainer baseContainer) {
mongoDBContainer.start();
sink = baseContainer
.withEnv("MONGODB_CONSUMER_COLLECTION", "test")
.withEnv("SPRING_DATA_MONGODB_URL", mongoConnectionString());
sink.start();
buildMongoTemplate();
}
@BeforeAll
static void buildMongoTemplate() {
mongoDBContainer.start();
MongoDatabaseFactory mongoDatabaseFactory = new SimpleMongoClientDatabaseFactory(
mongoConnectionString());
mongoTemplate = new MongoTemplate(mongoDatabaseFactory);
@@ -72,7 +74,7 @@ public class KafkaMongoDBSinkTests extends KafkaStreamApplicationIntegrationTest
@Test
void postData() {
String json = "{\"name\":\"My Name\",\"address\":{ \"city\": \"Big City\", \"street\":\"Narrow Alley\"}}";
kafkaTemplate.send(sink.getInputDestination(), json);
testTopicSender.send(sink.getInputDestination(), json);
await().atMost(DEFAULT_DURATION).untilAsserted(() -> {
List<Document> docs = mongoTemplate.findAll(Document.class, "test");
@@ -83,5 +85,7 @@ public class KafkaMongoDBSinkTests extends KafkaStreamApplicationIntegrationTest
@AfterAll
static void cleanUp() {
mongoDBContainer.close();
sink.stop();
}
}

View File

@@ -0,0 +1,35 @@
/*
* Copyright 2020-2020 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.cloud.stream.apps.integration.test.sink.mongodb;
import org.junit.jupiter.api.BeforeAll;
import org.springframework.cloud.stream.app.test.integration.rabbitmq.RabbitMQConfig;
import org.springframework.cloud.stream.app.test.integration.rabbitmq.RabbitMQStreamAppTest;
import static org.springframework.cloud.stream.apps.integration.test.common.Configuration.VERSION;
@RabbitMQStreamAppTest
class RabbitMQMongoDBSinkTests extends MongoDBSinkTests {
@BeforeAll
static void init() {
configureSink(RabbitMQConfig
.prepackagedContainerFor("mongodb-sink", VERSION));
}
}

View File

@@ -0,0 +1,33 @@
/*
* Copyright 2020-2020 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.cloud.stream.apps.integration.test.sink.tcp;
import org.testcontainers.junit.jupiter.Container;
import org.springframework.cloud.stream.app.test.integration.StreamAppContainer;
import org.springframework.cloud.stream.app.test.integration.kafka.KafkaConfig;
import org.springframework.cloud.stream.app.test.integration.kafka.KafkaStreamAppTest;
import static org.springframework.cloud.stream.apps.integration.test.common.Configuration.VERSION;
@KafkaStreamAppTest
public class KafkaTcpSinkTests extends TcpSinkTests {
@Container
static StreamAppContainer container = configureSink(KafkaConfig.prepackagedContainerFor("tcp-sink", VERSION));
}

View File

@@ -0,0 +1,32 @@
/*
* Copyright 2020-2020 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.cloud.stream.apps.integration.test.sink.tcp;
import org.testcontainers.junit.jupiter.Container;
import org.springframework.cloud.stream.app.test.integration.StreamAppContainer;
import org.springframework.cloud.stream.app.test.integration.rabbitmq.RabbitMQConfig;
import org.springframework.cloud.stream.app.test.integration.rabbitmq.RabbitMQStreamAppTest;
import static org.springframework.cloud.stream.apps.integration.test.common.Configuration.VERSION;
@RabbitMQStreamAppTest
public class RabbitMQTcpSinkTests extends TcpSinkTests {
@Container
static StreamAppContainer container = configureSink(RabbitMQConfig.prepackagedContainerFor("tcp-sink", VERSION));
}

View File

@@ -14,7 +14,7 @@
* limitations under the License.
*/
package org.springframework.cloud.stream.apps.integration.test.kafka.sink;
package org.springframework.cloud.stream.apps.integration.test.sink.tcp;
import java.io.BufferedReader;
import java.io.IOException;
@@ -25,45 +25,49 @@ import java.net.Socket;
import java.time.Duration;
import java.util.concurrent.atomic.AtomicBoolean;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import org.testcontainers.junit.jupiter.Container;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.cloud.stream.app.test.integration.StreamAppContainer;
import org.springframework.cloud.stream.app.test.integration.kafka.KafkaStreamApplicationIntegrationTestSupport;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.cloud.stream.app.test.integration.StreamAppContainerTestUtils;
import org.springframework.cloud.stream.app.test.integration.TestTopicSender;
import static org.awaitility.Awaitility.await;
import static org.springframework.cloud.stream.apps.integration.test.common.Configuration.DEFAULT_DURATION;
import static org.springframework.cloud.stream.apps.integration.test.common.Configuration.VERSION;
public class KafkaTcpSinkTests extends KafkaStreamApplicationIntegrationTestSupport {
abstract class TcpSinkTests {
private static final int tcpPort = findAvailablePort();
private static int tcpPort;
private static Socket socket;
private static final AtomicBoolean socketReady = new AtomicBoolean();
@Autowired
private KafkaTemplate kafkaTemplate;
private static StreamAppContainer sink;
@Container
private static StreamAppContainer sink = prepackagedKafkaContainerFor("tcp-sink", VERSION)
.withEnv("TCP_CONSUMER_HOST", localHostAddress())
.withEnv("TCP_PORT", String.valueOf(tcpPort))
.withEnv("TCP_CONSUMER_ENCODER", "CRLF");
@Autowired
private TestTopicSender testTopicSender;
protected static StreamAppContainer configureSink(StreamAppContainer baseContainer) {
tcpPort = StreamAppContainerTestUtils.findAvailablePort();
sink = baseContainer.withEnv("TCP_CONSUMER_HOST", StreamAppContainerTestUtils.localHostAddress())
.withEnv("TCP_PORT", String.valueOf(tcpPort))
.withEnv("TCP_CONSUMER_ENCODER", "CRLF");
return sink;
}
@BeforeAll
static void startTcpServer() {
socketReady.set(false);
new Thread(() -> {
try {
socket = new ServerSocket(tcpPort, 50, InetAddress.getLocalHost()).accept();
socketReady.set(true);
}
catch (IOException e) {
throw new RuntimeException(e.getMessage(), e);
throw new RuntimeException("failed to bind to port " + tcpPort + ": " + e.getMessage(), e);
}
}).start();
}
@@ -72,10 +76,15 @@ public class KafkaTcpSinkTests extends KafkaStreamApplicationIntegrationTestSupp
void postData() throws IOException {
// Sink will not connect until it receives a message.
String text = "Hello, world!";
kafkaTemplate.send(sink.getInputDestination(), text);
testTopicSender.send(sink.getInputDestination(), text);
await().atMost(DEFAULT_DURATION).untilTrue(socketReady);
BufferedReader reader = new BufferedReader(new InputStreamReader(socket.getInputStream()));
await().atMost(Duration.ofSeconds(10)).until(() -> reader.readLine().equals(text));
}
@AfterAll
static void cleanUp() throws IOException {
socket.close();
}
}

View File

@@ -14,9 +14,10 @@
* limitations under the License.
*/
package org.springframework.cloud.stream.apps.integration.test.kafka.source;
package org.springframework.cloud.stream.apps.integration.test.source.geode;
import java.time.Duration;
import java.util.UUID;
import java.util.function.Consumer;
import com.github.dockerjava.api.command.CreateContainerCmd;
@@ -29,31 +30,41 @@ import org.apache.geode.cache.client.ClientCache;
import org.apache.geode.cache.client.ClientCacheFactory;
import org.apache.geode.cache.client.ClientRegionShortcut;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Test;
import org.testcontainers.images.builder.ImageFromDockerfile;
import org.testcontainers.junit.jupiter.Container;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.cloud.fn.test.support.geode.GeodeContainer;
import org.springframework.cloud.stream.app.test.integration.LogMatcher;
import org.springframework.cloud.stream.app.test.integration.OutputMatcher;
import org.springframework.cloud.stream.app.test.integration.StreamAppContainer;
import org.springframework.cloud.stream.app.test.integration.kafka.KafkaStreamApplicationIntegrationTestSupport;
import org.springframework.cloud.stream.app.test.integration.StreamAppContainerTestUtils;
import org.springframework.cloud.stream.app.test.integration.rabbitmq.RabbitMQStreamAppContainer;
import org.springframework.context.ConfigurableApplicationContext;
import static org.awaitility.Awaitility.await;
import static org.springframework.cloud.stream.apps.integration.test.common.Configuration.DEFAULT_DURATION;
import static org.springframework.cloud.stream.apps.integration.test.common.Configuration.VERSION;
public class KafkaGeodeSourceTests extends KafkaStreamApplicationIntegrationTestSupport {
public abstract class GeodeSourceTests {
private static int locatorPort = StreamAppContainerTestUtils.findAvailablePort();
private static final int locatorPort = findAvailablePort();
private static final int cacheServerPort = findAvailablePort();
private static int cacheServerPort = StreamAppContainerTestUtils.findAvailablePort();
private static Region<Object, Object> clientRegion;
private static ClientCache clientCache;
private static LogMatcher logMatcher = LogMatcher.contains("Started GeodeSource");
protected static LogMatcher logMatcher = LogMatcher.contains("Started GeodeSource");
protected static StreamAppContainer source;
@Autowired
private OutputMatcher outputMatcher;
@Autowired
private ConfigurableApplicationContext context;
@Container
private static final GeodeContainer geode = (GeodeContainer) new GeodeContainer(new ImageFromDockerfile()
@@ -63,7 +74,6 @@ public class KafkaGeodeSourceTests extends KafkaStreamApplicationIntegrationTest
locatorPort, cacheServerPort)
.withCreateContainerCmdModifier(
(Consumer<CreateContainerCmd>) createContainerCmd -> createContainerCmd
.withName("apache_geode")
.withHostName("geode").withHostConfig(new HostConfig().withPortBindings(
new PortBinding(Ports.Binding.bindPort(cacheServerPort),
new ExposedPort(cacheServerPort)),
@@ -72,14 +82,7 @@ public class KafkaGeodeSourceTests extends KafkaStreamApplicationIntegrationTest
.withCommand("tail", "-f", "/dev/null")
.withStartupTimeout(DEFAULT_DURATION.multipliedBy(2));
private static final StreamAppContainer geodeSource = prepackagedKafkaContainerFor("geode-source", VERSION)
.withEnv("GEODE_POOL_CONNECT_TYPE", "server")
.withEnv("GEODE_REGION_REGION_NAME", "myRegion")
.withEnv("GEODE_POOL_HOST_ADDRESSES", localHostAddress() + ":" + cacheServerPort)
.withLogConsumer(logMatcher).log();
@BeforeAll
static void init() {
protected static void initializeGeodeCacheThenStartSource() {
// Not using locator is faster.
System.out.println(geode.execGfsh(
"start server --name=Server1 " + "--hostname-for-clients=geode" + " --server-port="
@@ -93,19 +96,43 @@ public class KafkaGeodeSourceTests extends KafkaStreamApplicationIntegrationTest
clientRegion = clientCache
.createClientRegionFactory(ClientRegionShortcut.PROXY)
.create("myRegion");
geodeSource.start();
source.withEnv("GEODE_POOL_CONNECT_TYPE", "server")
.withEnv("GEODE_REGION_REGION_NAME", "myRegion")
.withEnv("GEODE_POOL_HOST_ADDRESSES",
StreamAppContainerTestUtils.localHostAddress() + ":" + cacheServerPort)
.withLogConsumer(logMatcher).log().start();
}
@Test
void test() {
void test() throws InterruptedException {
await().atMost(Duration.ofMinutes(2)).until(logMatcher.matches());
clientRegion.put("hello", "world");
await().atMost(DEFAULT_DURATION)
.until(payloadMatches((String s) -> s.contains("world")));
String random = UUID.randomUUID().toString();
clientRegion.put(random, random);
if (source instanceof RabbitMQStreamAppContainer) {
// Thread.sleep(30);
}
else {
return;
}
await().atMost(Duration.ofSeconds(30))
.until(outputMatcher.payloadMatches((String s) -> {
System.out.println("!!!!!!!!!!!!!Matching on " + s);
return s.contains(random);
}));
}
@AfterEach
void clear() {
}
@AfterAll
static void cleanup() {
source.stop();
clientCache.close();
geode.stop();
}
}

View File

@@ -0,0 +1,36 @@
/*
* Copyright 2020-2020 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.cloud.stream.apps.integration.test.source.geode;
import org.junit.jupiter.api.BeforeAll;
import org.springframework.cloud.stream.app.test.integration.kafka.KafkaConfig;
import org.springframework.cloud.stream.app.test.integration.kafka.KafkaStreamAppTest;
import static org.springframework.cloud.stream.apps.integration.test.common.Configuration.VERSION;
@KafkaStreamAppTest
class KafkaGeodeSourceTests extends GeodeSourceTests {
@BeforeAll
static void init() {
source = KafkaConfig
.prepackagedContainerFor("geode-source", VERSION);
initializeGeodeCacheThenStartSource();
}
}

View File

@@ -0,0 +1,38 @@
/*
* Copyright 2020-2020 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.cloud.stream.apps.integration.test.source.geode;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Disabled;
import org.springframework.cloud.stream.app.test.integration.rabbitmq.RabbitMQConfig;
import org.springframework.cloud.stream.app.test.integration.rabbitmq.RabbitMQStreamAppTest;
import static org.springframework.cloud.stream.apps.integration.test.common.Configuration.VERSION;
@RabbitMQStreamAppTest
@Disabled("Some race condition when package tests run together")
class RabbitMQGeodeSourceTests extends GeodeSourceTests {
@BeforeAll
static void init() {
source = RabbitMQConfig
.prepackagedContainerFor("geode-source", VERSION);
initializeGeodeCacheThenStartSource();
}
}

View File

@@ -14,22 +14,22 @@
* limitations under the License.
*/
package org.springframework.cloud.stream.apps.integration.test.kafka.source;
package org.springframework.cloud.stream.apps.integration.test.source.http;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Test;
import org.testcontainers.containers.wait.strategy.Wait;
import org.testcontainers.junit.jupiter.Container;
import reactor.core.publisher.Mono;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.cloud.stream.app.test.integration.OutputMatcher;
import org.springframework.cloud.stream.app.test.integration.StreamAppContainer;
import org.springframework.cloud.stream.app.test.integration.TestTopicListener;
import org.springframework.cloud.stream.app.test.integration.kafka.KafkaStreamApplicationIntegrationTestSupport;
import org.springframework.cloud.stream.app.test.integration.StreamAppContainerTestUtils;
import org.springframework.http.HttpStatus;
import org.springframework.http.MediaType;
import org.springframework.web.reactive.function.client.WebClient;
@@ -37,26 +37,28 @@ import org.springframework.web.reactive.function.client.WebClient;
import static org.assertj.core.api.Assertions.assertThat;
import static org.awaitility.Awaitility.await;
import static org.springframework.cloud.stream.apps.integration.test.common.Configuration.DEFAULT_DURATION;
import static org.springframework.cloud.stream.apps.integration.test.common.Configuration.VERSION;
public class KafkaHttpSourceTests extends KafkaStreamApplicationIntegrationTestSupport {
public abstract class HttpSourceTests {
@Autowired
TestTopicListener testTopicListener;
private static int serverPort = findAvailablePort();
private static int serverPort = StreamAppContainerTestUtils.findAvailablePort();
private static WebClient webClient = WebClient.builder().build();
@Container
private final StreamAppContainer source = prepackagedKafkaContainerFor("http-source", VERSION)
.withEnv("SERVER_PORT", String.valueOf(serverPort))
.withExposedPorts(serverPort)
.waitingFor(Wait.forListeningPort().withStartupTimeout(DEFAULT_DURATION));
private static StreamAppContainer source;
protected static StreamAppContainer configureSource(StreamAppContainer baseContainer) {
source = baseContainer.withEnv("SERVER_PORT", String.valueOf(serverPort))
.withExposedPorts(serverPort)
.waitingFor(Wait.forListeningPort().withStartupTimeout(DEFAULT_DURATION));
return source;
}
@Autowired
private OutputMatcher outputMatcher;
@AfterEach
void reset() {
testTopicListener.clearMessageMatchers();
outputMatcher.clearMessageMatchers();
}
@Test
@@ -76,7 +78,7 @@ public class KafkaHttpSourceTests extends KafkaStreamApplicationIntegrationTestS
countDownLatch.await(30, TimeUnit.SECONDS);
assertThat(httpStatus.get().is2xxSuccessful()).isTrue();
await().atMost(DEFAULT_DURATION)
.until(payloadMatches(s -> s.equals("Hello")));
.until(outputMatcher.payloadMatches(s -> s.equals("Hello")));
}
@Test
@@ -95,7 +97,12 @@ public class KafkaHttpSourceTests extends KafkaStreamApplicationIntegrationTestS
});
countDownLatch.await(30, TimeUnit.SECONDS);
await().atMost(DEFAULT_DURATION)
.until(payloadMatches(s -> s.equals("{\"Hello\":\"world\"}")));
.until(outputMatcher.payloadMatches(s -> s.equals("{\"Hello\":\"world\"}")));
}
@AfterAll
static void cleanUp() {
source.stop();
}
}

View File

@@ -0,0 +1,32 @@
/*
* Copyright 2020-2020 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.cloud.stream.apps.integration.test.source.http;
import org.testcontainers.junit.jupiter.Container;
import org.springframework.cloud.stream.app.test.integration.StreamAppContainer;
import org.springframework.cloud.stream.app.test.integration.kafka.KafkaConfig;
import org.springframework.cloud.stream.app.test.integration.kafka.KafkaStreamAppTest;
import static org.springframework.cloud.stream.apps.integration.test.common.Configuration.VERSION;
@KafkaStreamAppTest
class KafkaHttpSourceTests extends HttpSourceTests {
@Container
static StreamAppContainer base = configureSource(KafkaConfig.prepackagedContainerFor("http-source", VERSION));
}

View File

@@ -0,0 +1,32 @@
/*
* Copyright 2020-2020 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.cloud.stream.apps.integration.test.source.http;
import org.testcontainers.junit.jupiter.Container;
import org.springframework.cloud.stream.app.test.integration.StreamAppContainer;
import org.springframework.cloud.stream.app.test.integration.rabbitmq.RabbitMQConfig;
import org.springframework.cloud.stream.app.test.integration.rabbitmq.RabbitMQStreamAppTest;
import static org.springframework.cloud.stream.apps.integration.test.common.Configuration.VERSION;
@RabbitMQStreamAppTest
class RabbitMQHttpSourceTests extends HttpSourceTests {
@Container
static StreamAppContainer base = configureSource(RabbitMQConfig.prepackagedContainerFor("http-source", VERSION));
}

View File

@@ -14,46 +14,54 @@
* limitations under the License.
*/
package org.springframework.cloud.stream.apps.integration.test.kafka.source;
package org.springframework.cloud.stream.apps.integration.test.source.jdbc;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.Test;
import org.testcontainers.containers.BindMode;
import org.testcontainers.containers.MySQLContainer;
import org.testcontainers.junit.jupiter.Container;
import org.testcontainers.utility.DockerImageName;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.cloud.stream.app.test.integration.OutputMatcher;
import org.springframework.cloud.stream.app.test.integration.StreamAppContainer;
import org.springframework.cloud.stream.app.test.integration.kafka.KafkaStreamApplicationIntegrationTestSupport;
import org.springframework.cloud.stream.app.test.integration.kafka.KafkaConfig;
import static org.awaitility.Awaitility.await;
import static org.springframework.cloud.stream.app.test.integration.AppLog.appLog;
import static org.springframework.cloud.stream.apps.integration.test.common.Configuration.DEFAULT_DURATION;
import static org.springframework.cloud.stream.apps.integration.test.common.Configuration.VERSION;
public class KafkaJdbcSourceTests extends KafkaStreamApplicationIntegrationTestSupport {
abstract class JdbcSourceTests {
@Autowired
private OutputMatcher outputMatcher;
private static StreamAppContainer source;
@Container
private static MySQLContainer mySQL = new MySQLContainer<>(DockerImageName.parse("mysql:5.7"))
.withUsername("test")
.withPassword("secret")
.withExposedPorts(3306)
.withNetwork(kafka.getNetwork())
.withNetwork(KafkaConfig.kafka.getNetwork())
.withNetworkAliases("mysql-for-source")
.withLogConsumer(appLog("mysql-for-source"))
.withClasspathResourceMapping("init.sql", "/init.sql", BindMode.READ_ONLY)
.withCommand("--init-file", "/init.sql");
private static final StreamAppContainer source = prepackagedKafkaContainerFor("jdbc-source", VERSION)
.withEnv("JDBC_SUPPLIER_QUERY", "SELECT * FROM People WHERE deleted='N'")
.withEnv("JDBC_SUPPLIER_UPDATE", "UPDATE People SET deleted='Y' WHERE id=:id")
.withEnv("SPRING_DATASOURCE_USERNAME", "test")
.withEnv("SPRING_DATASOURCE_PASSWORD", "secret")
.withEnv("SPRING_DATASOURCE_DRIVER_CLASS_NAME", "org.mariadb.jdbc.Driver")
.withEnv("SPRING_DATASOURCE_URL", "jdbc:mariadb://mysql-for-source:3306/test");
protected static void configureSource(StreamAppContainer baseContainer) {
source = baseContainer
.withEnv("JDBC_SUPPLIER_QUERY", "SELECT * FROM People WHERE deleted='N'")
.withEnv("JDBC_SUPPLIER_UPDATE", "UPDATE People SET deleted='Y' WHERE id=:id")
.withEnv("SPRING_DATASOURCE_USERNAME", "test")
.withEnv("SPRING_DATASOURCE_PASSWORD", "secret")
.withEnv("SPRING_DATASOURCE_DRIVER_CLASS_NAME", "org.mariadb.jdbc.Driver")
.withEnv("SPRING_DATASOURCE_URL", "jdbc:mariadb://mysql-for-source:3306/test");
startSource();
}
@BeforeAll
static void startSource() {
private static void startSource() {
await().atMost(DEFAULT_DURATION).until(() -> mySQL.isRunning());
source.start();
}
@@ -61,6 +69,11 @@ public class KafkaJdbcSourceTests extends KafkaStreamApplicationIntegrationTestS
@Test
void test() {
await().atMost(DEFAULT_DURATION)
.until(payloadMatches((String s) -> s.contains("Bart Simpson")));
.until(outputMatcher.payloadMatches((String s) -> s.contains("Bart Simpson")));
}
@AfterAll
private static void cleanUp() {
source.stop();
}
}

View File

@@ -0,0 +1,34 @@
/*
* Copyright 2020-2020 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.cloud.stream.apps.integration.test.source.jdbc;
import org.junit.jupiter.api.BeforeAll;
import org.springframework.cloud.stream.app.test.integration.kafka.KafkaConfig;
import org.springframework.cloud.stream.app.test.integration.kafka.KafkaStreamAppTest;
import static org.springframework.cloud.stream.apps.integration.test.common.Configuration.VERSION;
@KafkaStreamAppTest
class KafkaJdbcSourceTests extends JdbcSourceTests {
@BeforeAll
static void init() {
configureSource(KafkaConfig
.prepackagedContainerFor("jdbc-source", VERSION));
}
}

View File

@@ -0,0 +1,34 @@
/*
* Copyright 2020-2020 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.cloud.stream.apps.integration.test.source.jdbc;
import org.junit.jupiter.api.BeforeAll;
import org.springframework.cloud.stream.app.test.integration.rabbitmq.RabbitMQConfig;
import org.springframework.cloud.stream.app.test.integration.rabbitmq.RabbitMQStreamAppTest;
import static org.springframework.cloud.stream.apps.integration.test.common.Configuration.VERSION;
@RabbitMQStreamAppTest
class RabbitMQJdbcSourceTests extends JdbcSourceTests {
@BeforeAll
static void init() {
configureSource(RabbitMQConfig
.prepackagedContainerFor("jdbc-source", VERSION));
}
}

View File

@@ -0,0 +1,34 @@
/*
* Copyright 2020-2020 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.cloud.stream.apps.integration.test.source.s3;
import org.junit.jupiter.api.BeforeEach;
import org.springframework.cloud.stream.app.test.integration.kafka.KafkaConfig;
import org.springframework.cloud.stream.app.test.integration.kafka.KafkaStreamAppTest;
import static org.springframework.cloud.stream.apps.integration.test.common.Configuration.VERSION;
@KafkaStreamAppTest
class KafkaS3SourceTests extends S3SourceTests {
@BeforeEach
void init() {
configureSource(KafkaConfig
.prepackagedContainerFor("s3-source", VERSION));
}
}

View File

@@ -0,0 +1,34 @@
/*
* Copyright 2020-2020 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.cloud.stream.apps.integration.test.source.s3;
import org.junit.jupiter.api.BeforeEach;
import org.springframework.cloud.stream.app.test.integration.rabbitmq.RabbitMQConfig;
import org.springframework.cloud.stream.app.test.integration.rabbitmq.RabbitMQStreamAppTest;
import static org.springframework.cloud.stream.apps.integration.test.common.Configuration.VERSION;
@RabbitMQStreamAppTest
class RabbitMQS3SourceTests extends S3SourceTests {
@BeforeEach
void init() {
configureSource(RabbitMQConfig
.prepackagedContainerFor("s3-source", VERSION));
}
}

View File

@@ -14,7 +14,7 @@
* limitations under the License.
*/
package org.springframework.cloud.stream.apps.integration.test.kafka.source;
package org.springframework.cloud.stream.apps.integration.test.source.s3;
import java.util.Map;
import java.util.function.Consumer;
@@ -38,22 +38,28 @@ import org.testcontainers.junit.jupiter.Container;
import org.testcontainers.utility.DockerImageName;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.cloud.stream.app.test.integration.OutputMatcher;
import org.springframework.cloud.stream.app.test.integration.StreamAppContainer;
import org.springframework.cloud.stream.app.test.integration.TestTopicListener;
import org.springframework.cloud.stream.app.test.integration.kafka.KafkaStreamApplicationIntegrationTestSupport;
import static org.awaitility.Awaitility.await;
import static org.springframework.cloud.stream.app.test.integration.AppLog.appLog;
import static org.springframework.cloud.stream.app.test.integration.FluentMap.fluentMap;
import static org.springframework.cloud.stream.app.test.integration.StreamAppContainerTestUtils.localHostAddress;
import static org.springframework.cloud.stream.app.test.integration.StreamAppContainerTestUtils.resourceAsFile;
import static org.springframework.cloud.stream.apps.integration.test.common.Configuration.DEFAULT_DURATION;
import static org.springframework.cloud.stream.apps.integration.test.common.Configuration.VERSION;
public class KafkaS3SourceTests extends KafkaStreamApplicationIntegrationTestSupport {
abstract class S3SourceTests {
private static AmazonS3 s3Client;
private StreamAppContainer source;
@Autowired
TestTopicListener testTopicListener;
private TestTopicListener testTopicListener;
@Autowired
private OutputMatcher outputMatcher;
@Container
private static final GenericContainer minio = new GenericContainer(
@@ -69,7 +75,7 @@ public class KafkaS3SourceTests extends KafkaStreamApplicationIntegrationTestSup
.withCommand("minio", "server", "/data");
@BeforeAll
static void init() {
static void initS3() {
AWSCredentials credentials = new BasicAWSCredentials("minio", "minio123");
ClientConfiguration clientConfiguration = new ClientConfiguration();
s3Client = AmazonS3ClientBuilder
@@ -84,17 +90,18 @@ public class KafkaS3SourceTests extends KafkaStreamApplicationIntegrationTestSup
}
private StreamAppContainer source = prepackagedKafkaContainerFor("s3-source", VERSION)
.withEnv("S3_SUPPLIER_REMOTE_DIR", "bucket")
.withEnv("S3_COMMON_ENDPOINT_URL", "http://" + localHostAddress() + ":" + minio.getMappedPort(9000))
.withEnv("S3_COMMON_PATH_STYLE_ACCESS", "true")
.withEnv("CLOUD_AWS_STACK_AUTO", "false")
.withEnv("CLOUD_AWS_CREDENTIALS_ACCESS_KEY", "minio")
.withEnv("CLOUD_AWS_CREDENTIALS_SECRET_KEY", "minio123")
.withEnv("LOGGING_LEVEL_ORG_SPRINGFRAMEWORK_INTEGRATION", "DEBUG")
.withEnv("CLOUD_AWS_REGION_STATIC", "us-east-1").log();
protected void configureSource(StreamAppContainer baseContainer) {
source = baseContainer
.withEnv("S3_SUPPLIER_REMOTE_DIR", "bucket")
.withEnv("S3_COMMON_ENDPOINT_URL", "http://" + localHostAddress() + ":" + minio.getMappedPort(9000))
.withEnv("S3_COMMON_PATH_STYLE_ACCESS", "true")
.withEnv("CLOUD_AWS_STACK_AUTO", "false")
.withEnv("CLOUD_AWS_CREDENTIALS_ACCESS_KEY", "minio")
.withEnv("CLOUD_AWS_CREDENTIALS_SECRET_KEY", "minio123")
.withEnv("LOGGING_LEVEL_ORG_SPRINGFRAMEWORK_INTEGRATION", "DEBUG")
.withEnv("CLOUD_AWS_REGION_STATIC", "us-east-1").log();
}
//
@Test
void testLines() {
startContainer(
@@ -103,7 +110,7 @@ public class KafkaS3SourceTests extends KafkaStreamApplicationIntegrationTestSup
s3Client.putObject(new PutObjectRequest("bucket", "test",
resourceAsFile("minio/data")));
await().atMost(DEFAULT_DURATION).until(payloadMatches((String s) -> s.contains("Bart Simpson")));
await().atMost(DEFAULT_DURATION).until(outputMatcher.payloadMatches((String s) -> s.contains("Bart Simpson")));
}
@@ -119,7 +126,7 @@ public class KafkaS3SourceTests extends KafkaStreamApplicationIntegrationTestSup
s3Client.createBucket("bucket");
s3Client.putObject(new PutObjectRequest("bucket", "test",
resourceAsFile("minio/data")));
await().atMost(DEFAULT_DURATION).until(payloadMatches(s -> s.equals(
await().atMost(DEFAULT_DURATION).until(outputMatcher.payloadMatches(s -> s.equals(
"{\"args\":[\"filename=/tmp/s3-supplier/test\"],\"deploymentProps\":{},\"name\":\"myTask\"}")));
}
@@ -135,7 +142,8 @@ public class KafkaS3SourceTests extends KafkaStreamApplicationIntegrationTestSup
s3Client.putObject(new PutObjectRequest("bucket", "test",
resourceAsFile("minio/data")));
await().atMost(DEFAULT_DURATION)
.until(payloadMatches((String s) -> s.contains("\"bucketName\":\"bucket\",\"key\":\"test\"")));
.until(outputMatcher
.payloadMatches((String s) -> s.contains("\"bucketName\":\"bucket\",\"key\":\"test\"")));
}
private void startContainer(Map<String, String> environment) {
@@ -152,4 +160,5 @@ public class KafkaS3SourceTests extends KafkaStreamApplicationIntegrationTestSup
source.stop();
testTopicListener.clearMessageMatchers();
}
}

View File

@@ -0,0 +1,35 @@
/*
* Copyright 2020-2020 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.cloud.stream.apps.integration.test.source.sftp;
import org.junit.jupiter.api.BeforeEach;
import org.springframework.cloud.stream.app.test.integration.kafka.KafkaConfig;
import org.springframework.cloud.stream.app.test.integration.kafka.KafkaStreamAppTest;
import static org.springframework.cloud.stream.apps.integration.test.common.Configuration.VERSION;
@KafkaStreamAppTest
class KafkaSftpSourceTests extends SftpSourceTests {
@BeforeEach
private void initKafka() {
configureSource(KafkaConfig
.prepackagedContainerFor("sftp-source", VERSION));
}
}

View File

@@ -0,0 +1,35 @@
/*
* Copyright 2020-2020 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.cloud.stream.apps.integration.test.source.sftp;
import org.junit.jupiter.api.BeforeEach;
import org.springframework.cloud.stream.app.test.integration.rabbitmq.RabbitMQConfig;
import org.springframework.cloud.stream.app.test.integration.rabbitmq.RabbitMQStreamAppTest;
import static org.springframework.cloud.stream.apps.integration.test.common.Configuration.VERSION;
@RabbitMQStreamAppTest
class RabbitMQSftpSourceTests extends SftpSourceTests {
@BeforeEach
private void initKafka() {
configureSource(RabbitMQConfig
.prepackagedContainerFor("sftp-source", VERSION));
}
}

View File

@@ -14,25 +14,29 @@
* limitations under the License.
*/
package org.springframework.cloud.stream.apps.integration.test.kafka.source;
package org.springframework.cloud.stream.apps.integration.test.source.sftp;
import java.util.Collections;
import java.util.Map;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Test;
import org.testcontainers.containers.BindMode;
import org.testcontainers.containers.GenericContainer;
import org.testcontainers.junit.jupiter.Container;
import org.testcontainers.utility.DockerImageName;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.cloud.stream.app.test.integration.OutputMatcher;
import org.springframework.cloud.stream.app.test.integration.StreamAppContainer;
import org.springframework.cloud.stream.app.test.integration.kafka.KafkaStreamApplicationIntegrationTestSupport;
import org.springframework.cloud.stream.app.test.integration.StreamAppContainerTestUtils;
import static org.awaitility.Awaitility.await;
import static org.springframework.cloud.stream.apps.integration.test.common.Configuration.DEFAULT_DURATION;
import static org.springframework.cloud.stream.apps.integration.test.common.Configuration.VERSION;
public class KafkaSftpSourceTests extends KafkaStreamApplicationIntegrationTestSupport {
abstract class SftpSourceTests {
private StreamAppContainer source;
@Container
private static final GenericContainer sftp = new GenericContainer(DockerImageName.parse("atmoz/sftp"))
@@ -41,13 +45,18 @@ public class KafkaSftpSourceTests extends KafkaStreamApplicationIntegrationTestS
.withClasspathResourceMapping("sftp", "/home/user/remote", BindMode.READ_ONLY)
.withStartupTimeout(DEFAULT_DURATION);
private StreamAppContainer source = prepackagedKafkaContainerFor("sftp-source", VERSION)
.withEnv("SFTP_SUPPLIER_FACTORY_ALLOW_UNKNOWN_KEYS", "true")
.withEnv("SFTP_SUPPLIER_REMOTE_DIR", "/remote")
.withEnv("SFTP_SUPPLIER_FACTORY_USERNAME", "user")
.withEnv("SFTP_SUPPLIER_FACTORY_PASSWORD", "pass")
.withEnv("SFTP_SUPPLIER_FACTORY_PORT", String.valueOf(sftp.getMappedPort(22)))
.withEnv("SFTP_SUPPLIER_FACTORY_HOST", localHostAddress());
protected void configureSource(StreamAppContainer baseContainer) {
await().atMost(DEFAULT_DURATION).until(() -> sftp.isRunning());
source = baseContainer.withEnv("SFTP_SUPPLIER_FACTORY_ALLOW_UNKNOWN_KEYS", "true")
.withEnv("SFTP_SUPPLIER_REMOTE_DIR", "/remote")
.withEnv("SFTP_SUPPLIER_FACTORY_USERNAME", "user")
.withEnv("SFTP_SUPPLIER_FACTORY_PASSWORD", "pass")
.withEnv("SFTP_SUPPLIER_FACTORY_PORT", String.valueOf(sftp.getMappedPort(22)))
.withEnv("SFTP_SUPPLIER_FACTORY_HOST", StreamAppContainerTestUtils.localHostAddress());
}
@Autowired
private OutputMatcher outputMatcher;
// TODO: This fixture supports additional tests with different modes, etc.
@Test
@@ -55,7 +64,7 @@ public class KafkaSftpSourceTests extends KafkaStreamApplicationIntegrationTestS
startContainer(Collections.singletonMap("FILE_CONSUMER_MODE", "ref"));
await().atMost(DEFAULT_DURATION)
.until(payloadMatches((String s) -> s.equals("\"/tmp/sftp-supplier/data.txt\"")));
.until(outputMatcher.payloadMatches((String s) -> s.equals("\"/tmp/sftp-supplier/data.txt\"")));
}
private void startContainer(Map<String, String> environment) {
@@ -63,4 +72,9 @@ public class KafkaSftpSourceTests extends KafkaStreamApplicationIntegrationTestS
source.start();
}
@AfterEach
private void cleanUp() {
source.stop();
}
}

View File

@@ -0,0 +1,35 @@
/*
* Copyright 2020-2020 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.cloud.stream.apps.integration.test.source.time;
import org.testcontainers.junit.jupiter.Container;
import org.springframework.cloud.stream.app.test.integration.StreamAppContainer;
import org.springframework.cloud.stream.app.test.integration.kafka.KafkaConfig;
import org.springframework.cloud.stream.app.test.integration.kafka.KafkaStreamAppTest;
import static org.springframework.cloud.stream.apps.integration.test.common.Configuration.VERSION;
@KafkaStreamAppTest
class KafkaTimeSourceTests extends TimeSourceTests {
@Container
static StreamAppContainer source = KafkaConfig
.prepackagedContainerFor("time-source", VERSION)
.withLogConsumer(logMatcher);
}

View File

@@ -0,0 +1,35 @@
/*
* Copyright 2020-2020 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.cloud.stream.apps.integration.test.source.time;
import org.testcontainers.junit.jupiter.Container;
import org.springframework.cloud.stream.app.test.integration.StreamAppContainer;
import org.springframework.cloud.stream.app.test.integration.rabbitmq.RabbitMQConfig;
import org.springframework.cloud.stream.app.test.integration.rabbitmq.RabbitMQStreamAppTest;
import static org.springframework.cloud.stream.apps.integration.test.common.Configuration.VERSION;
@RabbitMQStreamAppTest
class RabbitMQTimeSourceTests extends TimeSourceTests {
@Container
static StreamAppContainer source = RabbitMQConfig
.prepackagedContainerFor("time-source", VERSION)
.withLogConsumer(logMatcher)
.withEnv("LOGGING_LEVEL_ORG_SPRINGFRAMEWORK_INTEGRATION", "DEBUG")
.withEnv("LOGGING_LEVEL_ORG_APACHE_GEODE", "DEBUG");
}

View File

@@ -14,36 +14,33 @@
* limitations under the License.
*/
package org.springframework.cloud.stream.apps.integration.test.kafka.source;
package org.springframework.cloud.stream.apps.integration.test.source.time;
import java.util.regex.Pattern;
import org.junit.jupiter.api.Test;
import org.testcontainers.junit.jupiter.Container;
import org.testcontainers.junit.jupiter.Testcontainers;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.cloud.stream.app.test.integration.LogMatcher;
import org.springframework.cloud.stream.app.test.integration.StreamAppContainer;
import org.springframework.cloud.stream.app.test.integration.kafka.KafkaStreamApplicationIntegrationTestSupport;
import org.springframework.cloud.stream.app.test.integration.OutputMatcher;
import static org.awaitility.Awaitility.await;
import static org.springframework.cloud.stream.apps.integration.test.common.Configuration.DEFAULT_DURATION;
import static org.springframework.cloud.stream.apps.integration.test.common.Configuration.VERSION;
@Testcontainers
public class KafkaTimeSourceTests extends KafkaStreamApplicationIntegrationTestSupport {
abstract class TimeSourceTests {
// "MM/dd/yy HH:mm:ss";
private final static Pattern pattern = Pattern.compile(".*\\d{2}/\\d{2}/\\d{2}\\s+\\d{2}:\\d{2}:\\d{2}");
static LogMatcher logMatcher = LogMatcher.contains("Started TimeSource");
@Container
static StreamAppContainer timeSource = prepackagedKafkaContainerFor("time-source", VERSION)
.withLogConsumer(logMatcher);
@Autowired
private OutputMatcher outputMatcher;
@Test
void test() {
await().atMost(DEFAULT_DURATION).until(logMatcher.matches());
await().atMost(DEFAULT_DURATION).until(payloadMatches((String s) -> pattern.matcher(s).matches()));
await().atMost(DEFAULT_DURATION)
.until(outputMatcher.payloadMatches((String s) -> pattern.matcher(s).matches()));
}
}

View File

@@ -14,7 +14,7 @@
* limitations under the License.
*/
package org.springframework.cloud.stream.apps.integration.test.kafka.stream;
package org.springframework.cloud.stream.apps.integration.test.stream.jdbclog;
import org.junit.jupiter.api.Test;
import org.testcontainers.containers.BindMode;
@@ -24,7 +24,8 @@ import org.testcontainers.utility.DockerImageName;
import org.springframework.cloud.stream.app.test.integration.LogMatcher;
import org.springframework.cloud.stream.app.test.integration.StreamApps;
import org.springframework.cloud.stream.app.test.integration.kafka.KafkaStreamApplicationIntegrationTestSupport;
import org.springframework.cloud.stream.app.test.integration.kafka.KafkaConfig;
import org.springframework.cloud.stream.app.test.integration.kafka.KafkaStreamAppTest;
import static org.awaitility.Awaitility.await;
import static org.springframework.cloud.stream.app.test.integration.AppLog.appLog;
@@ -32,7 +33,8 @@ import static org.springframework.cloud.stream.app.test.integration.kafka.KafkaS
import static org.springframework.cloud.stream.apps.integration.test.common.Configuration.DEFAULT_DURATION;
import static org.springframework.cloud.stream.apps.integration.test.common.Configuration.VERSION;
public class KafkaJdbcLogStreamTests extends KafkaStreamApplicationIntegrationTestSupport {
@KafkaStreamAppTest
public class KafkaJdbcLogStreamTests {
private static LogMatcher logMatcher = LogMatcher.contains("Bart Simpson");
@@ -41,24 +43,27 @@ public class KafkaJdbcLogStreamTests extends KafkaStreamApplicationIntegrationTe
.withUsername("test")
.withPassword("secret")
.withExposedPorts(3306)
.withNetwork(kafka.getNetwork())
.withNetwork(KafkaConfig.kafka.getNetwork())
.withNetworkAliases("mysql-for-stream")
.withLogConsumer(appLog("mySQL"))
.withClasspathResourceMapping("init.sql", "/init.sql", BindMode.READ_ONLY)
.withCommand("--init-file", "/init.sql");
@Container
private static final StreamApps streamApp = kafkaStreamApps(KafkaJdbcLogStreamTests.class.getSimpleName(), kafka)
.withSourceContainer(prepackagedKafkaContainerFor("jdbc-source", VERSION)
.withEnv("JDBC_SUPPLIER_QUERY", "SELECT * FROM People WHERE deleted='N'")
.withEnv("JDBC_SUPPLIER_UPDATE", "UPDATE People SET deleted='Y' WHERE id=:id")
.withEnv("SPRING_DATASOURCE_PASSWORD", "secret")
.withEnv("SPRING_DATASOURCE_USERNAME", "test")
.withEnv("SPRING_DATASOURCE_DRIVER_CLASS_NAME", "org.mariadb.jdbc.Driver")
.withEnv("SPRING_DATASOURCE_URL",
"jdbc:mariadb://mysql-for-stream:3306/test"))
.withSinkContainer(prepackagedKafkaContainerFor("log-sink", VERSION).withLogConsumer(logMatcher))
.build();
private static final StreamApps streamApp = kafkaStreamApps(KafkaJdbcLogStreamTests.class.getSimpleName(),
KafkaConfig.kafka)
.withSourceContainer(
KafkaConfig.prepackagedContainerFor("jdbc-source", VERSION)
.withEnv("JDBC_SUPPLIER_QUERY", "SELECT * FROM People WHERE deleted='N'")
.withEnv("JDBC_SUPPLIER_UPDATE", "UPDATE People SET deleted='Y' WHERE id=:id")
.withEnv("SPRING_DATASOURCE_PASSWORD", "secret")
.withEnv("SPRING_DATASOURCE_USERNAME", "test")
.withEnv("SPRING_DATASOURCE_DRIVER_CLASS_NAME", "org.mariadb.jdbc.Driver")
.withEnv("SPRING_DATASOURCE_URL",
"jdbc:mariadb://mysql-for-stream:3306/test"))
.withSinkContainer(
KafkaConfig.prepackagedContainerFor("log-sink", VERSION).withLogConsumer(logMatcher))
.build();
@Test
void test() {

View File

@@ -14,30 +14,36 @@
* limitations under the License.
*/
package org.springframework.cloud.stream.apps.integration.test.kafka.stream;
package org.springframework.cloud.stream.apps.integration.test.stream.tiktok;
import org.junit.jupiter.api.Test;
import org.testcontainers.junit.jupiter.Container;
import org.springframework.cloud.stream.app.test.integration.LogMatcher;
import org.springframework.cloud.stream.app.test.integration.StreamApps;
import org.springframework.cloud.stream.app.test.integration.kafka.KafkaStreamApplicationIntegrationTestSupport;
import org.springframework.cloud.stream.app.test.integration.kafka.KafkaConfig;
import org.springframework.cloud.stream.app.test.integration.kafka.KafkaStreamAppTest;
import static org.awaitility.Awaitility.await;
import static org.springframework.cloud.stream.app.test.integration.kafka.KafkaStreamApps.kafkaStreamApps;
import static org.springframework.cloud.stream.apps.integration.test.common.Configuration.DEFAULT_DURATION;
import static org.springframework.cloud.stream.apps.integration.test.common.Configuration.VERSION;
public class KafkaTikTokTests extends KafkaStreamApplicationIntegrationTestSupport {
@KafkaStreamAppTest
public class KafkaTikTokTests {
private static LogMatcher logMatcher = LogMatcher.matchesRegex(".*\\d{2}/\\d{2}/\\d{2}\\s+\\d{2}:\\d{2}:\\d{2}")
.times(3);
@Container
private static final StreamApps streamApp = kafkaStreamApps(KafkaTikTokTests.class.getSimpleName(), kafka)
.withSourceContainer(prepackagedKafkaContainerFor("time-source", VERSION))
.withSinkContainer(prepackagedKafkaContainerFor("log-sink", VERSION).withLogConsumer(logMatcher).log())
.build();
private static final StreamApps streamApp = kafkaStreamApps(KafkaTikTokTests.class.getSimpleName(),
KafkaConfig.kafka)
.withSourceContainer(
KafkaConfig.prepackagedContainerFor("time-source", VERSION))
.withSinkContainer(
KafkaConfig.prepackagedContainerFor("log-sink", VERSION).withLogConsumer(logMatcher)
.log())
.build();
@Test
void test() {

View File

@@ -11,6 +11,7 @@
<logger name="org.testcontainers" level="INFO"/>
<logger name="com.github.dockerjava" level="WARN"/>
<!-- <logger name="org.springframework.kafka" level="INFO"/>-->
<!-- <logger name="org.springframework.cloud.stream.app.test.integration" level="DEBUG"/>-->
<!-- <logger name="org.springframework.amqp" level="DEBUG"/>-->
<!-- <logger name="org.springframework.retry" level="TRACE"/>-->
<logger name="org.springframework.cloud.stream.app.test.integration" level="DEBUG"/>
</configuration>