Add support for application integration testing with TestContainers

This commit is contained in:
David Turanski
2020-10-17 18:40:06 -04:00
parent a1362d5455
commit 9ce810d029
8 changed files with 532 additions and 0 deletions

View File

@@ -25,6 +25,33 @@
<artifactId>spring-boot-starter-data-redis</artifactId>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.testcontainers</groupId>
<artifactId>testcontainers</artifactId>
<version>${test-containers.version}</version>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.testcontainers</groupId>
<artifactId>junit-jupiter</artifactId>
<version>${test-containers.version}</version>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.testcontainers</groupId>
<artifactId>kafka</artifactId>
<version>${test-containers.version}</version>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.awaitility</groupId>
<artifactId>awaitility</artifactId>
<optional>true</optional>
</dependency>
</dependencies>

View File

@@ -0,0 +1,38 @@
/*
* Copyright 2020-2020 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.cloud.stream.app.test.integration;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import org.slf4j.LoggerFactory;
import org.testcontainers.containers.output.Slf4jLogConsumer;
public final class AppLog extends Slf4jLogConsumer {
private static final Map<String, AppLog> appLogs = new ConcurrentHashMap<>();
public static AppLog appLog(String appName) {
if (!appLogs.containsKey(appName)) {
appLogs.put(appName, new AppLog(appName));
}
return appLogs.get(appName);
}
private AppLog(String appName) {
super(LoggerFactory.getLogger(appName));
}
}

View File

@@ -0,0 +1,36 @@
/*
* Copyright 2020-2020 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.cloud.stream.app.test.integration;
import java.util.LinkedHashMap;
/**
* Fluent API wrapper for {@link java.util.LinkedHashMap}.
* @param <K> key type.
* @param <V> value type.
* @author David Turanski
*/
public class FluentMap<K, V> extends LinkedHashMap<K, V> {
public static FluentMap fluentMap() {
return new FluentMap<>();
}
public FluentMap<K, V> withEntry(K key, V value) {
put(key, value);
return this;
}
}

View File

@@ -0,0 +1,92 @@
/*
* Copyright 2020-2020 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.cloud.stream.app.test.integration;
import java.util.LinkedList;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.Callable;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;
import java.util.regex.Pattern;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testcontainers.containers.output.OutputFrame;
/**
* Utility for matching test container log contents.
* @author David Turanski
*/
public class LogMatcher implements Consumer<OutputFrame> {
private static Logger logger = LoggerFactory.getLogger(LogMatcher.class);
private List<Consumer<String>> listeners = new LinkedList<>();
public Callable<Boolean> verifies(Consumer<LogListener> consumer) {
LogListener logListener = new LogListener();
consumer.accept(logListener);
logListener.runnable.ifPresent(runnable -> runnable.run());
listeners.add(logListener);
return () -> logListener.matches().get();
}
@Override
public void accept(OutputFrame outputFrame) {
listeners.forEach(m -> m.accept(outputFrame.getUtf8String()));
}
public class LogListener implements Consumer<String> {
private AtomicBoolean matched = new AtomicBoolean();
private Optional<Runnable> runnable = Optional.empty();
private Pattern pattern;
@Override
public void accept(String s) {
logger.trace(this + "matching " + s.trim() + " using pattern " + pattern.pattern());
if (pattern.matcher(s.trim()).matches()) {
logger.debug(" MATCHED " + s.trim());
matched.set(true);
listeners.remove(this);
}
}
public LogListener contains(String string) {
return matchesRegex(".*" + string + ".*");
}
public LogListener endsWith(String string) {
return matchesRegex(".*" + string);
}
public LogListener matchesRegex(String regex) {
this.pattern = Pattern.compile(regex);
return this;
}
public LogListener when(Runnable runnable) {
this.runnable = Optional.of(runnable);
return this;
}
public AtomicBoolean matches() {
return matched;
}
}
}

View File

