Add Integration Test README
This commit is contained in:
@@ -5,11 +5,201 @@ This module contains common components to support stream applications.
|
||||
|
||||
## Stream Application Integration Testing
|
||||
|
||||
The `integration` package contains components supporting integration testing stream apps using TestContainers.
|
||||
The `integration` package contains components supporting integration testing stream apps using https://www.testcontainers.org/[TestContainers].
|
||||
|
||||
### StreamAppContainer
|
||||
|
||||
An extension
|
||||
An extension of https://www.testcontainers.org/features/creating_container/[GenericContainer].
|
||||
This is the base class for testing Spring Cloud Stream Application docker images.
|
||||
|
||||
Currently, this supports apps built for Kafka or RabbitMQ message brokers using `KafkaStreamAppContainer` or `RabbitMQStreamAppContainer`.
|
||||
This extension sets the binding properties for the standard `input` and `output` destinations.
|
||||
If your app uses function binding, you must configure the https://cloud.spring.io/spring-cloud-static/spring-cloud-stream/current/reference/html/spring-cloud-stream.html#_functional_binding_names[function destinations to map to input and output].
|
||||
Note that the pre-packaged stream applications include the required mappings already.
|
||||
|
||||
NOTE: Multiple input and output ports are currently not supported.
|
||||
|
||||
These containers depend on an already running message broker TestContainer.
|
||||
Typically, they are created in static initializers, so all tests running within a JVM use the same message broker instance.
|
||||
|
||||
For example:
|
||||
|
||||
```java
|
||||
final static Network network = Network.SHARED;
|
||||
|
||||
protected final static KafkaContainer kafka = new KafkaContainer(
|
||||
DockerImageName.parse("confluentinc/cp-kafka:5.5.1"))
|
||||
.withExposedPorts(9092, 9093)
|
||||
.withNetwork(network);
|
||||
|
||||
static {
|
||||
kafka.start();
|
||||
}
|
||||
|
||||
@Container
|
||||
static StreamAppContainer timeSource = new KafkaStreamAppContainer("springcloudstream/time-source-kafka:3.0.0-SNAPSHOT", kafka);
|
||||
```
|
||||
|
||||
The StreamAppContainer uses the shared network and connects to the message broker using `kafka.getNetworkAliases().get(0) + ":9092"`, in this case.
|
||||
|
||||
Alternately,
|
||||
|
||||
```java
|
||||
InetAddress.getLocalHost().getHostAddress() + ":" + kafka.getMappedPort(9092)
|
||||
```
|
||||
|
||||
will work for both the containers, and the host machine.
|
||||
|
||||
The host machine can connect to Kafka using the random mapped port:
|
||||
|
||||
```java
|
||||
"localhost:" + kafka.getMappedPort(9092);
|
||||
```
|
||||
|
||||
These containers are intended to work with `StreamApplicationIntegrationTestSupport`, described below.
|
||||
|
||||
### Stream Application Integration Test Support
|
||||
|
||||
link:src/main/java/org/springframework/cloud/stream/app/test/integration/StreamApplicationIntegrationTestSupport.java[StreamApplicationIntegrationTestSupport] has
|
||||
subclasses for `Kafka` and `Rabbit`, along with convenient utilities for integration testing an application image.
|
||||
The pre-packaged apps in this repository include extensive tests at the function and base application level.
|
||||
Since we automatically configure and build images using a Maven plugin, it is essential to test the built images as well.
|
||||
Also, if we, or someone in the community, observes a particular issue integrating with external components, TestContainers is
|
||||
a convenient way to try to reproduce it.
|
||||
|
||||
The test strategy is to run a single container, and use the message broker directly to publish messages to the input and
|
||||
verify the expected output message. Simply use the provided `KafkaTemplate` or `RabbitTemplate` to publish input messages to
|
||||
a `source` or `processor`.
|
||||
|
||||
### Verifying output messages.
|
||||
|
||||
This framework creates a message listener on a known output topic.
|
||||
The StreamAppContainer are statically configured to output to that topic, so the listener receives all output messages.
|
||||
Configure your test using one of the following methods;
|
||||
|
||||
```java
|
||||
protected <P> Callable<Boolean> verifyOutputPayload(Predicate<P> outputVerifier);
|
||||
Callable<Boolean> verifyOutputMessage(Predicate<Message<?>> outputVerifier);
|
||||
```
|
||||
These are `Callable<Boolean>` which happens to be the type accepted by the https://github.com/awaitility/awaitility[awaitility]
|
||||
```java
|
||||
await().until()
|
||||
```
|
||||
method, which is a convenient way to wait for the output message.
|
||||
When the message arrives, the link:src/main/java/org/springframework/cloud/stream/app/test/integration/TestTopicListener.java[TestTopicListener] implementation for the message broker will test all registered predicates.
|
||||
|
||||
let's look at simple example test for the famous `time-source`:
|
||||
|
||||
```java
|
||||
@Testcontainers
|
||||
public class KafkaTimeSourceTests extends KafkaStreamApplicationIntegrationTestSupport {
|
||||
|
||||
// "MM/dd/yy HH:mm:ss";
|
||||
private final static Pattern pattern = Pattern.compile(".*\\d{2}/\\d{2}/\\d{2}\\s+\\d{2}:\\d{2}:\\d{2}");
|
||||
|
||||
static LogMatcher logMatcher = LogMatcher.contains("Started TimeSource");
|
||||
|
||||
@Container
|
||||
static StreamAppContainer timeSource = prepackagedKafkaContainerFor("time-source", VERSION)
|
||||
.withLogConsumer(logMatcher);
|
||||
@Test
|
||||
void test() {
|
||||
await().atMost(DEFAULT_DURATION).until(logMatcher.matches());
|
||||
await().atMost(DEFAULT_DURATION).until(verifyOutputPayload((String s) -> pattern.matcher(s).matches()));
|
||||
}
|
||||
}
|
||||
```
|
||||
|
||||
We inherit `KafkaStreamApplicationIntegrationTestSupport` which starts a `kafka` TestContainer in a static initializer.
|
||||
The Time Source emits the time every second. In this case, it's hard to know what the expected output payload is, but it should at least match the date pattern.
|
||||
This test also uses a `LogMatcher`, which is not strictly necessary, but used here to verify that the app logged the standard start up message - always a good sign.
|
||||
Then we wait for a message on the output topic that matches the pattern.
|
||||
|
||||
|
||||
NOTE: Timing concerns: The `verifyOutputPayload` is called repeatedly by awaitility. The first time, it registers the verifier with the message listener.
|
||||
Subsequently, The TopicTestListener detects that it has already been registered, so it just checks if the predicate is satisfied.
|
||||
Potential, there can be a race condition if the message is consumed before the verifier is invoked the first time.
|
||||
To address this, the `KafkaTestListener` rewinds the topic to offset 0 each time a new verifier is registered.
|
||||
RabbitMQ doesn't have this replay capability. Rabbit is no longer responsible once the consumer acknowledges the message.
|
||||
To work around this, the `RabbitMQTestListener` maintains a cache of any unverified messages for a few minutes.
|
||||
If a verifier has not been satisfied, the test listener checks the cache to see if any of those messages match.
|
||||
The following test case verifies the expected behavior.
|
||||
|
||||
```java
|
||||
@Test
|
||||
void verifierOnTheFlyOutOfOrder() {
|
||||
rabbitTemplate.convertAndSend(STREAM_APPLICATIONS_TEST_TOPIC, "#", "hello test1");
|
||||
rabbitTemplate.convertAndSend(STREAM_APPLICATIONS_TEST_TOPIC, "#", "hello test2");
|
||||
await().atMost(Duration.ofSeconds(30))
|
||||
.until(verifyOutputPayload((s -> s.equals("hello test2"))));
|
||||
await().atMost(Duration.ofSeconds(30))
|
||||
.until(verifyOutputPayload((s -> s.equals("hello test1"))));
|
||||
}
|
||||
```
|
||||
The `hello test1` verifier did not exist when `hello test1` was consumed, and is rejected by the first verifier,
|
||||
so it is cached and tested when the second verifier is created.
|
||||
|
||||
If you need to, you can register verifiers in advance, in an `@BeforeEach` method if you `@Autowire` the TestListener.
|
||||
But this doesn't work for statically declared containers which are more efficient and common with TestContainers.
|
||||
|
||||
### Testing Stream Applications
|
||||
|
||||
The @link:src/main/java/org/springframework/cloud/stream/app/test/integration/StreamApps.java[StreamApps] component
|
||||
is convenient for testing an entire stream.
|
||||
This realizes the concepts of `source`, `processor`, and `sink` , and similar Spring Cloud Data Flow, wires them up behind the scenes.
|
||||
|
||||
Here is a test for the canonical `TikTok` stream:
|
||||
|
||||
```java
|
||||
public class RabbitMQTikTokTests extends RabbitMQStreamApplicationIntegrationTestSupport {
|
||||
|
||||
private static LogMatcher logMatcher = LogMatcher.matchesRegex(".*\\d{2}/\\d{2}/\\d{2}\\s+\\d{2}:\\d{2}:\\d{2}")
|
||||
.times(3);
|
||||
|
||||
@Container
|
||||
private static final StreamApps streamApp = rabbitMQStreamApps(RabbitMQTikTokTests.class.getSimpleName(), rabbitmq)
|
||||
.withSourceContainer(prepackagedRabbitMQContainerFor("time-source", VERSION))
|
||||
.withSinkContainer(prepackagedRabbitMQContainerFor("log-sink", VERSION)
|
||||
.withLogConsumer(logMatcher))
|
||||
.build();
|
||||
|
||||
@Test
|
||||
void test() {
|
||||
await().atMost(DEFAULT_DURATION).until(logMatcher.matches());
|
||||
}
|
||||
```
|
||||
|
||||
Here, the link:src/main/java/org/springframework/cloud/stream/app/test/integration/LogMatcher.java[LogMatcher] can be declared statically since it doesn't depend on Spring beans.
|
||||
This is an extension of TestContainer's `LogConsumer` so it is created before the container starts. Here, we
|
||||
verify the LogSink logs at least 3 messages that match the pattern.
|
||||
|
||||
link:src/main/java/org/springframework/cloud/stream/app/test/integration/AppLog.java[AppLog] is also another useful LogConsumer
|
||||
to enable container logging.
|
||||
|
||||
You can find many sample tests in https://github.com/spring-cloud/spring-cloud-stream-acceptance-tests/tree/master/stream-applications-integration-tests[].
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
Reference in New Issue
Block a user