diff --git a/stream-applications-integration-tests/pom.xml b/stream-applications-integration-tests/pom.xml
index adcbc2b..b0921df 100644
--- a/stream-applications-integration-tests/pom.xml
+++ b/stream-applications-integration-tests/pom.xml
@@ -119,6 +119,12 @@
org.springframework.data
spring-data-geode
+
+
+ org.apache.logging.log4j
+ log4j
+
+
test
diff --git a/stream-applications-integration-tests/src/test/java/org/springframework/cloud/stream/apps/integration/test/common/Configuration.java b/stream-applications-integration-tests/src/test/java/org/springframework/cloud/stream/apps/integration/test/common/Configuration.java
index f8a0c35..df75380 100644
--- a/stream-applications-integration-tests/src/test/java/org/springframework/cloud/stream/apps/integration/test/common/Configuration.java
+++ b/stream-applications-integration-tests/src/test/java/org/springframework/cloud/stream/apps/integration/test/common/Configuration.java
@@ -27,7 +27,7 @@ public abstract class Configuration {
private static final String SPRING_CLOUD_STREAM_APPLICATIONS_VERSION = "spring.cloud.stream.applications.version";
static {
- VERSION = System.getProperty(SPRING_CLOUD_STREAM_APPLICATIONS_VERSION, "latest");
+ VERSION = System.getProperty(SPRING_CLOUD_STREAM_APPLICATIONS_VERSION, "3.0.0-SNAPSHOT");
}
}
diff --git a/stream-applications-integration-tests/src/test/java/org/springframework/cloud/stream/apps/integration/test/kafka/processor/KafkaHttpRequestProcessorTests.java b/stream-applications-integration-tests/src/test/java/org/springframework/cloud/stream/apps/integration/test/processor/httprequest/HttpRequestProcessorTests.java
similarity index 60%
rename from stream-applications-integration-tests/src/test/java/org/springframework/cloud/stream/apps/integration/test/kafka/processor/KafkaHttpRequestProcessorTests.java
rename to stream-applications-integration-tests/src/test/java/org/springframework/cloud/stream/apps/integration/test/processor/httprequest/HttpRequestProcessorTests.java
index 1802f0f..cca1a4a 100644
--- a/stream-applications-integration-tests/src/test/java/org/springframework/cloud/stream/apps/integration/test/kafka/processor/KafkaHttpRequestProcessorTests.java
+++ b/stream-applications-integration-tests/src/test/java/org/springframework/cloud/stream/apps/integration/test/processor/httprequest/HttpRequestProcessorTests.java
@@ -14,48 +14,59 @@
* limitations under the License.
*/
-package org.springframework.cloud.stream.apps.integration.test.kafka.processor;
+package org.springframework.cloud.stream.apps.integration.test.processor.httprequest;
+import java.io.IOException;
import java.net.InetAddress;
import okhttp3.mockwebserver.Dispatcher;
import okhttp3.mockwebserver.MockResponse;
import okhttp3.mockwebserver.MockWebServer;
import okhttp3.mockwebserver.RecordedRequest;
+import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
-import org.testcontainers.junit.jupiter.Container;
import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.cloud.stream.app.test.integration.OutputMatcher;
import org.springframework.cloud.stream.app.test.integration.StreamAppContainer;
-import org.springframework.cloud.stream.app.test.integration.kafka.KafkaStreamApplicationIntegrationTestSupport;
+import org.springframework.cloud.stream.app.test.integration.StreamAppContainerTestUtils;
+import org.springframework.cloud.stream.app.test.integration.TestTopicSender;
import org.springframework.http.HttpHeaders;
import org.springframework.http.HttpStatus;
import org.springframework.http.MediaType;
-import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.messaging.MessageHeaders;
import static org.awaitility.Awaitility.await;
import static org.springframework.cloud.stream.app.test.integration.AppLog.appLog;
import static org.springframework.cloud.stream.apps.integration.test.common.Configuration.DEFAULT_DURATION;
-import static org.springframework.cloud.stream.apps.integration.test.common.Configuration.VERSION;
-public class KafkaHttpRequestProcessorTests extends KafkaStreamApplicationIntegrationTestSupport {
- private static MockWebServer server = new MockWebServer();
+abstract class HttpRequestProcessorTests {
- private static int serverPort = findAvailablePort();
+ private static MockWebServer server;
+
+ private static int serverPort;
@Autowired
- private KafkaTemplate kafkaTemplate;
+ private TestTopicSender testTopicSender;
- @Container
- private static StreamAppContainer processor = prepackagedKafkaContainerFor("http-request-processor", VERSION)
- .withLogConsumer(appLog("http-request-processor"))
- .withEnv("HTTP_REQUEST_URL_EXPRESSION", "'http://" + localHostAddress() + ":" + serverPort + "'")
- .withEnv("HTTP_REQUEST_HTTP_METHOD_EXPRESSION", "'POST'");
+ @Autowired
+ private OutputMatcher outputMatcher;
+
+ private static StreamAppContainer processor;
+
+ protected static StreamAppContainer configureProcessor(StreamAppContainer baseContainer) {
+ serverPort = StreamAppContainerTestUtils.findAvailablePort();
+ processor = baseContainer.withLogConsumer(appLog("http-request-processor"))
+ .withEnv("HTTP_REQUEST_URL_EXPRESSION",
+ "'http://" + StreamAppContainerTestUtils.localHostAddress() + ":" + serverPort + "'")
+ .withEnv("HTTP_REQUEST_HTTP_METHOD_EXPRESSION", "'POST'");
+ return processor;
+ }
@BeforeAll
static void startServer() throws Exception {
+ server = new MockWebServer();
server.start(InetAddress.getLocalHost(), serverPort);
}
@@ -70,11 +81,16 @@ public class KafkaHttpRequestProcessorTests extends KafkaStreamApplicationIntegr
.setResponseCode(HttpStatus.OK.value());
}
});
- kafkaTemplate.send(processor.getInputDestination(), "ping");
+ testTopicSender.send(processor.getInputDestination(), "ping");
await().atMost(DEFAULT_DURATION)
- .until(messageMatches(message -> message.getPayload().equals("{\"response\":\"ping\"}")
+ .until(outputMatcher.messageMatches(message -> message.getPayload().equals("{\"response\":\"ping\"}")
&& message.getHeaders().get(MessageHeaders.CONTENT_TYPE)
.equals(MediaType.APPLICATION_JSON_VALUE)));
}
+ @AfterAll
+ static void cleanUp() throws IOException {
+ server.shutdown();
+ }
+
}
diff --git a/stream-applications-integration-tests/src/test/java/org/springframework/cloud/stream/apps/integration/test/processor/httprequest/KafkaHttpRequestProcessorTests.java b/stream-applications-integration-tests/src/test/java/org/springframework/cloud/stream/apps/integration/test/processor/httprequest/KafkaHttpRequestProcessorTests.java
new file mode 100644
index 0000000..63b8068
--- /dev/null
+++ b/stream-applications-integration-tests/src/test/java/org/springframework/cloud/stream/apps/integration/test/processor/httprequest/KafkaHttpRequestProcessorTests.java
@@ -0,0 +1,33 @@
+/*
+ * Copyright 2020-2020 the original author or authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.springframework.cloud.stream.apps.integration.test.processor.httprequest;
+
+import org.testcontainers.junit.jupiter.Container;
+
+import org.springframework.cloud.stream.app.test.integration.StreamAppContainer;
+import org.springframework.cloud.stream.app.test.integration.kafka.KafkaConfig;
+import org.springframework.cloud.stream.app.test.integration.kafka.KafkaStreamAppTest;
+
+import static org.springframework.cloud.stream.apps.integration.test.common.Configuration.VERSION;
+
+@KafkaStreamAppTest
+class KafkaHttpRequestProcessorTests extends HttpRequestProcessorTests {
+ @Container
+ private static StreamAppContainer container = configureProcessor(KafkaConfig
+ .prepackagedContainerFor("http-request-processor", VERSION));
+
+}
diff --git a/stream-applications-integration-tests/src/test/java/org/springframework/cloud/stream/apps/integration/test/processor/httprequest/RabbitMQHttpRequestProcessorTests.java b/stream-applications-integration-tests/src/test/java/org/springframework/cloud/stream/apps/integration/test/processor/httprequest/RabbitMQHttpRequestProcessorTests.java
new file mode 100644
index 0000000..fe394c2
--- /dev/null
+++ b/stream-applications-integration-tests/src/test/java/org/springframework/cloud/stream/apps/integration/test/processor/httprequest/RabbitMQHttpRequestProcessorTests.java
@@ -0,0 +1,33 @@
+/*
+ * Copyright 2020-2020 the original author or authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.springframework.cloud.stream.apps.integration.test.processor.httprequest;
+
+import org.testcontainers.junit.jupiter.Container;
+
+import org.springframework.cloud.stream.app.test.integration.StreamAppContainer;
+import org.springframework.cloud.stream.app.test.integration.rabbitmq.RabbitMQConfig;
+import org.springframework.cloud.stream.app.test.integration.rabbitmq.RabbitMQStreamAppTest;
+
+import static org.springframework.cloud.stream.apps.integration.test.common.Configuration.VERSION;
+
+@RabbitMQStreamAppTest
+class RabbitMQHttpRequestProcessorTests extends HttpRequestProcessorTests {
+ @Container
+ private static StreamAppContainer container = configureProcessor(RabbitMQConfig
+ .prepackagedContainerFor("http-request-processor", VERSION));
+
+}
diff --git a/stream-applications-integration-tests/src/test/java/org/springframework/cloud/stream/apps/integration/test/rabbitmq/source/RabbitMQTimeSourceTests.java b/stream-applications-integration-tests/src/test/java/org/springframework/cloud/stream/apps/integration/test/rabbitmq/source/RabbitMQTimeSourceTests.java
deleted file mode 100644
index dfc88a3..0000000
--- a/stream-applications-integration-tests/src/test/java/org/springframework/cloud/stream/apps/integration/test/rabbitmq/source/RabbitMQTimeSourceTests.java
+++ /dev/null
@@ -1,52 +0,0 @@
-/*
- * Copyright 2020-2020 the original author or authors.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * https://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.springframework.cloud.stream.apps.integration.test.rabbitmq.source;
-
-import java.time.Duration;
-import java.util.regex.Pattern;
-
-import org.junit.jupiter.api.Test;
-import org.testcontainers.junit.jupiter.Container;
-import org.testcontainers.junit.jupiter.Testcontainers;
-
-import org.springframework.cloud.stream.app.test.integration.LogMatcher;
-import org.springframework.cloud.stream.app.test.integration.StreamAppContainer;
-import org.springframework.cloud.stream.app.test.integration.rabbitmq.RabbitMQStreamApplicationIntegrationTestSupport;
-
-import static org.awaitility.Awaitility.await;
-import static org.springframework.cloud.stream.apps.integration.test.common.Configuration.DEFAULT_DURATION;
-import static org.springframework.cloud.stream.apps.integration.test.common.Configuration.VERSION;
-
-@Testcontainers
-public class RabbitMQTimeSourceTests extends RabbitMQStreamApplicationIntegrationTestSupport {
-
- // "MM/dd/yy HH:mm:ss";
- private final static Pattern pattern = Pattern.compile(".*\\d{2}/\\d{2}/\\d{2}\\s+\\d{2}:\\d{2}:\\d{2}");
-
- static LogMatcher logMatcher = LogMatcher.contains("Started TimeSource");
-
- @Container
- static StreamAppContainer timeSource = prepackagedRabbitMQContainerFor("time-source", VERSION)
- .withLogConsumer(logMatcher);
-
- @Test
- void test() {
- await().atMost(DEFAULT_DURATION).until(logMatcher.matches());
- await().atMost(DEFAULT_DURATION).pollInterval(Duration.ofSeconds(1))
- .until(payloadMatches((String s) -> pattern.matcher(s).matches()));
- }
-}
diff --git a/stream-applications-integration-tests/src/test/java/org/springframework/cloud/stream/apps/integration/test/rabbitmq/stream/RabbitMQTikTokTests.java b/stream-applications-integration-tests/src/test/java/org/springframework/cloud/stream/apps/integration/test/rabbitmq/stream/RabbitMQTikTokTests.java
deleted file mode 100644
index ea751c3..0000000
--- a/stream-applications-integration-tests/src/test/java/org/springframework/cloud/stream/apps/integration/test/rabbitmq/stream/RabbitMQTikTokTests.java
+++ /dev/null
@@ -1,46 +0,0 @@
-/*
- * Copyright 2020-2020 the original author or authors.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * https://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.springframework.cloud.stream.apps.integration.test.rabbitmq.stream;
-
-import org.junit.jupiter.api.Test;
-import org.testcontainers.junit.jupiter.Container;
-
-import org.springframework.cloud.stream.app.test.integration.LogMatcher;
-import org.springframework.cloud.stream.app.test.integration.StreamApps;
-import org.springframework.cloud.stream.app.test.integration.rabbitmq.RabbitMQStreamApplicationIntegrationTestSupport;
-
-import static org.awaitility.Awaitility.await;
-import static org.springframework.cloud.stream.app.test.integration.rabbitmq.RabbitMQStreamApps.rabbitMQStreamApps;
-import static org.springframework.cloud.stream.apps.integration.test.common.Configuration.DEFAULT_DURATION;
-import static org.springframework.cloud.stream.apps.integration.test.common.Configuration.VERSION;
-
-public class RabbitMQTikTokTests extends RabbitMQStreamApplicationIntegrationTestSupport {
-
- private static LogMatcher logMatcher = LogMatcher.matchesRegex(".*\\d{2}/\\d{2}/\\d{2}\\s+\\d{2}:\\d{2}:\\d{2}")
- .times(3);
-
- @Container
- private static final StreamApps streamApp = rabbitMQStreamApps(RabbitMQTikTokTests.class.getSimpleName(), rabbitmq)
- .withSourceContainer(prepackagedRabbitMQContainerFor("time-source", VERSION))
- .withSinkContainer(prepackagedRabbitMQContainerFor("log-sink", VERSION).withLogConsumer(logMatcher).log())
- .build();
-
- @Test
- void test() {
- await().atMost(DEFAULT_DURATION).until(logMatcher.matches());
- }
-}
diff --git a/stream-applications-integration-tests/src/test/java/org/springframework/cloud/stream/apps/integration/test/kafka/sink/KafkaJdbcSinkTests.java b/stream-applications-integration-tests/src/test/java/org/springframework/cloud/stream/apps/integration/test/sink/jdbc/JdbcSinkTests.java
similarity index 61%
rename from stream-applications-integration-tests/src/test/java/org/springframework/cloud/stream/apps/integration/test/kafka/sink/KafkaJdbcSinkTests.java
rename to stream-applications-integration-tests/src/test/java/org/springframework/cloud/stream/apps/integration/test/sink/jdbc/JdbcSinkTests.java
index 5444e75..2d96162 100644
--- a/stream-applications-integration-tests/src/test/java/org/springframework/cloud/stream/apps/integration/test/kafka/sink/KafkaJdbcSinkTests.java
+++ b/stream-applications-integration-tests/src/test/java/org/springframework/cloud/stream/apps/integration/test/sink/jdbc/JdbcSinkTests.java
@@ -14,11 +14,10 @@
* limitations under the License.
*/
-package org.springframework.cloud.stream.apps.integration.test.kafka.sink;
+package org.springframework.cloud.stream.apps.integration.test.sink.jdbc;
-import com.fasterxml.jackson.core.JsonProcessingException;
import com.zaxxer.hikari.HikariDataSource;
-import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.Test;
import org.testcontainers.containers.BindMode;
import org.testcontainers.containers.MySQLContainer;
@@ -26,51 +25,58 @@ import org.testcontainers.junit.jupiter.Container;
import org.testcontainers.utility.DockerImageName;
import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.cloud.stream.app.test.integration.LogMatcher;
import org.springframework.cloud.stream.app.test.integration.StreamAppContainer;
-import org.springframework.cloud.stream.app.test.integration.kafka.KafkaStreamApplicationIntegrationTestSupport;
+import org.springframework.cloud.stream.app.test.integration.TestTopicSender;
+import org.springframework.cloud.stream.app.test.integration.kafka.KafkaConfig;
import org.springframework.jdbc.core.JdbcTemplate;
-import org.springframework.kafka.core.KafkaTemplate;
import static org.assertj.core.api.Assertions.assertThat;
import static org.awaitility.Awaitility.await;
import static org.springframework.cloud.stream.app.test.integration.AppLog.appLog;
import static org.springframework.cloud.stream.apps.integration.test.common.Configuration.DEFAULT_DURATION;
-import static org.springframework.cloud.stream.apps.integration.test.common.Configuration.VERSION;
-public class KafkaJdbcSinkTests extends KafkaStreamApplicationIntegrationTestSupport {
+public abstract class JdbcSinkTests {
private static JdbcTemplate jdbcTemplate;
+ private static StreamAppContainer sink;
+
+ private static LogMatcher logMatcher = LogMatcher.contains("Started JdbcSink");
+
@Autowired
- private KafkaTemplate kafkaTemplate;
+ private TestTopicSender testTopicSender;
@Container
private static MySQLContainer mySQL = new MySQLContainer<>(DockerImageName.parse("mysql:5.7"))
.withUsername("test")
.withPassword("secret")
.withExposedPorts(3306)
- .withNetwork(kafka.getNetwork())
+ .withNetwork(KafkaConfig.kafka.getNetwork())
.withNetworkAliases("mysql-for-sink")
.withClasspathResourceMapping("init.sql", "/init.sql", BindMode.READ_ONLY)
.withLogConsumer(appLog("mysql-for-sink"))
.withCommand("--init-file", "/init.sql");
- static StreamAppContainer sink = prepackagedKafkaContainerFor("jdbc-sink", VERSION)
- .dependsOn(mySQL)
- .withEnv("JDBC_CONSUMER_COLUMNS", "name,city:address.city,street:address.street")
- .withEnv("JDBC_CONSUMER_TABLE_NAME", "People")
- .withEnv("SPRING_DATASOURCE_USERNAME", "test")
- .withEnv("SPRING_DATASOURCE_PASSWORD", "secret")
- .withEnv("SPRING_DATASOURCE_DRIVER_CLASS_NAME", "org.mariadb.jdbc.Driver")
- // .withEnv("LOGGING_LEVEL_ORG_SPRINGFRAMEWORK_INTEGRATION", "DEBUG")
- // .withEnv("LOGGING_LEVEL_ORG_SPRINGFRAMEWORK_JDBC", "DEBUG")
- // .withEnv("LOGGING_LEVEL_ORG_MARIADB_JDBC", "DEBUG")
- // .log()
- .withEnv("SPRING_DATASOURCE_URL",
- "jdbc:mariadb://mysql-for-sink:3306/test");
+ protected static void configureSink(StreamAppContainer baseContainer) {
+ sink = baseContainer
+ .dependsOn(mySQL)
+ .withEnv("JDBC_CONSUMER_COLUMNS", "name,city:address.city,street:address.street")
+ .withEnv("JDBC_CONSUMER_TABLE_NAME", "People")
+ .withEnv("SPRING_DATASOURCE_USERNAME", "test")
+ .withEnv("SPRING_DATASOURCE_PASSWORD", "secret")
+ .withEnv("SPRING_DATASOURCE_DRIVER_CLASS_NAME", "org.mariadb.jdbc.Driver")
+ // .withEnv("LOGGING_LEVEL_ORG_SPRINGFRAMEWORK_INTEGRATION", "DEBUG")
+ // .withEnv("LOGGING_LEVEL_ORG_SPRINGFRAMEWORK_JDBC", "DEBUG")
+ // .withEnv("LOGGING_LEVEL_ORG_MARIADB_JDBC", "DEBUG")
+ // .log()
+ .withEnv("SPRING_DATASOURCE_URL",
+ "jdbc:mariadb://mysql-for-sink:3306/test")
+ .withLogConsumer(logMatcher);
+ startSink();
+ }
- @BeforeAll
- static void startStreamApps() {
+ static void startSink() {
HikariDataSource dataSource = new HikariDataSource();
dataSource.setDriverClassName("org.mariadb.jdbc.Driver");
@@ -83,18 +89,27 @@ public class KafkaJdbcSinkTests extends KafkaStreamApplicationIntegrationTestSup
.until(() -> jdbcTemplate.queryForObject("SELECT COUNT(*) from People", Integer.class)
.intValue() == 0);
sink.start();
+ await().atMost(DEFAULT_DURATION).until(logMatcher.matches());
}
@Test
- void test() throws JsonProcessingException {
+ void test() {
+
String json = "{\"name\":\"My Name\",\"address\":{ \"city\": \"Big City\",\"street\":\"Narrow Alley\"}}";
- kafkaTemplate.send(sink.getInputDestination(), json);
+ testTopicSender.send(sink.getInputDestination(), json);
await().atMost(DEFAULT_DURATION)
- .until(
- () -> jdbcTemplate.queryForObject("SELECT COUNT(*) from People", Integer.class)
- .intValue() == 1);
+ .untilAsserted(
+ () -> assertThat(
+ jdbcTemplate.queryForObject("SELECT COUNT(*) from People", Integer.class).intValue())
+ .isOne());
assertThat(jdbcTemplate.queryForObject("SELECT name from People",
String.class)).isEqualTo("My Name");
}
+
+ @AfterAll
+ static void cleanUp() {
+ sink.stop();
+ }
+
}
diff --git a/stream-applications-integration-tests/src/test/java/org/springframework/cloud/stream/apps/integration/test/sink/jdbc/KafkaJdbcSinkTests.java b/stream-applications-integration-tests/src/test/java/org/springframework/cloud/stream/apps/integration/test/sink/jdbc/KafkaJdbcSinkTests.java
new file mode 100644
index 0000000..1f89f41
--- /dev/null
+++ b/stream-applications-integration-tests/src/test/java/org/springframework/cloud/stream/apps/integration/test/sink/jdbc/KafkaJdbcSinkTests.java
@@ -0,0 +1,34 @@
+/*
+ * Copyright 2020-2020 the original author or authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.springframework.cloud.stream.apps.integration.test.sink.jdbc;
+
+import org.junit.jupiter.api.BeforeAll;
+
+import org.springframework.cloud.stream.app.test.integration.kafka.KafkaConfig;
+import org.springframework.cloud.stream.app.test.integration.kafka.KafkaStreamAppTest;
+
+import static org.springframework.cloud.stream.apps.integration.test.common.Configuration.VERSION;
+
+@KafkaStreamAppTest
+class KafkaJdbcSinkTests extends JdbcSinkTests {
+
+ @BeforeAll
+ static void init() {
+ configureSink(KafkaConfig
+ .prepackagedContainerFor("jdbc-sink", VERSION));
+ }
+}
diff --git a/stream-applications-integration-tests/src/test/java/org/springframework/cloud/stream/apps/integration/test/sink/jdbc/RabbitMQJdbcSinkTests.java b/stream-applications-integration-tests/src/test/java/org/springframework/cloud/stream/apps/integration/test/sink/jdbc/RabbitMQJdbcSinkTests.java
new file mode 100644
index 0000000..d370f9f
--- /dev/null
+++ b/stream-applications-integration-tests/src/test/java/org/springframework/cloud/stream/apps/integration/test/sink/jdbc/RabbitMQJdbcSinkTests.java
@@ -0,0 +1,34 @@
+/*
+ * Copyright 2020-2020 the original author or authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.springframework.cloud.stream.apps.integration.test.sink.jdbc;
+
+import org.junit.jupiter.api.BeforeAll;
+
+import org.springframework.cloud.stream.app.test.integration.rabbitmq.RabbitMQConfig;
+import org.springframework.cloud.stream.app.test.integration.rabbitmq.RabbitMQStreamAppTest;
+
+import static org.springframework.cloud.stream.apps.integration.test.common.Configuration.VERSION;
+
+@RabbitMQStreamAppTest
+class RabbitMQJdbcSinkTests extends JdbcSinkTests {
+
+ @BeforeAll
+ static void init() {
+ configureSink(RabbitMQConfig
+ .prepackagedContainerFor("jdbc-sink", VERSION));
+ }
+}
diff --git a/stream-applications-integration-tests/src/test/java/org/springframework/cloud/stream/apps/integration/test/sink/mongodb/KafkaMongoDBSinkTests.java b/stream-applications-integration-tests/src/test/java/org/springframework/cloud/stream/apps/integration/test/sink/mongodb/KafkaMongoDBSinkTests.java
new file mode 100644
index 0000000..17c3c2b
--- /dev/null
+++ b/stream-applications-integration-tests/src/test/java/org/springframework/cloud/stream/apps/integration/test/sink/mongodb/KafkaMongoDBSinkTests.java
@@ -0,0 +1,34 @@
+/*
+ * Copyright 2020-2020 the original author or authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.springframework.cloud.stream.apps.integration.test.sink.mongodb;
+
+import org.junit.jupiter.api.BeforeAll;
+
+import org.springframework.cloud.stream.app.test.integration.kafka.KafkaConfig;
+import org.springframework.cloud.stream.app.test.integration.kafka.KafkaStreamAppTest;
+
+import static org.springframework.cloud.stream.apps.integration.test.common.Configuration.VERSION;
+
+@KafkaStreamAppTest
+class KafkaMongoDBSinkTests extends MongoDBSinkTests {
+ @BeforeAll
+ static void init() {
+ configureSink(KafkaConfig
+ .prepackagedContainerFor("mongodb-sink", VERSION));
+ }
+
+}
diff --git a/stream-applications-integration-tests/src/test/java/org/springframework/cloud/stream/apps/integration/test/kafka/sink/KafkaMongoDBSinkTests.java b/stream-applications-integration-tests/src/test/java/org/springframework/cloud/stream/apps/integration/test/sink/mongodb/MongoDBSinkTests.java
similarity index 72%
rename from stream-applications-integration-tests/src/test/java/org/springframework/cloud/stream/apps/integration/test/kafka/sink/KafkaMongoDBSinkTests.java
rename to stream-applications-integration-tests/src/test/java/org/springframework/cloud/stream/apps/integration/test/sink/mongodb/MongoDBSinkTests.java
index 91f39ea..8ab9776 100644
--- a/stream-applications-integration-tests/src/test/java/org/springframework/cloud/stream/apps/integration/test/kafka/sink/KafkaMongoDBSinkTests.java
+++ b/stream-applications-integration-tests/src/test/java/org/springframework/cloud/stream/apps/integration/test/sink/mongodb/MongoDBSinkTests.java
@@ -14,56 +14,58 @@
* limitations under the License.
*/
-package org.springframework.cloud.stream.apps.integration.test.kafka.sink;
+package org.springframework.cloud.stream.apps.integration.test.sink.mongodb;
import java.time.Duration;
import java.util.List;
import org.bson.Document;
import org.junit.jupiter.api.AfterAll;
-import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import org.testcontainers.containers.MongoDBContainer;
-import org.testcontainers.junit.jupiter.Container;
import org.testcontainers.utility.DockerImageName;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.cloud.stream.app.test.integration.StreamAppContainer;
-import org.springframework.cloud.stream.app.test.integration.kafka.KafkaStreamApplicationIntegrationTestSupport;
+import org.springframework.cloud.stream.app.test.integration.StreamAppContainerTestUtils;
+import org.springframework.cloud.stream.app.test.integration.TestTopicSender;
import org.springframework.data.mongodb.MongoDatabaseFactory;
import org.springframework.data.mongodb.core.MongoTemplate;
import org.springframework.data.mongodb.core.SimpleMongoClientDatabaseFactory;
-import org.springframework.kafka.core.KafkaTemplate;
import static org.assertj.core.api.Assertions.assertThat;
import static org.awaitility.Awaitility.await;
import static org.springframework.cloud.stream.apps.integration.test.common.Configuration.DEFAULT_DURATION;
-import static org.springframework.cloud.stream.apps.integration.test.common.Configuration.VERSION;
-public class KafkaMongoDBSinkTests extends KafkaStreamApplicationIntegrationTestSupport {
+abstract class MongoDBSinkTests {
private static MongoTemplate mongoTemplate;
@Autowired
- private KafkaTemplate kafkaTemplate;
+ private TestTopicSender testTopicSender;
- @Container
private static MongoDBContainer mongoDBContainer = new MongoDBContainer(DockerImageName.parse("mongo:4.0.10"))
.withExposedPorts(27017)
.withStartupTimeout(Duration.ofMinutes(2));
private static String mongoConnectionString() {
- return String.format("mongodb://%s:%s/%s", localHostAddress(),
+ return String.format("mongodb://%s:%s/%s", StreamAppContainerTestUtils.localHostAddress(),
mongoDBContainer.getMappedPort(27017), "test");
}
- @Container
- private StreamAppContainer sink = prepackagedKafkaContainerFor("mongodb-sink", VERSION)
- .withEnv("MONGO_DB_CONSUMER_COLLECTION", "test")
- .withEnv("SPRING_DATA_MONGODB_URL", mongoConnectionString());
+ private static StreamAppContainer sink;
+
+ protected static void configureSink(StreamAppContainer baseContainer) {
+ mongoDBContainer.start();
+ sink = baseContainer
+ .withEnv("MONGODB_CONSUMER_COLLECTION", "test")
+ .withEnv("SPRING_DATA_MONGODB_URL", mongoConnectionString());
+ sink.start();
+ buildMongoTemplate();
+ }
- @BeforeAll
static void buildMongoTemplate() {
+ mongoDBContainer.start();
MongoDatabaseFactory mongoDatabaseFactory = new SimpleMongoClientDatabaseFactory(
mongoConnectionString());
mongoTemplate = new MongoTemplate(mongoDatabaseFactory);
@@ -72,7 +74,7 @@ public class KafkaMongoDBSinkTests extends KafkaStreamApplicationIntegrationTest
@Test
void postData() {
String json = "{\"name\":\"My Name\",\"address\":{ \"city\": \"Big City\", \"street\":\"Narrow Alley\"}}";
- kafkaTemplate.send(sink.getInputDestination(), json);
+ testTopicSender.send(sink.getInputDestination(), json);
await().atMost(DEFAULT_DURATION).untilAsserted(() -> {
List docs = mongoTemplate.findAll(Document.class, "test");
@@ -83,5 +85,7 @@ public class KafkaMongoDBSinkTests extends KafkaStreamApplicationIntegrationTest
@AfterAll
static void cleanUp() {
mongoDBContainer.close();
+ sink.stop();
}
+
}
diff --git a/stream-applications-integration-tests/src/test/java/org/springframework/cloud/stream/apps/integration/test/sink/mongodb/RabbitMQMongoDBSinkTests.java b/stream-applications-integration-tests/src/test/java/org/springframework/cloud/stream/apps/integration/test/sink/mongodb/RabbitMQMongoDBSinkTests.java
new file mode 100644
index 0000000..1a13b49
--- /dev/null
+++ b/stream-applications-integration-tests/src/test/java/org/springframework/cloud/stream/apps/integration/test/sink/mongodb/RabbitMQMongoDBSinkTests.java
@@ -0,0 +1,35 @@
+/*
+ * Copyright 2020-2020 the original author or authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.springframework.cloud.stream.apps.integration.test.sink.mongodb;
+
+import org.junit.jupiter.api.BeforeAll;
+
+import org.springframework.cloud.stream.app.test.integration.rabbitmq.RabbitMQConfig;
+import org.springframework.cloud.stream.app.test.integration.rabbitmq.RabbitMQStreamAppTest;
+
+import static org.springframework.cloud.stream.apps.integration.test.common.Configuration.VERSION;
+
+@RabbitMQStreamAppTest
+class RabbitMQMongoDBSinkTests extends MongoDBSinkTests {
+
+ @BeforeAll
+ static void init() {
+ configureSink(RabbitMQConfig
+ .prepackagedContainerFor("mongodb-sink", VERSION));
+ }
+
+}
diff --git a/stream-applications-integration-tests/src/test/java/org/springframework/cloud/stream/apps/integration/test/sink/tcp/KafkaTcpSinkTests.java b/stream-applications-integration-tests/src/test/java/org/springframework/cloud/stream/apps/integration/test/sink/tcp/KafkaTcpSinkTests.java
new file mode 100644
index 0000000..5e9b7dc
--- /dev/null
+++ b/stream-applications-integration-tests/src/test/java/org/springframework/cloud/stream/apps/integration/test/sink/tcp/KafkaTcpSinkTests.java
@@ -0,0 +1,33 @@
+/*
+ * Copyright 2020-2020 the original author or authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.springframework.cloud.stream.apps.integration.test.sink.tcp;
+
+import org.testcontainers.junit.jupiter.Container;
+
+import org.springframework.cloud.stream.app.test.integration.StreamAppContainer;
+import org.springframework.cloud.stream.app.test.integration.kafka.KafkaConfig;
+import org.springframework.cloud.stream.app.test.integration.kafka.KafkaStreamAppTest;
+
+import static org.springframework.cloud.stream.apps.integration.test.common.Configuration.VERSION;
+
+@KafkaStreamAppTest
+public class KafkaTcpSinkTests extends TcpSinkTests {
+
+ @Container
+ static StreamAppContainer container = configureSink(KafkaConfig.prepackagedContainerFor("tcp-sink", VERSION));
+
+}
diff --git a/stream-applications-integration-tests/src/test/java/org/springframework/cloud/stream/apps/integration/test/sink/tcp/RabbitMQTcpSinkTests.java b/stream-applications-integration-tests/src/test/java/org/springframework/cloud/stream/apps/integration/test/sink/tcp/RabbitMQTcpSinkTests.java
new file mode 100644
index 0000000..993e49d
--- /dev/null
+++ b/stream-applications-integration-tests/src/test/java/org/springframework/cloud/stream/apps/integration/test/sink/tcp/RabbitMQTcpSinkTests.java
@@ -0,0 +1,32 @@
+/*
+ * Copyright 2020-2020 the original author or authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.springframework.cloud.stream.apps.integration.test.sink.tcp;
+
+import org.testcontainers.junit.jupiter.Container;
+
+import org.springframework.cloud.stream.app.test.integration.StreamAppContainer;
+import org.springframework.cloud.stream.app.test.integration.rabbitmq.RabbitMQConfig;
+import org.springframework.cloud.stream.app.test.integration.rabbitmq.RabbitMQStreamAppTest;
+
+import static org.springframework.cloud.stream.apps.integration.test.common.Configuration.VERSION;
+
+@RabbitMQStreamAppTest
+public class RabbitMQTcpSinkTests extends TcpSinkTests {
+
+ @Container
+ static StreamAppContainer container = configureSink(RabbitMQConfig.prepackagedContainerFor("tcp-sink", VERSION));
+}
diff --git a/stream-applications-integration-tests/src/test/java/org/springframework/cloud/stream/apps/integration/test/kafka/sink/KafkaTcpSinkTests.java b/stream-applications-integration-tests/src/test/java/org/springframework/cloud/stream/apps/integration/test/sink/tcp/TcpSinkTests.java
similarity index 65%
rename from stream-applications-integration-tests/src/test/java/org/springframework/cloud/stream/apps/integration/test/kafka/sink/KafkaTcpSinkTests.java
rename to stream-applications-integration-tests/src/test/java/org/springframework/cloud/stream/apps/integration/test/sink/tcp/TcpSinkTests.java
index fbfa66a..c927a9f 100644
--- a/stream-applications-integration-tests/src/test/java/org/springframework/cloud/stream/apps/integration/test/kafka/sink/KafkaTcpSinkTests.java
+++ b/stream-applications-integration-tests/src/test/java/org/springframework/cloud/stream/apps/integration/test/sink/tcp/TcpSinkTests.java
@@ -14,7 +14,7 @@
* limitations under the License.
*/
-package org.springframework.cloud.stream.apps.integration.test.kafka.sink;
+package org.springframework.cloud.stream.apps.integration.test.sink.tcp;
import java.io.BufferedReader;
import java.io.IOException;
@@ -25,45 +25,49 @@ import java.net.Socket;
import java.time.Duration;
import java.util.concurrent.atomic.AtomicBoolean;
+import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
-import org.testcontainers.junit.jupiter.Container;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.cloud.stream.app.test.integration.StreamAppContainer;
-import org.springframework.cloud.stream.app.test.integration.kafka.KafkaStreamApplicationIntegrationTestSupport;
-import org.springframework.kafka.core.KafkaTemplate;
+import org.springframework.cloud.stream.app.test.integration.StreamAppContainerTestUtils;
+import org.springframework.cloud.stream.app.test.integration.TestTopicSender;
import static org.awaitility.Awaitility.await;
import static org.springframework.cloud.stream.apps.integration.test.common.Configuration.DEFAULT_DURATION;
-import static org.springframework.cloud.stream.apps.integration.test.common.Configuration.VERSION;
-public class KafkaTcpSinkTests extends KafkaStreamApplicationIntegrationTestSupport {
+abstract class TcpSinkTests {
- private static final int tcpPort = findAvailablePort();
+ private static int tcpPort;
private static Socket socket;
private static final AtomicBoolean socketReady = new AtomicBoolean();
- @Autowired
- private KafkaTemplate kafkaTemplate;
+ private static StreamAppContainer sink;
- @Container
- private static StreamAppContainer sink = prepackagedKafkaContainerFor("tcp-sink", VERSION)
- .withEnv("TCP_CONSUMER_HOST", localHostAddress())
- .withEnv("TCP_PORT", String.valueOf(tcpPort))
- .withEnv("TCP_CONSUMER_ENCODER", "CRLF");
+ @Autowired
+ private TestTopicSender testTopicSender;
+
+ protected static StreamAppContainer configureSink(StreamAppContainer baseContainer) {
+ tcpPort = StreamAppContainerTestUtils.findAvailablePort();
+ sink = baseContainer.withEnv("TCP_CONSUMER_HOST", StreamAppContainerTestUtils.localHostAddress())
+ .withEnv("TCP_PORT", String.valueOf(tcpPort))
+ .withEnv("TCP_CONSUMER_ENCODER", "CRLF");
+ return sink;
+ }
@BeforeAll
static void startTcpServer() {
+ socketReady.set(false);
new Thread(() -> {
try {
socket = new ServerSocket(tcpPort, 50, InetAddress.getLocalHost()).accept();
socketReady.set(true);
}
catch (IOException e) {
- throw new RuntimeException(e.getMessage(), e);
+ throw new RuntimeException("failed to bind to port " + tcpPort + ": " + e.getMessage(), e);
}
}).start();
}
@@ -72,10 +76,15 @@ public class KafkaTcpSinkTests extends KafkaStreamApplicationIntegrationTestSupp
void postData() throws IOException {
// Sink will not connect until it receives a message.
String text = "Hello, world!";
- kafkaTemplate.send(sink.getInputDestination(), text);
+ testTopicSender.send(sink.getInputDestination(), text);
await().atMost(DEFAULT_DURATION).untilTrue(socketReady);
BufferedReader reader = new BufferedReader(new InputStreamReader(socket.getInputStream()));
await().atMost(Duration.ofSeconds(10)).until(() -> reader.readLine().equals(text));
}
+
+ @AfterAll
+ static void cleanUp() throws IOException {
+ socket.close();
+ }
}
diff --git a/stream-applications-integration-tests/src/test/java/org/springframework/cloud/stream/apps/integration/test/kafka/source/KafkaGeodeSourceTests.java b/stream-applications-integration-tests/src/test/java/org/springframework/cloud/stream/apps/integration/test/source/geode/GeodeSourceTests.java
similarity index 65%
rename from stream-applications-integration-tests/src/test/java/org/springframework/cloud/stream/apps/integration/test/kafka/source/KafkaGeodeSourceTests.java
rename to stream-applications-integration-tests/src/test/java/org/springframework/cloud/stream/apps/integration/test/source/geode/GeodeSourceTests.java
index a316a72..c75827b 100644
--- a/stream-applications-integration-tests/src/test/java/org/springframework/cloud/stream/apps/integration/test/kafka/source/KafkaGeodeSourceTests.java
+++ b/stream-applications-integration-tests/src/test/java/org/springframework/cloud/stream/apps/integration/test/source/geode/GeodeSourceTests.java
@@ -14,9 +14,10 @@
* limitations under the License.
*/
-package org.springframework.cloud.stream.apps.integration.test.kafka.source;
+package org.springframework.cloud.stream.apps.integration.test.source.geode;
import java.time.Duration;
+import java.util.UUID;
import java.util.function.Consumer;
import com.github.dockerjava.api.command.CreateContainerCmd;
@@ -29,31 +30,41 @@ import org.apache.geode.cache.client.ClientCache;
import org.apache.geode.cache.client.ClientCacheFactory;
import org.apache.geode.cache.client.ClientRegionShortcut;
import org.junit.jupiter.api.AfterAll;
-import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Test;
import org.testcontainers.images.builder.ImageFromDockerfile;
import org.testcontainers.junit.jupiter.Container;
+import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.cloud.fn.test.support.geode.GeodeContainer;
import org.springframework.cloud.stream.app.test.integration.LogMatcher;
+import org.springframework.cloud.stream.app.test.integration.OutputMatcher;
import org.springframework.cloud.stream.app.test.integration.StreamAppContainer;
-import org.springframework.cloud.stream.app.test.integration.kafka.KafkaStreamApplicationIntegrationTestSupport;
+import org.springframework.cloud.stream.app.test.integration.StreamAppContainerTestUtils;
+import org.springframework.cloud.stream.app.test.integration.rabbitmq.RabbitMQStreamAppContainer;
+import org.springframework.context.ConfigurableApplicationContext;
import static org.awaitility.Awaitility.await;
import static org.springframework.cloud.stream.apps.integration.test.common.Configuration.DEFAULT_DURATION;
-import static org.springframework.cloud.stream.apps.integration.test.common.Configuration.VERSION;
-public class KafkaGeodeSourceTests extends KafkaStreamApplicationIntegrationTestSupport {
+public abstract class GeodeSourceTests {
+ private static int locatorPort = StreamAppContainerTestUtils.findAvailablePort();
- private static final int locatorPort = findAvailablePort();
-
- private static final int cacheServerPort = findAvailablePort();
+ private static int cacheServerPort = StreamAppContainerTestUtils.findAvailablePort();
private static Region