@@ -0,0 +1,184 @@
/*
* Copyright 2020-2020 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.cloud.stream.app.test.integration;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;
import java.util.stream.IntStream;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testcontainers.containers.GenericContainer;
import org.testcontainers.lifecycle.Startable;
import org.springframework.util.Assert;
import org.springframework.util.CollectionUtils;
import static org.springframework.cloud.stream.app.test.integration.AppLog.appLog;
public abstract class StreamApps implements AutoCloseable, Startable {
protected Logger logger = LoggerFactory.getLogger(this.getClass());
private final GenericContainer sourceContainer;
private final GenericContainer sinkContainer;
private List<GenericContainer> processorContainers;
protected StreamApps(GenericContainer sourceContainer, List<GenericContainer> processorContainers,
GenericContainer sinkContainer) {
this.sourceContainer = sourceContainer;
this.sinkContainer = sinkContainer;
this.processorContainers = processorContainers;
}
public GenericContainer sourceContainer() {
return sourceContainer;
}
public GenericContainer sinkContainer() {
return sinkContainer;
}
public List<GenericContainer> processorContainers() {
return processorContainers;
}
public void start() {
if (logger.isDebugEnabled()) {
logger.debug("Starting apps...");
logger.debug("Source container environment:");
sourceContainer().getEnv().forEach((Consumer<String>) env -> logger.debug(env));
sourceContainer().withLogConsumer(appLog(sourceContainer().getImage().get()));
if (!CollectionUtils.isEmpty(processorContainers)) {
logger.debug("\nProcessor containers environment:");
processorContainers().forEach(container -> {
logger.debug("Processor container environment:");
container.getEnv().forEach((Consumer<String>) env -> logger.debug(env));
container.withLogConsumer(appLog(container.getImage().get()));
});
}
logger.debug("\nSink container environment:");
sinkContainer().getEnv().forEach((Consumer<String>) env -> logger.debug(env));
sinkContainer().withLogConsumer(appLog(sinkContainer().getImage().get()));
}
sinkContainer.start();
processorContainers.forEach(GenericContainer::start);
sourceContainer.start();
}
public void stop() {
sinkContainer.stop();
processorContainers.forEach(GenericContainer::stop);
sourceContainer.stop();
}
public static abstract class Builder {
private final String streamName;
private GenericContainer source;
private GenericContainer sink;
private List<GenericContainer> processors = new LinkedList<>();
protected final GenericContainer messageBrokerContainer;
protected Builder(String streamName, GenericContainer messageBrokerContainer) {
Assert.hasText(streamName, "Stream name is required");
Assert.notNull(messageBrokerContainer, "A Message broker container is required.");
Assert.isTrue(messageBrokerContainer.isRunning(), "Message broker container must be started first.");
this.messageBrokerContainer = messageBrokerContainer;
this.streamName = streamName;
}
public Builder withSourceContainer(GenericContainer sourceContainer) {
this.source = sourceContainer;
return this;
}
public Builder withSinkContainer(GenericContainer sinkContainer) {
this.sink = sinkContainer;
return this;
}
public Builder withProcessorContainer(GenericContainer processorContainer) {
this.processors.add(processorContainer);
return this;
}
public StreamApps build() {
Assert.notNull(source, "A Source container is required.");
Assert.notNull(sink, "A Sink container is required.");
return streamAppsInstance(setupSourceContainer(), setupProcessorContainers(), setupSinkContainer());
}
protected abstract StreamApps streamAppsInstance(GenericContainer sourceContainer,
List<GenericContainer> processorContainers, GenericContainer sinkContainer);
private GenericContainer setupSourceContainer() {
return source.withNetwork(messageBrokerContainer.getNetwork())
.withEnv("SPRING_CLOUD_STREAM_BINDINGS_OUTPUT_DESTINATION", sourceOutputDestination())
.withEnv(binderProperties())
.dependsOn(messageBrokerContainer);
}
private GenericContainer setupSinkContainer() {
return sink
.withNetwork(messageBrokerContainer.getNetwork())
.withEnv("SPRING_CLOUD_STREAM_BINDINGS_INPUT_DESTINATION", sinkInputDestination())
.withEnv("SPRING_CLOUD_STREAM_BINDINGS_INPUT_GROUP", streamName)
.withEnv(binderProperties())
.dependsOn(messageBrokerContainer);
}
private List<GenericContainer> setupProcessorContainers() {
IntStream.range(0, processors.size())
.forEach(i -> processors.get(i).withNetwork(messageBrokerContainer.getNetwork())
.withEnv("SPRING_CLOUD_STREAM_BINDINGS_INPUT_DESTINATION",
i == 0 ? sourceOutputDestination() : "processor_ " + i)
.withEnv("SPRING_CLOUD_STREAM_BINDINGS_OUTPUT_DESTINATION",
i == (processors.size() - 1) ? sinkInputDestination()
: "processor_" + (i + 1))
.withEnv("SPRING_CLOUD_STREAM_BINDINGS_INPUT_GROUP", streamName)
.withEnv(binderProperties())
.dependsOn(messageBrokerContainer));
return processors;
}
private String sourceOutputDestination() {
return CollectionUtils.isEmpty(processors) ? streamName : "processor_0";
}
private String sinkInputDestination() {
return (CollectionUtils.isEmpty(processors) || processors.size() <= 1) ? streamName
: "processor_" + (processors.size() - 1);
}
protected abstract Map<String, String> binderProperties();
}
}

View File

@@ -0,0 +1,58 @@
/*
* Copyright 2020-2020 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.cloud.stream.app.test.integration;
import java.io.File;
import java.io.IOException;
import java.net.InetAddress;
import java.net.UnknownHostException;
import org.testcontainers.junit.jupiter.Testcontainers;
import org.springframework.core.io.ClassPathResource;
import org.springframework.util.SocketUtils;
/**
* Support utility for stream application integration testing .
* @author David Turanski
*/
@Testcontainers
public abstract class StreamIApplicationIntegrationTestSupport {
protected static String localHostAddress() {
try {
return InetAddress.getLocalHost().getHostAddress();
}
catch (UnknownHostException e) {
throw new IllegalStateException(e.getMessage(), e);
}
}
protected static File resourceAsFile(String path) {
try {
return new ClassPathResource(path).getFile();
}
catch (IOException e) {
throw new IllegalStateException("Unable to access resource " + path);
}
}
protected static final int findAvailablePort() {
return SocketUtils.findAvailableTcpPort(10000, 20000);
}
}

View File

@@ -0,0 +1,42 @@
/*
* Copyright 2020-2020 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.cloud.stream.app.test.integration.kafka;
import org.testcontainers.containers.KafkaContainer;
import org.testcontainers.containers.Network;
import org.testcontainers.junit.jupiter.Testcontainers;
import org.testcontainers.utility.DockerImageName;
import org.springframework.cloud.stream.app.test.integration.StreamIApplicationIntegrationTestSupport;
/**
* Base class for stream application integration testing with Test Containers and Kafka
* binder
*/
@Testcontainers
public abstract class AbstractKafkaStreamApplicationIntegrationTests extends StreamIApplicationIntegrationTestSupport {
final static Network network = Network.SHARED;
protected final static KafkaContainer kafka = new KafkaContainer(
DockerImageName.parse("confluentinc/cp-kafka:5.5.1"))
.withNetwork(network);
static {
kafka.start();
}
}

View File

@@ -0,0 +1,55 @@
/*
* Copyright 2020-2020 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.cloud.stream.app.test.integration.kafka;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import org.testcontainers.containers.GenericContainer;
import org.springframework.cloud.stream.app.test.integration.StreamApps;
public class KafkaStreamApps extends StreamApps {
protected KafkaStreamApps(GenericContainer sourceContainer, List<GenericContainer> processorContainers,
GenericContainer sinkContainer) {
super(sourceContainer, processorContainers, sinkContainer);
}
public static Builder kafkaStreamApps(String streamName, GenericContainer messageBrokerContainer) {
return new KafkaBuilder(streamName, messageBrokerContainer);
}
public static final class KafkaBuilder extends Builder {
protected KafkaBuilder(String streamName, GenericContainer messageBrokerContainer) {
super(streamName, messageBrokerContainer);
}
protected Map<String, String> binderProperties() {
return Collections.singletonMap("SPRING_CLOUD_STREAM_KAFKA_BINDER_BROKERS",
messageBrokerContainer.getNetworkAliases().get(0) + ":9092");
}
@Override
protected StreamApps streamAppsInstance(GenericContainer sourceContainer,
List<GenericContainer> processorContainers, GenericContainer sinkContainer) {
return new KafkaStreamApps(sourceContainer, processorContainers, sinkContainer);
}
}
}