Add stream-applications-integration-tests
This commit is contained in:
1
.gitignore
vendored
1
.gitignore
vendored
@@ -25,3 +25,4 @@ dump.rdb
|
||||
.apt_generated
|
||||
artifacts
|
||||
**/dependency-reduced-pom.xml
|
||||
.idea
|
||||
|
||||
118
stream-applications-integration-tests/.mvn/wrapper/MavenWrapperDownloader.java
vendored
Normal file
118
stream-applications-integration-tests/.mvn/wrapper/MavenWrapperDownloader.java
vendored
Normal file
@@ -0,0 +1,118 @@
|
||||
/*
|
||||
* Copyright 2007-present the original author or authors.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* https://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
import java.net.*;
|
||||
import java.io.*;
|
||||
import java.nio.channels.*;
|
||||
import java.util.Properties;
|
||||
|
||||
public class MavenWrapperDownloader {
|
||||
|
||||
private static final String WRAPPER_VERSION = "0.5.6";
|
||||
/**
|
||||
* Default URL to download the maven-wrapper.jar from, if no 'downloadUrl' is provided.
|
||||
*/
|
||||
private static final String DEFAULT_DOWNLOAD_URL = "https://repo.maven.apache.org/maven2/io/takari/maven-wrapper/"
|
||||
+ WRAPPER_VERSION + "/maven-wrapper-" + WRAPPER_VERSION + ".jar";
|
||||
|
||||
/**
|
||||
* Path to the maven-wrapper.properties file, which might contain a downloadUrl property to
|
||||
* use instead of the default one.
|
||||
*/
|
||||
private static final String MAVEN_WRAPPER_PROPERTIES_PATH =
|
||||
".mvn/wrapper/maven-wrapper.properties";
|
||||
|
||||
/**
|
||||
* Path where the maven-wrapper.jar will be saved to.
|
||||
*/
|
||||
private static final String MAVEN_WRAPPER_JAR_PATH =
|
||||
".mvn/wrapper/maven-wrapper.jar";
|
||||
|
||||
/**
|
||||
* Name of the property which should be used to override the default download url for the wrapper.
|
||||
*/
|
||||
private static final String PROPERTY_NAME_WRAPPER_URL = "wrapperUrl";
|
||||
|
||||
public static void main(String args[]) {
|
||||
System.out.println("- Downloader started");
|
||||
File baseDirectory = new File(args[0]);
|
||||
System.out.println("- Using base directory: " + baseDirectory.getAbsolutePath());
|
||||
|
||||
// If the maven-wrapper.properties exists, read it and check if it contains a custom
|
||||
// wrapperUrl parameter.
|
||||
File mavenWrapperPropertyFile = new File(baseDirectory, MAVEN_WRAPPER_PROPERTIES_PATH);
|
||||
String url = DEFAULT_DOWNLOAD_URL;
|
||||
if (mavenWrapperPropertyFile.exists()) {
|
||||
FileInputStream mavenWrapperPropertyFileInputStream = null;
|
||||
try {
|
||||
mavenWrapperPropertyFileInputStream = new FileInputStream(mavenWrapperPropertyFile);
|
||||
Properties mavenWrapperProperties = new Properties();
|
||||
mavenWrapperProperties.load(mavenWrapperPropertyFileInputStream);
|
||||
url = mavenWrapperProperties.getProperty(PROPERTY_NAME_WRAPPER_URL, url);
|
||||
} catch (IOException e) {
|
||||
System.out.println("- ERROR loading '" + MAVEN_WRAPPER_PROPERTIES_PATH + "'");
|
||||
} finally {
|
||||
try {
|
||||
if (mavenWrapperPropertyFileInputStream != null) {
|
||||
mavenWrapperPropertyFileInputStream.close();
|
||||
}
|
||||
} catch (IOException e) {
|
||||
// Ignore ...
|
||||
}
|
||||
}
|
||||
}
|
||||
System.out.println("- Downloading from: " + url);
|
||||
|
||||
File outputFile = new File(baseDirectory.getAbsolutePath(), MAVEN_WRAPPER_JAR_PATH);
|
||||
if (!outputFile.getParentFile().exists()) {
|
||||
if (!outputFile.getParentFile().mkdirs()) {
|
||||
System.out.println(
|
||||
"- ERROR creating output directory '" + outputFile.getParentFile().getAbsolutePath() + "'");
|
||||
}
|
||||
}
|
||||
System.out.println("- Downloading to: " + outputFile.getAbsolutePath());
|
||||
try {
|
||||
downloadFileFromURL(url, outputFile);
|
||||
System.out.println("Done");
|
||||
System.exit(0);
|
||||
} catch (Throwable e) {
|
||||
System.out.println("- Error downloading");
|
||||
e.printStackTrace();
|
||||
System.exit(1);
|
||||
}
|
||||
}
|
||||
|
||||
private static void downloadFileFromURL(String urlString, File destination) throws Exception {
|
||||
if (System.getenv("MVNW_USERNAME") != null && System.getenv("MVNW_PASSWORD") != null) {
|
||||
String username = System.getenv("MVNW_USERNAME");
|
||||
char[] password = System.getenv("MVNW_PASSWORD").toCharArray();
|
||||
Authenticator.setDefault(new Authenticator() {
|
||||
@Override
|
||||
protected PasswordAuthentication getPasswordAuthentication() {
|
||||
return new PasswordAuthentication(username, password);
|
||||
}
|
||||
});
|
||||
}
|
||||
URL website = new URL(urlString);
|
||||
ReadableByteChannel rbc;
|
||||
rbc = Channels.newChannel(website.openStream());
|
||||
FileOutputStream fos = new FileOutputStream(destination);
|
||||
fos.getChannel().transferFrom(rbc, 0, Long.MAX_VALUE);
|
||||
fos.close();
|
||||
rbc.close();
|
||||
}
|
||||
|
||||
}
|
||||
BIN
stream-applications-integration-tests/.mvn/wrapper/maven-wrapper.jar
vendored
Normal file
BIN
stream-applications-integration-tests/.mvn/wrapper/maven-wrapper.jar
vendored
Normal file
Binary file not shown.
2
stream-applications-integration-tests/.mvn/wrapper/maven-wrapper.properties
vendored
Normal file
2
stream-applications-integration-tests/.mvn/wrapper/maven-wrapper.properties
vendored
Normal file
@@ -0,0 +1,2 @@
|
||||
distributionUrl=https://repo.maven.apache.org/maven2/org/apache/maven/apache-maven/3.6.3/apache-maven-3.6.3-bin.zip
|
||||
wrapperUrl=https://repo.maven.apache.org/maven2/io/takari/maven-wrapper/0.5.6/maven-wrapper-0.5.6.jar
|
||||
43
stream-applications-integration-tests/README.adoc
Normal file
43
stream-applications-integration-tests/README.adoc
Normal file
@@ -0,0 +1,43 @@
|
||||
= Stream Applications Integration Tests
|
||||
|
||||
This contains integration tests for pre-packaged stream-applications for Docker using https://www.testcontainers.org/[TestContainers].
|
||||
These are end-to-end integration tests running apps and required resources, using docker-compose.
|
||||
The goal is to have an end-to-end integration test for each pre-packaged application.
|
||||
We don't aim to test different configuration options, as this is the responsibility of the stream applications.
|
||||
One of the major benefits is to verify the built Docker images run correctly, especially when we introduce global changes,
|
||||
such as upgrading the base JDK image, or other pervasive changes.
|
||||
|
||||
== Test Strategy
|
||||
|
||||
Each test uses a docker-compose file to configure a simple pipeline including the source, sink, or processor app under test.
|
||||
Usually, a single message is sufficient.
|
||||
|
||||
The tests use following patterns:
|
||||
|
||||
=== Source
|
||||
To test a source, we use a log sink and add a LogConsumer that returns a boolean value, eventually evaluating to `true`
|
||||
when an expected regex match is detected. Then we create an event to trigger the source.
|
||||
|
||||
=== Sink
|
||||
To test a sink, we use the http source and post a message using WebClient.
|
||||
|
||||
=== Processor
|
||||
To test a processor, use an http source and a log sink.
|
||||
|
||||
Typically, it is convenient to run the required external resource in a separate container, since it must be accessed by localhost(to provide data to a source or
|
||||
verify data created by a sink), and by the containerized applications.
|
||||
|
||||
=== Organization and Conventions
|
||||
`processor`, `source`, and `sink` are leaf packages.
|
||||
The docker-compose YAML corresponding each test are test resources, similarly organized.
|
||||
So `.../source/JdbcSinkTests` corresponds to `source/jdbc-source-tests.yml`.
|
||||
|
||||
=== Templating
|
||||
The docker-compose files are Mustache templates to allow configuration of runtime values, such as ports.
|
||||
The Docker tag is required configuration, so the templates must always be processed.
|
||||
The result is copied to a temporary file.
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
310
stream-applications-integration-tests/mvnw
vendored
Executable file
310
stream-applications-integration-tests/mvnw
vendored
Executable file
@@ -0,0 +1,310 @@
|
||||
#!/bin/sh
|
||||
# ----------------------------------------------------------------------------
|
||||
# Licensed to the Apache Software Foundation (ASF) under one
|
||||
# or more contributor license agreements. See the NOTICE file
|
||||
# distributed with this work for additional information
|
||||
# regarding copyright ownership. The ASF licenses this file
|
||||
# to you under the Apache License, Version 2.0 (the
|
||||
# "License"); you may not use this file except in compliance
|
||||
# with the License. You may obtain a copy of the License at
|
||||
#
|
||||
# https://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing,
|
||||
# software distributed under the License is distributed on an
|
||||
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
# KIND, either express or implied. See the License for the
|
||||
# specific language governing permissions and limitations
|
||||
# under the License.
|
||||
# ----------------------------------------------------------------------------
|
||||
|
||||
# ----------------------------------------------------------------------------
|
||||
# Maven Start Up Batch script
|
||||
#
|
||||
# Required ENV vars:
|
||||
# ------------------
|
||||
# JAVA_HOME - location of a JDK home dir
|
||||
#
|
||||
# Optional ENV vars
|
||||
# -----------------
|
||||
# M2_HOME - location of maven2's installed home dir
|
||||
# MAVEN_OPTS - parameters passed to the Java VM when running Maven
|
||||
# e.g. to debug Maven itself, use
|
||||
# set MAVEN_OPTS=-Xdebug -Xrunjdwp:transport=dt_socket,server=y,suspend=y,address=8000
|
||||
# MAVEN_SKIP_RC - flag to disable loading of mavenrc files
|
||||
# ----------------------------------------------------------------------------
|
||||
|
||||
if [ -z "$MAVEN_SKIP_RC" ] ; then
|
||||
|
||||
if [ -f /etc/mavenrc ] ; then
|
||||
. /etc/mavenrc
|
||||
fi
|
||||
|
||||
if [ -f "$HOME/.mavenrc" ] ; then
|
||||
. "$HOME/.mavenrc"
|
||||
fi
|
||||
|
||||
fi
|
||||
|
||||
# OS specific support. $var _must_ be set to either true or false.
|
||||
cygwin=false;
|
||||
darwin=false;
|
||||
mingw=false
|
||||
case "`uname`" in
|
||||
CYGWIN*) cygwin=true ;;
|
||||
MINGW*) mingw=true;;
|
||||
Darwin*) darwin=true
|
||||
# Use /usr/libexec/java_home if available, otherwise fall back to /Library/Java/Home
|
||||
# See https://developer.apple.com/library/mac/qa/qa1170/_index.html
|
||||
if [ -z "$JAVA_HOME" ]; then
|
||||
if [ -x "/usr/libexec/java_home" ]; then
|
||||
export JAVA_HOME="`/usr/libexec/java_home`"
|
||||
else
|
||||
export JAVA_HOME="/Library/Java/Home"
|
||||
fi
|
||||
fi
|
||||
;;
|
||||
esac
|
||||
|
||||
if [ -z "$JAVA_HOME" ] ; then
|
||||
if [ -r /etc/gentoo-release ] ; then
|
||||
JAVA_HOME=`java-config --jre-home`
|
||||
fi
|
||||
fi
|
||||
|
||||
if [ -z "$M2_HOME" ] ; then
|
||||
## resolve links - $0 may be a link to maven's home
|
||||
PRG="$0"
|
||||
|
||||
# need this for relative symlinks
|
||||
while [ -h "$PRG" ] ; do
|
||||
ls=`ls -ld "$PRG"`
|
||||
link=`expr "$ls" : '.*-> \(.*\)$'`
|
||||
if expr "$link" : '/.*' > /dev/null; then
|
||||
PRG="$link"
|
||||
else
|
||||
PRG="`dirname "$PRG"`/$link"
|
||||
fi
|
||||
done
|
||||
|
||||
saveddir=`pwd`
|
||||
|
||||
M2_HOME=`dirname "$PRG"`/..
|
||||
|
||||
# make it fully qualified
|
||||
M2_HOME=`cd "$M2_HOME" && pwd`
|
||||
|
||||
cd "$saveddir"
|
||||
# echo Using m2 at $M2_HOME
|
||||
fi
|
||||
|
||||
# For Cygwin, ensure paths are in UNIX format before anything is touched
|
||||
if $cygwin ; then
|
||||
[ -n "$M2_HOME" ] &&
|
||||
M2_HOME=`cygpath --unix "$M2_HOME"`
|
||||
[ -n "$JAVA_HOME" ] &&
|
||||
JAVA_HOME=`cygpath --unix "$JAVA_HOME"`
|
||||
[ -n "$CLASSPATH" ] &&
|
||||
CLASSPATH=`cygpath --path --unix "$CLASSPATH"`
|
||||
fi
|
||||
|
||||
# For Mingw, ensure paths are in UNIX format before anything is touched
|
||||
if $mingw ; then
|
||||
[ -n "$M2_HOME" ] &&
|
||||
M2_HOME="`(cd "$M2_HOME"; pwd)`"
|
||||
[ -n "$JAVA_HOME" ] &&
|
||||
JAVA_HOME="`(cd "$JAVA_HOME"; pwd)`"
|
||||
fi
|
||||
|
||||
if [ -z "$JAVA_HOME" ]; then
|
||||
javaExecutable="`which javac`"
|
||||
if [ -n "$javaExecutable" ] && ! [ "`expr \"$javaExecutable\" : '\([^ ]*\)'`" = "no" ]; then
|
||||
# readlink(1) is not available as standard on Solaris 10.
|
||||
readLink=`which readlink`
|
||||
if [ ! `expr "$readLink" : '\([^ ]*\)'` = "no" ]; then
|
||||
if $darwin ; then
|
||||
javaHome="`dirname \"$javaExecutable\"`"
|
||||
javaExecutable="`cd \"$javaHome\" && pwd -P`/javac"
|
||||
else
|
||||
javaExecutable="`readlink -f \"$javaExecutable\"`"
|
||||
fi
|
||||
javaHome="`dirname \"$javaExecutable\"`"
|
||||
javaHome=`expr "$javaHome" : '\(.*\)/bin'`
|
||||
JAVA_HOME="$javaHome"
|
||||
export JAVA_HOME
|
||||
fi
|
||||
fi
|
||||
fi
|
||||
|
||||
if [ -z "$JAVACMD" ] ; then
|
||||
if [ -n "$JAVA_HOME" ] ; then
|
||||
if [ -x "$JAVA_HOME/jre/sh/java" ] ; then
|
||||
# IBM's JDK on AIX uses strange locations for the executables
|
||||
JAVACMD="$JAVA_HOME/jre/sh/java"
|
||||
else
|
||||
JAVACMD="$JAVA_HOME/bin/java"
|
||||
fi
|
||||
else
|
||||
JAVACMD="`which java`"
|
||||
fi
|
||||
fi
|
||||
|
||||
if [ ! -x "$JAVACMD" ] ; then
|
||||
echo "Error: JAVA_HOME is not defined correctly." >&2
|
||||
echo " We cannot execute $JAVACMD" >&2
|
||||
exit 1
|
||||
fi
|
||||
|
||||
if [ -z "$JAVA_HOME" ] ; then
|
||||
echo "Warning: JAVA_HOME environment variable is not set."
|
||||
fi
|
||||
|
||||
CLASSWORLDS_LAUNCHER=org.codehaus.plexus.classworlds.launcher.Launcher
|
||||
|
||||
# traverses directory structure from process work directory to filesystem root
|
||||
# first directory with .mvn subdirectory is considered project base directory
|
||||
find_maven_basedir() {
|
||||
|
||||
if [ -z "$1" ]
|
||||
then
|
||||
echo "Path not specified to find_maven_basedir"
|
||||
return 1
|
||||
fi
|
||||
|
||||
basedir="$1"
|
||||
wdir="$1"
|
||||
while [ "$wdir" != '/' ] ; do
|
||||
if [ -d "$wdir"/.mvn ] ; then
|
||||
basedir=$wdir
|
||||
break
|
||||
fi
|
||||
# workaround for JBEAP-8937 (on Solaris 10/Sparc)
|
||||
if [ -d "${wdir}" ]; then
|
||||
wdir=`cd "$wdir/.."; pwd`
|
||||
fi
|
||||
# end of workaround
|
||||
done
|
||||
echo "${basedir}"
|
||||
}
|
||||
|
||||
# concatenates all lines of a file
|
||||
concat_lines() {
|
||||
if [ -f "$1" ]; then
|
||||
echo "$(tr -s '\n' ' ' < "$1")"
|
||||
fi
|
||||
}
|
||||
|
||||
BASE_DIR=`find_maven_basedir "$(pwd)"`
|
||||
if [ -z "$BASE_DIR" ]; then
|
||||
exit 1;
|
||||
fi
|
||||
|
||||
##########################################################################################
|
||||
# Extension to allow automatically downloading the maven-wrapper.jar from Maven-central
|
||||
# This allows using the maven wrapper in projects that prohibit checking in binary data.
|
||||
##########################################################################################
|
||||
if [ -r "$BASE_DIR/.mvn/wrapper/maven-wrapper.jar" ]; then
|
||||
if [ "$MVNW_VERBOSE" = true ]; then
|
||||
echo "Found .mvn/wrapper/maven-wrapper.jar"
|
||||
fi
|
||||
else
|
||||
if [ "$MVNW_VERBOSE" = true ]; then
|
||||
echo "Couldn't find .mvn/wrapper/maven-wrapper.jar, downloading it ..."
|
||||
fi
|
||||
if [ -n "$MVNW_REPOURL" ]; then
|
||||
jarUrl="$MVNW_REPOURL/io/takari/maven-wrapper/0.5.6/maven-wrapper-0.5.6.jar"
|
||||
else
|
||||
jarUrl="https://repo.maven.apache.org/maven2/io/takari/maven-wrapper/0.5.6/maven-wrapper-0.5.6.jar"
|
||||
fi
|
||||
while IFS="=" read key value; do
|
||||
case "$key" in (wrapperUrl) jarUrl="$value"; break ;;
|
||||
esac
|
||||
done < "$BASE_DIR/.mvn/wrapper/maven-wrapper.properties"
|
||||
if [ "$MVNW_VERBOSE" = true ]; then
|
||||
echo "Downloading from: $jarUrl"
|
||||
fi
|
||||
wrapperJarPath="$BASE_DIR/.mvn/wrapper/maven-wrapper.jar"
|
||||
if $cygwin; then
|
||||
wrapperJarPath=`cygpath --path --windows "$wrapperJarPath"`
|
||||
fi
|
||||
|
||||
if command -v wget > /dev/null; then
|
||||
if [ "$MVNW_VERBOSE" = true ]; then
|
||||
echo "Found wget ... using wget"
|
||||
fi
|
||||
if [ -z "$MVNW_USERNAME" ] || [ -z "$MVNW_PASSWORD" ]; then
|
||||
wget "$jarUrl" -O "$wrapperJarPath"
|
||||
else
|
||||
wget --http-user=$MVNW_USERNAME --http-password=$MVNW_PASSWORD "$jarUrl" -O "$wrapperJarPath"
|
||||
fi
|
||||
elif command -v curl > /dev/null; then
|
||||
if [ "$MVNW_VERBOSE" = true ]; then
|
||||
echo "Found curl ... using curl"
|
||||
fi
|
||||
if [ -z "$MVNW_USERNAME" ] || [ -z "$MVNW_PASSWORD" ]; then
|
||||
curl -o "$wrapperJarPath" "$jarUrl" -f
|
||||
else
|
||||
curl --user $MVNW_USERNAME:$MVNW_PASSWORD -o "$wrapperJarPath" "$jarUrl" -f
|
||||
fi
|
||||
|
||||
else
|
||||
if [ "$MVNW_VERBOSE" = true ]; then
|
||||
echo "Falling back to using Java to download"
|
||||
fi
|
||||
javaClass="$BASE_DIR/.mvn/wrapper/MavenWrapperDownloader.java"
|
||||
# For Cygwin, switch paths to Windows format before running javac
|
||||
if $cygwin; then
|
||||
javaClass=`cygpath --path --windows "$javaClass"`
|
||||
fi
|
||||
if [ -e "$javaClass" ]; then
|
||||
if [ ! -e "$BASE_DIR/.mvn/wrapper/MavenWrapperDownloader.class" ]; then
|
||||
if [ "$MVNW_VERBOSE" = true ]; then
|
||||
echo " - Compiling MavenWrapperDownloader.java ..."
|
||||
fi
|
||||
# Compiling the Java class
|
||||
("$JAVA_HOME/bin/javac" "$javaClass")
|
||||
fi
|
||||
if [ -e "$BASE_DIR/.mvn/wrapper/MavenWrapperDownloader.class" ]; then
|
||||
# Running the downloader
|
||||
if [ "$MVNW_VERBOSE" = true ]; then
|
||||
echo " - Running MavenWrapperDownloader.java ..."
|
||||
fi
|
||||
("$JAVA_HOME/bin/java" -cp .mvn/wrapper MavenWrapperDownloader "$MAVEN_PROJECTBASEDIR")
|
||||
fi
|
||||
fi
|
||||
fi
|
||||
fi
|
||||
##########################################################################################
|
||||
# End of extension
|
||||
##########################################################################################
|
||||
|
||||
export MAVEN_PROJECTBASEDIR=${MAVEN_BASEDIR:-"$BASE_DIR"}
|
||||
if [ "$MVNW_VERBOSE" = true ]; then
|
||||
echo $MAVEN_PROJECTBASEDIR
|
||||
fi
|
||||
MAVEN_OPTS="$(concat_lines "$MAVEN_PROJECTBASEDIR/.mvn/jvm.config") $MAVEN_OPTS"
|
||||
|
||||
# For Cygwin, switch paths to Windows format before running java
|
||||
if $cygwin; then
|
||||
[ -n "$M2_HOME" ] &&
|
||||
M2_HOME=`cygpath --path --windows "$M2_HOME"`
|
||||
[ -n "$JAVA_HOME" ] &&
|
||||
JAVA_HOME=`cygpath --path --windows "$JAVA_HOME"`
|
||||
[ -n "$CLASSPATH" ] &&
|
||||
CLASSPATH=`cygpath --path --windows "$CLASSPATH"`
|
||||
[ -n "$MAVEN_PROJECTBASEDIR" ] &&
|
||||
MAVEN_PROJECTBASEDIR=`cygpath --path --windows "$MAVEN_PROJECTBASEDIR"`
|
||||
fi
|
||||
|
||||
# Provide a "standardized" way to retrieve the CLI args that will
|
||||
# work with both Windows and non-Windows executions.
|
||||
MAVEN_CMD_LINE_ARGS="$MAVEN_CONFIG $@"
|
||||
export MAVEN_CMD_LINE_ARGS
|
||||
|
||||
WRAPPER_LAUNCHER=org.apache.maven.wrapper.MavenWrapperMain
|
||||
|
||||
exec "$JAVACMD" \
|
||||
$MAVEN_OPTS \
|
||||
-classpath "$MAVEN_PROJECTBASEDIR/.mvn/wrapper/maven-wrapper.jar" \
|
||||
"-Dmaven.home=${M2_HOME}" "-Dmaven.multiModuleProjectDirectory=${MAVEN_PROJECTBASEDIR}" \
|
||||
${WRAPPER_LAUNCHER} $MAVEN_CONFIG "$@"
|
||||
182
stream-applications-integration-tests/mvnw.cmd
vendored
Normal file
182
stream-applications-integration-tests/mvnw.cmd
vendored
Normal file
@@ -0,0 +1,182 @@
|
||||
@REM ----------------------------------------------------------------------------
|
||||
@REM Licensed to the Apache Software Foundation (ASF) under one
|
||||
@REM or more contributor license agreements. See the NOTICE file
|
||||
@REM distributed with this work for additional information
|
||||
@REM regarding copyright ownership. The ASF licenses this file
|
||||
@REM to you under the Apache License, Version 2.0 (the
|
||||
@REM "License"); you may not use this file except in compliance
|
||||
@REM with the License. You may obtain a copy of the License at
|
||||
@REM
|
||||
@REM https://www.apache.org/licenses/LICENSE-2.0
|
||||
@REM
|
||||
@REM Unless required by applicable law or agreed to in writing,
|
||||
@REM software distributed under the License is distributed on an
|
||||
@REM "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
@REM KIND, either express or implied. See the License for the
|
||||
@REM specific language governing permissions and limitations
|
||||
@REM under the License.
|
||||
@REM ----------------------------------------------------------------------------
|
||||
|
||||
@REM ----------------------------------------------------------------------------
|
||||
@REM Maven Start Up Batch script
|
||||
@REM
|
||||
@REM Required ENV vars:
|
||||
@REM JAVA_HOME - location of a JDK home dir
|
||||
@REM
|
||||
@REM Optional ENV vars
|
||||
@REM M2_HOME - location of maven2's installed home dir
|
||||
@REM MAVEN_BATCH_ECHO - set to 'on' to enable the echoing of the batch commands
|
||||
@REM MAVEN_BATCH_PAUSE - set to 'on' to wait for a keystroke before ending
|
||||
@REM MAVEN_OPTS - parameters passed to the Java VM when running Maven
|
||||
@REM e.g. to debug Maven itself, use
|
||||
@REM set MAVEN_OPTS=-Xdebug -Xrunjdwp:transport=dt_socket,server=y,suspend=y,address=8000
|
||||
@REM MAVEN_SKIP_RC - flag to disable loading of mavenrc files
|
||||
@REM ----------------------------------------------------------------------------
|
||||
|
||||
@REM Begin all REM lines with '@' in case MAVEN_BATCH_ECHO is 'on'
|
||||
@echo off
|
||||
@REM set title of command window
|
||||
title %0
|
||||
@REM enable echoing by setting MAVEN_BATCH_ECHO to 'on'
|
||||
@if "%MAVEN_BATCH_ECHO%" == "on" echo %MAVEN_BATCH_ECHO%
|
||||
|
||||
@REM set %HOME% to equivalent of $HOME
|
||||
if "%HOME%" == "" (set "HOME=%HOMEDRIVE%%HOMEPATH%")
|
||||
|
||||
@REM Execute a user defined script before this one
|
||||
if not "%MAVEN_SKIP_RC%" == "" goto skipRcPre
|
||||
@REM check for pre script, once with legacy .bat ending and once with .cmd ending
|
||||
if exist "%HOME%\mavenrc_pre.bat" call "%HOME%\mavenrc_pre.bat"
|
||||
if exist "%HOME%\mavenrc_pre.cmd" call "%HOME%\mavenrc_pre.cmd"
|
||||
:skipRcPre
|
||||
|
||||
@setlocal
|
||||
|
||||
set ERROR_CODE=0
|
||||
|
||||
@REM To isolate internal variables from possible post scripts, we use another setlocal
|
||||
@setlocal
|
||||
|
||||
@REM ==== START VALIDATION ====
|
||||
if not "%JAVA_HOME%" == "" goto OkJHome
|
||||
|
||||
echo.
|
||||
echo Error: JAVA_HOME not found in your environment. >&2
|
||||
echo Please set the JAVA_HOME variable in your environment to match the >&2
|
||||
echo location of your Java installation. >&2
|
||||
echo.
|
||||
goto error
|
||||
|
||||
:OkJHome
|
||||
if exist "%JAVA_HOME%\bin\java.exe" goto init
|
||||
|
||||
echo.
|
||||
echo Error: JAVA_HOME is set to an invalid directory. >&2
|
||||
echo JAVA_HOME = "%JAVA_HOME%" >&2
|
||||
echo Please set the JAVA_HOME variable in your environment to match the >&2
|
||||
echo location of your Java installation. >&2
|
||||
echo.
|
||||
goto error
|
||||
|
||||
@REM ==== END VALIDATION ====
|
||||
|
||||
:init
|
||||
|
||||
@REM Find the project base dir, i.e. the directory that contains the folder ".mvn".
|
||||
@REM Fallback to current working directory if not found.
|
||||
|
||||
set MAVEN_PROJECTBASEDIR=%MAVEN_BASEDIR%
|
||||
IF NOT "%MAVEN_PROJECTBASEDIR%"=="" goto endDetectBaseDir
|
||||
|
||||
set EXEC_DIR=%CD%
|
||||
set WDIR=%EXEC_DIR%
|
||||
:findBaseDir
|
||||
IF EXIST "%WDIR%"\.mvn goto baseDirFound
|
||||
cd ..
|
||||
IF "%WDIR%"=="%CD%" goto baseDirNotFound
|
||||
set WDIR=%CD%
|
||||
goto findBaseDir
|
||||
|
||||
:baseDirFound
|
||||
set MAVEN_PROJECTBASEDIR=%WDIR%
|
||||
cd "%EXEC_DIR%"
|
||||
goto endDetectBaseDir
|
||||
|
||||
:baseDirNotFound
|
||||
set MAVEN_PROJECTBASEDIR=%EXEC_DIR%
|
||||
cd "%EXEC_DIR%"
|
||||
|
||||
:endDetectBaseDir
|
||||
|
||||
IF NOT EXIST "%MAVEN_PROJECTBASEDIR%\.mvn\jvm.config" goto endReadAdditionalConfig
|
||||
|
||||
@setlocal EnableExtensions EnableDelayedExpansion
|
||||
for /F "usebackq delims=" %%a in ("%MAVEN_PROJECTBASEDIR%\.mvn\jvm.config") do set JVM_CONFIG_MAVEN_PROPS=!JVM_CONFIG_MAVEN_PROPS! %%a
|
||||
@endlocal & set JVM_CONFIG_MAVEN_PROPS=%JVM_CONFIG_MAVEN_PROPS%
|
||||
|
||||
:endReadAdditionalConfig
|
||||
|
||||
SET MAVEN_JAVA_EXE="%JAVA_HOME%\bin\java.exe"
|
||||
set WRAPPER_JAR="%MAVEN_PROJECTBASEDIR%\.mvn\wrapper\maven-wrapper.jar"
|
||||
set WRAPPER_LAUNCHER=org.apache.maven.wrapper.MavenWrapperMain
|
||||
|
||||
set DOWNLOAD_URL="https://repo.maven.apache.org/maven2/io/takari/maven-wrapper/0.5.6/maven-wrapper-0.5.6.jar"
|
||||
|
||||
FOR /F "tokens=1,2 delims==" %%A IN ("%MAVEN_PROJECTBASEDIR%\.mvn\wrapper\maven-wrapper.properties") DO (
|
||||
IF "%%A"=="wrapperUrl" SET DOWNLOAD_URL=%%B
|
||||
)
|
||||
|
||||
@REM Extension to allow automatically downloading the maven-wrapper.jar from Maven-central
|
||||
@REM This allows using the maven wrapper in projects that prohibit checking in binary data.
|
||||
if exist %WRAPPER_JAR% (
|
||||
if "%MVNW_VERBOSE%" == "true" (
|
||||
echo Found %WRAPPER_JAR%
|
||||
)
|
||||
) else (
|
||||
if not "%MVNW_REPOURL%" == "" (
|
||||
SET DOWNLOAD_URL="%MVNW_REPOURL%/io/takari/maven-wrapper/0.5.6/maven-wrapper-0.5.6.jar"
|
||||
)
|
||||
if "%MVNW_VERBOSE%" == "true" (
|
||||
echo Couldn't find %WRAPPER_JAR%, downloading it ...
|
||||
echo Downloading from: %DOWNLOAD_URL%
|
||||
)
|
||||
|
||||
powershell -Command "&{"^
|
||||
"$webclient = new-object System.Net.WebClient;"^
|
||||
"if (-not ([string]::IsNullOrEmpty('%MVNW_USERNAME%') -and [string]::IsNullOrEmpty('%MVNW_PASSWORD%'))) {"^
|
||||
"$webclient.Credentials = new-object System.Net.NetworkCredential('%MVNW_USERNAME%', '%MVNW_PASSWORD%');"^
|
||||
"}"^
|
||||
"[Net.ServicePointManager]::SecurityProtocol = [Net.SecurityProtocolType]::Tls12; $webclient.DownloadFile('%DOWNLOAD_URL%', '%WRAPPER_JAR%')"^
|
||||
"}"
|
||||
if "%MVNW_VERBOSE%" == "true" (
|
||||
echo Finished downloading %WRAPPER_JAR%
|
||||
)
|
||||
)
|
||||
@REM End of extension
|
||||
|
||||
@REM Provide a "standardized" way to retrieve the CLI args that will
|
||||
@REM work with both Windows and non-Windows executions.
|
||||
set MAVEN_CMD_LINE_ARGS=%*
|
||||
|
||||
%MAVEN_JAVA_EXE% %JVM_CONFIG_MAVEN_PROPS% %MAVEN_OPTS% %MAVEN_DEBUG_OPTS% -classpath %WRAPPER_JAR% "-Dmaven.multiModuleProjectDirectory=%MAVEN_PROJECTBASEDIR%" %WRAPPER_LAUNCHER% %MAVEN_CONFIG% %*
|
||||
if ERRORLEVEL 1 goto error
|
||||
goto end
|
||||
|
||||
:error
|
||||
set ERROR_CODE=1
|
||||
|
||||
:end
|
||||
@endlocal & set ERROR_CODE=%ERROR_CODE%
|
||||
|
||||
if not "%MAVEN_SKIP_RC%" == "" goto skipRcPost
|
||||
@REM check for post script, once with legacy .bat ending and once with .cmd ending
|
||||
if exist "%HOME%\mavenrc_post.bat" call "%HOME%\mavenrc_post.bat"
|
||||
if exist "%HOME%\mavenrc_post.cmd" call "%HOME%\mavenrc_post.cmd"
|
||||
:skipRcPost
|
||||
|
||||
@REM pause the script if MAVEN_BATCH_PAUSE is set to 'on'
|
||||
if "%MAVEN_BATCH_PAUSE%" == "on" pause
|
||||
|
||||
if "%MAVEN_TERMINATE_CMD%" == "on" exit %ERROR_CODE%
|
||||
|
||||
exit /B %ERROR_CODE%
|
||||
216
stream-applications-integration-tests/pom.xml
Normal file
216
stream-applications-integration-tests/pom.xml
Normal file
@@ -0,0 +1,216 @@
|
||||
<?xml version="1.0" encoding="UTF-8"?>
|
||||
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
|
||||
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
|
||||
<modelVersion>4.0.0</modelVersion>
|
||||
<parent>
|
||||
<groupId>org.springframework.boot</groupId>
|
||||
<artifactId>spring-boot-starter-parent</artifactId>
|
||||
<version>2.3.3.RELEASE</version>
|
||||
<relativePath/> <!-- lookup parent from repository -->
|
||||
</parent>
|
||||
<groupId>org.springframework.cloud.stream.apps</groupId>
|
||||
<artifactId>demo</artifactId>
|
||||
<version>1.0.0-SNAPSHOT</version>
|
||||
<name>stream-applications-integration-tests</name>
|
||||
<description>Integration Tests for stream applications</description>
|
||||
|
||||
<properties>
|
||||
<java.version>1.8</java.version>
|
||||
<java-functions.version>1.0.0-SNAPSHOT</java-functions.version>
|
||||
<mariadb-client.version>2.6.2</mariadb-client.version>
|
||||
<spring-cloud.version>Hoxton.SR8</spring-cloud.version>
|
||||
<testcontainers.version>1.14.3</testcontainers.version>
|
||||
<jmustache.version>1.15</jmustache.version>
|
||||
<maven-checkstyle-plugin.version>3.1.0</maven-checkstyle-plugin.version>
|
||||
<disable.checks>false</disable.checks>
|
||||
<maven-checkstyle-plugin.failsOnError>true</maven-checkstyle-plugin.failsOnError>
|
||||
<maven-checkstyle-plugin.failOnViolation>true</maven-checkstyle-plugin.failOnViolation>
|
||||
<maven-checkstyle-plugin.includeTestSourceDirectory>true</maven-checkstyle-plugin.includeTestSourceDirectory>
|
||||
<puppycrawl-tools-checkstyle.version>8.29</puppycrawl-tools-checkstyle.version>
|
||||
<checkstyle.location>https://raw.githubusercontent.com/spring-cloud/stream-applications/master/etc/checkstyle
|
||||
</checkstyle.location>
|
||||
<checkstyle.suppressions.file>
|
||||
${checkstyle.location}/checkstyle-suppressions.xml
|
||||
</checkstyle.suppressions.file>
|
||||
<checkstyle.nohttp.file>
|
||||
${checkstyle.location}/nohttp-checkstyle.xml
|
||||
</checkstyle.nohttp.file>
|
||||
<checkstyle.additional.suppressions.file>${checkstyle.location}/checkstyle-suppressions.xml
|
||||
</checkstyle.additional.suppressions.file>
|
||||
<nohttp-checkstyle.version>0.0.2.RELEASE</nohttp-checkstyle.version>
|
||||
<disable.nohttp.checks>true</disable.nohttp.checks>
|
||||
<spring-javaformat-checkstyle.version>0.0.7</spring-javaformat-checkstyle.version>
|
||||
<wiremock.version>2.27.1</wiremock.version>
|
||||
<aws.version>1.11.415</aws.version>
|
||||
</properties>
|
||||
|
||||
<dependencies>
|
||||
<dependency>
|
||||
<groupId>org.springframework.boot</groupId>
|
||||
<artifactId>spring-boot-starter-webflux</artifactId>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.springframework.boot</groupId>
|
||||
<artifactId>spring-boot-starter-test</artifactId>
|
||||
<scope>test</scope>
|
||||
<exclusions>
|
||||
<exclusion>
|
||||
<groupId>org.junit.vintage</groupId>
|
||||
<artifactId>junit-vintage-engine</artifactId>
|
||||
</exclusion>
|
||||
</exclusions>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.testcontainers</groupId>
|
||||
<artifactId>junit-jupiter</artifactId>
|
||||
<scope>test</scope>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.testcontainers</groupId>
|
||||
<artifactId>mariadb</artifactId>
|
||||
<scope>test</scope>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.testcontainers</groupId>
|
||||
<artifactId>mongodb</artifactId>
|
||||
<scope>test</scope>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.springframework.cloud.fn</groupId>
|
||||
<artifactId>function-test-support</artifactId>
|
||||
<version>${java-functions.version}</version>
|
||||
<scope>test</scope>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.springframework.data</groupId>
|
||||
<artifactId>spring-data-geode</artifactId>
|
||||
<scope>test</scope>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>com.amazonaws</groupId>
|
||||
<artifactId>aws-java-sdk-s3</artifactId>
|
||||
<version>${aws.version}</version>
|
||||
<scope>test</scope>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>com.squareup.okhttp3</groupId>
|
||||
<artifactId>mockwebserver</artifactId>
|
||||
<scope>test</scope>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.springframework.boot</groupId>
|
||||
<artifactId>spring-boot-starter-data-mongodb</artifactId>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.mariadb.jdbc</groupId>
|
||||
<artifactId>mariadb-java-client</artifactId>
|
||||
<version>${mariadb-client.version}</version>
|
||||
<scope>test</scope>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.springframework.boot</groupId>
|
||||
<artifactId>spring-boot-starter-jdbc</artifactId>
|
||||
<scope>test</scope>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>com.samskivert</groupId>
|
||||
<artifactId>jmustache</artifactId>
|
||||
<version>${jmustache.version}</version>
|
||||
<scope>test</scope>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.awaitility</groupId>
|
||||
<artifactId>awaitility</artifactId>
|
||||
<scope>test</scope>
|
||||
<exclusions>
|
||||
<exclusion>
|
||||
<groupId>junit</groupId>
|
||||
<artifactId>junit</artifactId>
|
||||
</exclusion>
|
||||
</exclusions>
|
||||
</dependency>
|
||||
</dependencies>
|
||||
<build>
|
||||
<plugins>
|
||||
<plugin>
|
||||
<groupId>org.apache.maven.plugins</groupId>
|
||||
<artifactId>maven-checkstyle-plugin</artifactId>
|
||||
<version>${maven-checkstyle-plugin.version}</version>
|
||||
<dependencies>
|
||||
<dependency>
|
||||
<groupId>com.puppycrawl.tools</groupId>
|
||||
<artifactId>checkstyle</artifactId>
|
||||
<version>${puppycrawl-tools-checkstyle.version}</version>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>io.spring.javaformat</groupId>
|
||||
<artifactId>spring-javaformat-checkstyle</artifactId>
|
||||
<version>${spring-javaformat-checkstyle.version}</version>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>io.spring.nohttp</groupId>
|
||||
<artifactId>nohttp-checkstyle</artifactId>
|
||||
<version>${nohttp-checkstyle.version}</version>
|
||||
</dependency>
|
||||
</dependencies>
|
||||
<executions>
|
||||
<execution>
|
||||
<id>checkstyle-validation</id>
|
||||
<phase>validate</phase>
|
||||
<inherited>true</inherited>
|
||||
<configuration>
|
||||
<skip>${disable.checks}</skip>
|
||||
<configLocation>${checkstyle.location}/checkstyle.xml</configLocation>
|
||||
<headerLocation>${checkstyle.location}/checkstyle-header.txt</headerLocation>
|
||||
<propertyExpansion>
|
||||
checkstyle.build.directory=${project.build.directory}
|
||||
checkstyle.suppressions.file=${checkstyle.suppressions.file}
|
||||
checkstyle.additional.suppressions.file=${checkstyle.additional.suppressions.file}
|
||||
</propertyExpansion>
|
||||
<consoleOutput>true</consoleOutput>
|
||||
<includeTestSourceDirectory>
|
||||
${maven-checkstyle-plugin.includeTestSourceDirectory}
|
||||
</includeTestSourceDirectory>
|
||||
<failsOnError>${maven-checkstyle-plugin.failsOnError}
|
||||
</failsOnError>
|
||||
<failOnViolation>
|
||||
${maven-checkstyle-plugin.failOnViolation}
|
||||
</failOnViolation>
|
||||
</configuration>
|
||||
<goals>
|
||||
<goal>check</goal>
|
||||
</goals>
|
||||
</execution>
|
||||
<execution>
|
||||
<id>no-http-checkstyle-validation</id>
|
||||
<phase>validate</phase>
|
||||
<inherited>true</inherited>
|
||||
<configuration>
|
||||
<skip>${disable.nohttp.checks}</skip>
|
||||
<configLocation>${checkstyle.nohttp.file}</configLocation>
|
||||
<includes>**/*</includes>
|
||||
<excludes>**/.idea/**/*,**/.git/**/*,**/target/**/*,**/*.log</excludes>
|
||||
<sourceDirectories>./</sourceDirectories>
|
||||
</configuration>
|
||||
<goals>
|
||||
<goal>check</goal>
|
||||
</goals>
|
||||
</execution>
|
||||
</executions>
|
||||
</plugin>
|
||||
</plugins>
|
||||
</build>
|
||||
|
||||
<dependencyManagement>
|
||||
<dependencies>
|
||||
<dependency>
|
||||
<groupId>org.testcontainers</groupId>
|
||||
<artifactId>testcontainers-bom</artifactId>
|
||||
<version>${testcontainers.version}</version>
|
||||
<type>pom</type>
|
||||
<scope>import</scope>
|
||||
</dependency>
|
||||
</dependencies>
|
||||
</dependencyManagement>
|
||||
|
||||
</project>
|
||||
@@ -0,0 +1,130 @@
|
||||
/*
|
||||
* Copyright 2020-2020 the original author or authors.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* https://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.springframework.cloud.stream.apps.integration.test;
|
||||
|
||||
import java.io.File;
|
||||
import java.io.IOException;
|
||||
import java.io.InputStreamReader;
|
||||
import java.net.InetAddress;
|
||||
import java.net.UnknownHostException;
|
||||
import java.nio.file.Files;
|
||||
import java.nio.file.Path;
|
||||
import java.nio.file.Paths;
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
import java.util.Objects;
|
||||
import java.util.UUID;
|
||||
|
||||
import com.samskivert.mustache.Mustache;
|
||||
import com.samskivert.mustache.Template;
|
||||
import org.slf4j.LoggerFactory;
|
||||
import org.testcontainers.containers.output.Slf4jLogConsumer;
|
||||
import org.testcontainers.junit.jupiter.Testcontainers;
|
||||
|
||||
import org.springframework.core.io.ClassPathResource;
|
||||
import org.springframework.util.SocketUtils;
|
||||
import org.springframework.web.reactive.function.client.WebClient;
|
||||
|
||||
@Testcontainers
|
||||
public abstract class AbstractStreamApplicationTests {
|
||||
|
||||
protected final static String STREAM_APPS_VERSION = "3.0.0-SNAPSHOT";
|
||||
|
||||
public static final String STREAM_APPS_VERSION_KEY = "stream.apps.version";
|
||||
|
||||
protected static Path tempDir;
|
||||
|
||||
protected static File kafka() {
|
||||
return resourceAsFile("compose-kafka.yml");
|
||||
}
|
||||
|
||||
protected static File resourceAsFile(String path) {
|
||||
try {
|
||||
return new ClassPathResource(path).getFile();
|
||||
}
|
||||
catch (IOException e) {
|
||||
throw new IllegalStateException("Unable to access resource " + path);
|
||||
}
|
||||
}
|
||||
|
||||
protected static String localHostAddress() {
|
||||
try {
|
||||
return InetAddress.getLocalHost().getHostAddress();
|
||||
}
|
||||
catch (UnknownHostException e) {
|
||||
throw new IllegalStateException(e.getMessage(), e);
|
||||
}
|
||||
}
|
||||
|
||||
private static WebClient webClient = WebClient.builder().build();
|
||||
|
||||
protected static WebClient webClient() {
|
||||
return webClient;
|
||||
}
|
||||
|
||||
// Junit TempDir does not work with DockerComposeContainer unless you mount it.
|
||||
// Also doesn't work as @BeforeAll in this case.
|
||||
static void initializeTempDir() throws IOException {
|
||||
Path tempRoot = Paths.get(new ClassPathResource("/").getFile().getAbsolutePath());
|
||||
if (tempDir == null) {
|
||||
tempDir = Files.createTempDirectory(tempRoot, UUID.randomUUID().toString());
|
||||
tempDir.toFile().deleteOnExit();
|
||||
}
|
||||
}
|
||||
|
||||
protected static int findAvailablePort() {
|
||||
return SocketUtils.findAvailableTcpPort(10000, 20000);
|
||||
}
|
||||
|
||||
protected static File resolveTemplate(String templatePath, Map<String, Object> templateProperties) {
|
||||
try {
|
||||
initializeTempDir();
|
||||
try (InputStreamReader resourcesTemplateReader = new InputStreamReader(
|
||||
Objects.requireNonNull(new ClassPathResource(templatePath).getInputStream()))) {
|
||||
Template resourceTemplate = Mustache.compiler().escapeHTML(false).compile(resourcesTemplateReader);
|
||||
Path temporaryFile = Files.createFile(tempDir.resolve(Paths.get(templatePath).getFileName()));
|
||||
Files.write(temporaryFile,
|
||||
resourceTemplate.execute(addGlobalProperties(templateProperties)).getBytes()).toFile();
|
||||
temporaryFile.toFile().deleteOnExit();
|
||||
return temporaryFile.toFile();
|
||||
}
|
||||
}
|
||||
catch (IOException e) {
|
||||
throw new IllegalStateException(e.getMessage(), e);
|
||||
}
|
||||
}
|
||||
|
||||
private static Map<String, Object> addGlobalProperties(Map<String, Object> templateProperties) {
|
||||
if (templateProperties.containsKey(STREAM_APPS_VERSION)) {
|
||||
return templateProperties;
|
||||
}
|
||||
Map<String, Object> enriched = new HashMap<>(templateProperties);
|
||||
enriched.put(STREAM_APPS_VERSION_KEY, STREAM_APPS_VERSION);
|
||||
return enriched;
|
||||
}
|
||||
|
||||
public static class AppLog extends Slf4jLogConsumer {
|
||||
public static AppLog appLog(String appName) {
|
||||
return new AppLog(appName);
|
||||
}
|
||||
|
||||
AppLog(String appName) {
|
||||
super(LoggerFactory.getLogger(appName));
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
@@ -0,0 +1,30 @@
|
||||
/*
|
||||
* Copyright 2020-2020 the original author or authors.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* https://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.springframework.cloud.stream.apps.integration.test;
|
||||
|
||||
import java.util.LinkedHashMap;
|
||||
|
||||
public class FluentMap<K, V> extends LinkedHashMap<K, V> {
|
||||
public static FluentMap<String, Object> fluentMap() {
|
||||
return new FluentMap<>();
|
||||
}
|
||||
|
||||
public FluentMap<K, V> withEntry(K key, V value) {
|
||||
put(key, value);
|
||||
return this;
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,75 @@
|
||||
/*
|
||||
* Copyright 2020-2020 the original author or authors.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* https://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.springframework.cloud.stream.apps.integration.test;
|
||||
|
||||
import java.util.LinkedList;
|
||||
import java.util.List;
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
import java.util.function.Consumer;
|
||||
import java.util.regex.Pattern;
|
||||
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
import org.testcontainers.containers.output.OutputFrame;
|
||||
|
||||
public class LogMatcher implements Consumer<OutputFrame> {
|
||||
private static Logger logger = LoggerFactory.getLogger(LogMatcher.class);
|
||||
|
||||
private List<Consumer<String>> listeners = new LinkedList<>();
|
||||
|
||||
public static String contains(String string) {
|
||||
return ".*" + string + ".*";
|
||||
}
|
||||
|
||||
public static String endsWith(String string) {
|
||||
return ".*" + string;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void accept(OutputFrame outputFrame) {
|
||||
listeners.forEach(m -> m.accept(outputFrame.getUtf8String()));
|
||||
}
|
||||
|
||||
public LogListener withRegex(String regex) {
|
||||
LogListener logListener = new LogListener(regex);
|
||||
listeners.add(logListener);
|
||||
return logListener;
|
||||
}
|
||||
|
||||
public class LogListener implements Consumer<String> {
|
||||
private AtomicBoolean matched = new AtomicBoolean();
|
||||
|
||||
private final Pattern pattern;
|
||||
|
||||
LogListener(String regex) {
|
||||
pattern = Pattern.compile(regex);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void accept(String s) {
|
||||
logger.trace(this + "matching " + s.trim() + " using pattern " + pattern.pattern());
|
||||
if (pattern.matcher(s.trim()).matches()) {
|
||||
logger.debug(" MATCHED " + s.trim());
|
||||
matched.set(true);
|
||||
}
|
||||
}
|
||||
|
||||
public AtomicBoolean matches() {
|
||||
return matched;
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,44 @@
|
||||
/*
|
||||
* Copyright 2020-2020 the original author or authors.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* https://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.springframework.cloud.stream.apps.integration.test;
|
||||
|
||||
import java.time.Duration;
|
||||
import java.util.Collections;
|
||||
import java.util.regex.Pattern;
|
||||
|
||||
import org.junit.jupiter.api.Test;
|
||||
import org.testcontainers.containers.DockerComposeContainer;
|
||||
import org.testcontainers.containers.wait.strategy.Wait;
|
||||
import org.testcontainers.junit.jupiter.Container;
|
||||
|
||||
import static org.assertj.core.api.Assertions.assertThatCode;
|
||||
|
||||
public class TickTockTests extends AbstractStreamApplicationTests {
|
||||
// "MM/dd/yy HH:mm:ss";
|
||||
private final Pattern pattern = Pattern.compile(".*\\d{2}/\\d{2}/\\d{2}\\s+\\d{2}:\\d{2}:\\d{2}");
|
||||
|
||||
@Container
|
||||
private final DockerComposeContainer environment = new DockerComposeContainer(
|
||||
kafka(),
|
||||
resolveTemplate("tick-tock-tests.yml", Collections.EMPTY_MAP));
|
||||
|
||||
@Test
|
||||
void ticktock() {
|
||||
assertThatCode(() -> environment.waitingFor("log-sink", Wait.forLogMessage(pattern.pattern(), 5)
|
||||
.withStartupTimeout(Duration.ofMinutes(2)))).doesNotThrowAnyException();
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,94 @@
|
||||
/*
|
||||
* Copyright 2020-2020 the original author or authors.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* https://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.springframework.cloud.stream.apps.integration.test.processor;
|
||||
|
||||
import java.net.InetAddress;
|
||||
import java.time.Duration;
|
||||
|
||||
import okhttp3.mockwebserver.Dispatcher;
|
||||
import okhttp3.mockwebserver.MockResponse;
|
||||
import okhttp3.mockwebserver.MockWebServer;
|
||||
import okhttp3.mockwebserver.RecordedRequest;
|
||||
import org.junit.jupiter.api.BeforeAll;
|
||||
import org.junit.jupiter.api.Test;
|
||||
import org.testcontainers.containers.DockerComposeContainer;
|
||||
import org.testcontainers.containers.wait.strategy.Wait;
|
||||
import org.testcontainers.junit.jupiter.Container;
|
||||
import reactor.core.publisher.Mono;
|
||||
|
||||
import org.springframework.cloud.stream.apps.integration.test.AbstractStreamApplicationTests;
|
||||
import org.springframework.cloud.stream.apps.integration.test.LogMatcher;
|
||||
import org.springframework.http.HttpHeaders;
|
||||
import org.springframework.http.HttpStatus;
|
||||
import org.springframework.http.MediaType;
|
||||
import org.springframework.web.reactive.function.client.ClientResponse;
|
||||
|
||||
import static org.assertj.core.api.Assertions.assertThat;
|
||||
import static org.awaitility.Awaitility.await;
|
||||
import static org.springframework.cloud.stream.apps.integration.test.AbstractStreamApplicationTests.AppLog.appLog;
|
||||
import static org.springframework.cloud.stream.apps.integration.test.FluentMap.fluentMap;
|
||||
|
||||
public class HttpRequestProcessorTests extends AbstractStreamApplicationTests {
|
||||
private static MockWebServer server = new MockWebServer();
|
||||
|
||||
private static int serverPort = findAvailablePort();
|
||||
|
||||
private static String url = "http://" + localHostAddress() + ":" + serverPort;
|
||||
|
||||
private static int sourcePort = findAvailablePort();
|
||||
|
||||
private static LogMatcher logMatcher = new LogMatcher();
|
||||
|
||||
@Container
|
||||
private static final DockerComposeContainer environment = new DockerComposeContainer(
|
||||
kafka(),
|
||||
resolveTemplate("processor/http-request-processor-tests.yml", fluentMap()
|
||||
.withEntry("port", sourcePort)
|
||||
.withEntry("url", url)))
|
||||
.withLogConsumer("log-sink", appLog("log-sink"))
|
||||
.withLogConsumer("log-sink", logMatcher)
|
||||
.withExposedService("http-source", sourcePort,
|
||||
Wait.forListeningPort().withStartupTimeout(Duration.ofMinutes(2)));
|
||||
|
||||
@BeforeAll
|
||||
static void startServer() throws Exception {
|
||||
server.start(InetAddress.getLocalHost(), serverPort);
|
||||
}
|
||||
|
||||
@Test
|
||||
void get() {
|
||||
server.setDispatcher(new Dispatcher() {
|
||||
@Override
|
||||
public MockResponse dispatch(RecordedRequest recordedRequest) {
|
||||
return new MockResponse().setHeader(HttpHeaders.CONTENT_TYPE, MediaType.APPLICATION_JSON_VALUE)
|
||||
.setBody("{\"response\":\"" + recordedRequest.getBody().readUtf8() + "\"}")
|
||||
.setResponseCode(HttpStatus.OK.value());
|
||||
}
|
||||
});
|
||||
ClientResponse response = webClient()
|
||||
.post()
|
||||
.uri("http://localhost:" + sourcePort)
|
||||
.contentType(MediaType.TEXT_PLAIN)
|
||||
.body(Mono.just("ping"), String.class)
|
||||
.exchange()
|
||||
.block();
|
||||
assertThat(response.statusCode().is2xxSuccessful()).isTrue();
|
||||
|
||||
await().atMost(Duration.ofSeconds(30))
|
||||
.untilTrue(logMatcher.withRegex(".*\\{\"response\":\"ping\"\\}").matches());
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,98 @@
|
||||
/*
|
||||
* Copyright 2020-2020 the original author or authors.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* https://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.springframework.cloud.stream.apps.integration.test.sink;
|
||||
|
||||
import java.time.Duration;
|
||||
|
||||
import com.zaxxer.hikari.HikariDataSource;
|
||||
import org.junit.jupiter.api.BeforeAll;
|
||||
import org.junit.jupiter.api.Test;
|
||||
import org.testcontainers.containers.DockerComposeContainer;
|
||||
import org.testcontainers.containers.MariaDBContainer;
|
||||
import org.testcontainers.containers.wait.strategy.Wait;
|
||||
import org.testcontainers.junit.jupiter.Container;
|
||||
import reactor.core.publisher.Mono;
|
||||
|
||||
import org.springframework.cloud.stream.apps.integration.test.AbstractStreamApplicationTests;
|
||||
import org.springframework.http.MediaType;
|
||||
import org.springframework.jdbc.core.JdbcTemplate;
|
||||
import org.springframework.web.reactive.function.client.ClientResponse;
|
||||
|
||||
import static org.assertj.core.api.Assertions.assertThat;
|
||||
import static org.awaitility.Awaitility.await;
|
||||
import static org.springframework.cloud.stream.apps.integration.test.AbstractStreamApplicationTests.AppLog.appLog;
|
||||
import static org.springframework.cloud.stream.apps.integration.test.FluentMap.fluentMap;
|
||||
|
||||
public class JdbcSinkTests extends AbstractStreamApplicationTests {
|
||||
|
||||
private static int port = findAvailablePort();
|
||||
|
||||
private static JdbcTemplate jdbcTemplate;
|
||||
|
||||
@Container
|
||||
private static MariaDBContainer mariadbContainer = (MariaDBContainer) new MariaDBContainer()
|
||||
.withDatabaseName("test")
|
||||
.withPassword("password")
|
||||
.withUsername("user")
|
||||
.withInitScript("init.sql")
|
||||
.withExposedPorts(3306)
|
||||
.waitingFor(Wait.forListeningPort().withStartupTimeout(Duration.ofMinutes(2)));
|
||||
|
||||
@Container
|
||||
private DockerComposeContainer environment = new DockerComposeContainer(
|
||||
kafka(),
|
||||
resolveTemplate("sink/jdbc-sink-tests.yml", fluentMap()
|
||||
.withEntry("jdbc.url",
|
||||
mariadbContainer.getJdbcUrl().replace("localhost",
|
||||
localHostAddress()))
|
||||
.withEntry("user", mariadbContainer.getUsername())
|
||||
.withEntry("password", mariadbContainer.getPassword())
|
||||
.withEntry("port", port)))
|
||||
.withLogConsumer("jdbc-sink", appLog("jdbc-sink"))
|
||||
.withExposedService("http-source", port,
|
||||
Wait.forListeningPort().withStartupTimeout(Duration.ofMinutes(2)));
|
||||
|
||||
@BeforeAll
|
||||
static void buildJdbcTemplate() {
|
||||
HikariDataSource dataSource = new HikariDataSource();
|
||||
dataSource.setDriverClassName(mariadbContainer.getDriverClassName());
|
||||
dataSource.setUsername(mariadbContainer.getUsername());
|
||||
dataSource.setPassword(mariadbContainer.getPassword());
|
||||
dataSource.setJdbcUrl(mariadbContainer.getJdbcUrl());
|
||||
jdbcTemplate = new JdbcTemplate(dataSource);
|
||||
jdbcTemplate.execute("DELETE FROM People");
|
||||
}
|
||||
|
||||
@Test
|
||||
void postData() {
|
||||
String json = "{\"name\":\"My Name\",\"address\":{ \"city\": \"Big City\", \"street\": \"Narrow Alley\"}}";
|
||||
ClientResponse response = webClient()
|
||||
.post()
|
||||
.uri("http://localhost:" + port)
|
||||
.contentType(MediaType.APPLICATION_JSON)
|
||||
.body(Mono.just(json), String.class)
|
||||
.exchange()
|
||||
.block();
|
||||
assertThat(response.statusCode().is2xxSuccessful()).isTrue();
|
||||
|
||||
await().atMost(Duration.ofSeconds(30))
|
||||
.untilAsserted(
|
||||
() -> assertThat(jdbcTemplate.queryForObject("SELECT COUNT(*) from People", Integer.class))
|
||||
.isOne());
|
||||
assertThat(jdbcTemplate.queryForObject("SELECT name from People", String.class)).isEqualTo("My Name");
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,89 @@
|
||||
/*
|
||||
* Copyright 2020-2020 the original author or authors.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* https://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.springframework.cloud.stream.apps.integration.test.sink;
|
||||
|
||||
import java.time.Duration;
|
||||
import java.util.List;
|
||||
|
||||
import org.bson.Document;
|
||||
import org.junit.jupiter.api.BeforeAll;
|
||||
import org.junit.jupiter.api.Test;
|
||||
import org.testcontainers.containers.DockerComposeContainer;
|
||||
import org.testcontainers.containers.MongoDBContainer;
|
||||
import org.testcontainers.containers.wait.strategy.Wait;
|
||||
import org.testcontainers.junit.jupiter.Container;
|
||||
import reactor.core.publisher.Mono;
|
||||
|
||||
import org.springframework.cloud.stream.apps.integration.test.AbstractStreamApplicationTests;
|
||||
import org.springframework.data.mongodb.MongoDatabaseFactory;
|
||||
import org.springframework.data.mongodb.core.MongoTemplate;
|
||||
import org.springframework.data.mongodb.core.SimpleMongoClientDatabaseFactory;
|
||||
import org.springframework.http.MediaType;
|
||||
import org.springframework.web.reactive.function.client.ClientResponse;
|
||||
|
||||
import static org.assertj.core.api.Assertions.assertThat;
|
||||
import static org.springframework.cloud.stream.apps.integration.test.AbstractStreamApplicationTests.AppLog.appLog;
|
||||
import static org.springframework.cloud.stream.apps.integration.test.FluentMap.fluentMap;
|
||||
|
||||
public class MongoDBSinkTests extends AbstractStreamApplicationTests {
|
||||
|
||||
private static int port = findAvailablePort();
|
||||
|
||||
private static MongoTemplate mongoTemplate;
|
||||
|
||||
@Container
|
||||
private static MongoDBContainer mongoDBContainer = new MongoDBContainer()
|
||||
.withExposedPorts(27017)
|
||||
.withStartupTimeout(Duration.ofMinutes(2));
|
||||
|
||||
private static String mongoConnectionString() {
|
||||
return String.format("mongodb://%s:%s/%s", localHostAddress(), mongoDBContainer.getMappedPort(27017), "test");
|
||||
}
|
||||
|
||||
@BeforeAll
|
||||
private static void buildMongoTemplate() {
|
||||
MongoDatabaseFactory mongoDatabaseFactory = new SimpleMongoClientDatabaseFactory(
|
||||
mongoConnectionString());
|
||||
mongoTemplate = new MongoTemplate(mongoDatabaseFactory);
|
||||
}
|
||||
|
||||
@Container
|
||||
private DockerComposeContainer environment = new DockerComposeContainer(
|
||||
kafka(),
|
||||
resolveTemplate("sink/mongodb-sink-tests.yml", fluentMap()
|
||||
.withEntry("mongodb.url", mongoConnectionString())
|
||||
.withEntry("port", port)))
|
||||
.withLogConsumer("jdbc-sink",
|
||||
appLog("jdbc-sink"))
|
||||
.withExposedService("http-source", port,
|
||||
Wait.forListeningPort().withStartupTimeout(Duration.ofMinutes(2)));
|
||||
|
||||
@Test
|
||||
void postData() {
|
||||
String json = "{\"name\":\"My Name\",\"address\":{ \"city\": \"Big City\", \"street\": \"Narrow Alley\"}}";
|
||||
ClientResponse response = webClient()
|
||||
.post()
|
||||
.uri("http://localhost:" + port)
|
||||
.contentType(MediaType.APPLICATION_JSON)
|
||||
.body(Mono.just(json), String.class)
|
||||
.exchange()
|
||||
.block(Duration.ofSeconds(30));
|
||||
assertThat(response.statusCode().is2xxSuccessful()).isTrue();
|
||||
List<Document> docs = mongoTemplate.findAll(Document.class, "test");
|
||||
assertThat(docs).allMatch(document -> document.get("name", String.class).equals("My Name"));
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,93 @@
|
||||
/*
|
||||
* Copyright 2020-2020 the original author or authors.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* https://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.springframework.cloud.stream.apps.integration.test.sink;
|
||||
|
||||
import java.io.BufferedReader;
|
||||
import java.io.IOException;
|
||||
import java.io.InputStreamReader;
|
||||
import java.net.InetAddress;
|
||||
import java.net.ServerSocket;
|
||||
import java.net.Socket;
|
||||
import java.time.Duration;
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
|
||||
import org.junit.jupiter.api.BeforeAll;
|
||||
import org.junit.jupiter.api.Test;
|
||||
import org.testcontainers.containers.DockerComposeContainer;
|
||||
import org.testcontainers.containers.wait.strategy.Wait;
|
||||
import org.testcontainers.junit.jupiter.Container;
|
||||
import reactor.core.publisher.Mono;
|
||||
|
||||
import org.springframework.cloud.stream.apps.integration.test.AbstractStreamApplicationTests;
|
||||
import org.springframework.http.MediaType;
|
||||
import org.springframework.web.reactive.function.client.ClientResponse;
|
||||
|
||||
import static org.assertj.core.api.Assertions.assertThat;
|
||||
import static org.awaitility.Awaitility.await;
|
||||
import static org.springframework.cloud.stream.apps.integration.test.AbstractStreamApplicationTests.AppLog.appLog;
|
||||
import static org.springframework.cloud.stream.apps.integration.test.FluentMap.fluentMap;
|
||||
|
||||
public class TcpSinkTests extends AbstractStreamApplicationTests {
|
||||
|
||||
private static final int port = findAvailablePort();
|
||||
|
||||
private static final int tcpPort = findAvailablePort();
|
||||
|
||||
private static Socket socket;
|
||||
|
||||
private static final AtomicBoolean socketReady = new AtomicBoolean();
|
||||
|
||||
@Container
|
||||
private static final DockerComposeContainer environment = new DockerComposeContainer(
|
||||
kafka(),
|
||||
resolveTemplate("sink/tcp-sink-tests.yml", fluentMap()
|
||||
.withEntry("port", port)
|
||||
.withEntry("tcp.port", tcpPort)
|
||||
.withEntry("tcp.host", localHostAddress())))
|
||||
.withLogConsumer("tcp-sink", appLog("tcp-sink"))
|
||||
.withExposedService("http-source", port,
|
||||
Wait.forListeningPort().withStartupTimeout(Duration.ofMinutes(2)));
|
||||
|
||||
@BeforeAll
|
||||
static void startTcpServer() {
|
||||
new Thread(() -> {
|
||||
try {
|
||||
socket = new ServerSocket(tcpPort, 50, InetAddress.getLocalHost()).accept();
|
||||
socketReady.set(true);
|
||||
}
|
||||
catch (IOException exception) {
|
||||
exception.printStackTrace();
|
||||
}
|
||||
}).start();
|
||||
}
|
||||
|
||||
@Test
|
||||
void postData() throws IOException {
|
||||
String text = "Hello, world!";
|
||||
ClientResponse response = webClient()
|
||||
.post()
|
||||
.uri("http://localhost:" + port)
|
||||
.contentType(MediaType.TEXT_PLAIN)
|
||||
.body(Mono.just(text), String.class)
|
||||
.exchange()
|
||||
.block();
|
||||
assertThat(response.statusCode().is2xxSuccessful()).isTrue();
|
||||
await().atMost(Duration.ofSeconds(10)).untilTrue(socketReady);
|
||||
BufferedReader reader = new BufferedReader(new InputStreamReader(socket.getInputStream()));
|
||||
await().atMost(Duration.ofSeconds(10)).until(() -> reader.readLine().equals("Hello, world!"));
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,110 @@
|
||||
/*
|
||||
* Copyright 2020-2020 the original author or authors.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* https://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.springframework.cloud.stream.apps.integration.test.source;
|
||||
|
||||
import java.time.Duration;
|
||||
import java.util.function.Consumer;
|
||||
|
||||
import com.github.dockerjava.api.command.CreateContainerCmd;
|
||||
import com.github.dockerjava.api.model.ExposedPort;
|
||||
import com.github.dockerjava.api.model.HostConfig;
|
||||
import com.github.dockerjava.api.model.PortBinding;
|
||||
import com.github.dockerjava.api.model.Ports;
|
||||
import org.apache.geode.cache.Region;
|
||||
import org.apache.geode.cache.client.ClientCache;
|
||||
import org.apache.geode.cache.client.ClientCacheFactory;
|
||||
import org.apache.geode.cache.client.ClientRegionShortcut;
|
||||
import org.junit.jupiter.api.BeforeAll;
|
||||
import org.junit.jupiter.api.Test;
|
||||
import org.testcontainers.containers.DockerComposeContainer;
|
||||
import org.testcontainers.images.builder.ImageFromDockerfile;
|
||||
import org.testcontainers.junit.jupiter.Container;
|
||||
|
||||
import org.springframework.cloud.fn.test.support.geode.GeodeContainer;
|
||||
import org.springframework.cloud.stream.apps.integration.test.AbstractStreamApplicationTests;
|
||||
import org.springframework.cloud.stream.apps.integration.test.LogMatcher;
|
||||
|
||||
import static org.awaitility.Awaitility.await;
|
||||
import static org.springframework.cloud.stream.apps.integration.test.AbstractStreamApplicationTests.AppLog.appLog;
|
||||
import static org.springframework.cloud.stream.apps.integration.test.FluentMap.fluentMap;
|
||||
|
||||
public class GeodeSourceTests extends AbstractStreamApplicationTests {
|
||||
|
||||
private static LogMatcher logMatcher = new LogMatcher();
|
||||
|
||||
private static LogMatcher geodeLogMatcher = new LogMatcher();
|
||||
|
||||
private static int locatorPort = findAvailablePort();
|
||||
|
||||
private static int cacheServerPort = findAvailablePort();
|
||||
|
||||
private static Region<Object, Object> clientRegion;
|
||||
|
||||
@Container
|
||||
private static GeodeContainer geode = (GeodeContainer) new GeodeContainer(new ImageFromDockerfile()
|
||||
.withFileFromClasspath("Dockerfile", "geode/Dockerfile")
|
||||
.withBuildArg("CACHE_SERVER_PORT", String.valueOf(cacheServerPort))
|
||||
.withBuildArg("LOCATOR_PORT", String.valueOf(locatorPort)),
|
||||
locatorPort, cacheServerPort)
|
||||
.withCreateContainerCmdModifier(
|
||||
(Consumer<CreateContainerCmd>) createContainerCmd -> createContainerCmd
|
||||
.withHostName("geode").withHostConfig(new HostConfig().withPortBindings(
|
||||
new PortBinding(Ports.Binding.bindPort(cacheServerPort),
|
||||
new ExposedPort(cacheServerPort)),
|
||||
new PortBinding(Ports.Binding.bindPort(locatorPort),
|
||||
new ExposedPort(locatorPort)))))
|
||||
.withCommand("tail", "-f", "/dev/null")
|
||||
.withStartupTimeout(Duration.ofMinutes(2));
|
||||
|
||||
@BeforeAll
|
||||
static void init() {
|
||||
//Not using locator is faster.
|
||||
System.out.println(geode.execGfsh(
|
||||
"start server --name=Server1 " + "--hostname-for-clients=geode" + " --server-port="
|
||||
+ cacheServerPort + " --J=-Dgemfire.jmx-manager=true --J=-Dgemfire.jmx-manager-start=true")
|
||||
.getStdout());
|
||||
System.out.println(geode.execGfsh("connect --jmx-manager=localhost[1099]",
|
||||
"create region --name=myRegion --type=REPLICATE").getStdout());
|
||||
ClientCache clientCache = new ClientCacheFactory().addPoolServer("localhost", cacheServerPort)
|
||||
.create();
|
||||
clientRegion = clientCache
|
||||
.createClientRegionFactory(ClientRegionShortcut.PROXY)
|
||||
.create("myRegion");
|
||||
}
|
||||
|
||||
@Container
|
||||
private DockerComposeContainer environment = new DockerComposeContainer(
|
||||
kafka(),
|
||||
resolveTemplate("source/geode-source-tests.yml", fluentMap()
|
||||
.withEntry("geode.host-addresses", "geode:" + cacheServerPort)
|
||||
.withEntry("extraHosts", "geode:" + localHostAddress())
|
||||
.withEntry("geode.region", "myRegion")))
|
||||
.withLogConsumer("log-sink", appLog("log-sink"))
|
||||
.withLogConsumer("geode-source", geodeLogMatcher)
|
||||
.withLogConsumer("log-sink", logMatcher)
|
||||
.withLocalCompose(true);
|
||||
|
||||
@Test
|
||||
void test() {
|
||||
LogMatcher.LogListener logListener = logMatcher.withRegex(LogMatcher.contains("world"));
|
||||
await().atMost(Duration.ofMinutes(2))
|
||||
.untilTrue(geodeLogMatcher.withRegex(LogMatcher.contains("Started GeodeSource")).matches());
|
||||
clientRegion.put("hello", "world");
|
||||
await().atMost(Duration.ofSeconds(30))
|
||||
.untilTrue(logListener.matches());
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,81 @@
|
||||
/*
|
||||
* Copyright 2020-2020 the original author or authors.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* https://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.springframework.cloud.stream.apps.integration.test.source;
|
||||
|
||||
import java.time.Duration;
|
||||
import java.util.Collections;
|
||||
|
||||
import org.junit.jupiter.api.Test;
|
||||
import org.testcontainers.containers.DockerComposeContainer;
|
||||
import org.testcontainers.containers.wait.strategy.Wait;
|
||||
import org.testcontainers.junit.jupiter.Container;
|
||||
import reactor.core.publisher.Mono;
|
||||
|
||||
import org.springframework.cloud.stream.apps.integration.test.AbstractStreamApplicationTests;
|
||||
import org.springframework.cloud.stream.apps.integration.test.LogMatcher;
|
||||
import org.springframework.http.MediaType;
|
||||
import org.springframework.web.reactive.function.client.ClientResponse;
|
||||
|
||||
import static org.assertj.core.api.Assertions.assertThat;
|
||||
import static org.awaitility.Awaitility.await;
|
||||
import static org.springframework.cloud.stream.apps.integration.test.AbstractStreamApplicationTests.AppLog.appLog;
|
||||
import static org.springframework.cloud.stream.apps.integration.test.LogMatcher.endsWith;
|
||||
|
||||
public class HttpSourceTests extends AbstractStreamApplicationTests {
|
||||
|
||||
private static int port = findAvailablePort();
|
||||
|
||||
private static LogMatcher logMatcher = new LogMatcher();
|
||||
|
||||
@Container
|
||||
private static final DockerComposeContainer environment = new DockerComposeContainer(
|
||||
kafka(),
|
||||
resolveTemplate("source/http-source-tests.yml", Collections.singletonMap("port", port)))
|
||||
.withLogConsumer("log-sink", appLog("log-sink"))
|
||||
.withLogConsumer("log-sink", logMatcher)
|
||||
.withExposedService("http-source", port,
|
||||
Wait.forListeningPort().withStartupTimeout(Duration.ofMinutes(2)));
|
||||
|
||||
@Test
|
||||
void plaintext() {
|
||||
ClientResponse response = webClient()
|
||||
.post()
|
||||
.uri("http://localhost:" + port)
|
||||
.contentType(MediaType.TEXT_PLAIN)
|
||||
.body(Mono.just("Hello"), String.class)
|
||||
.exchange()
|
||||
.block();
|
||||
assertThat(response.statusCode().is2xxSuccessful()).isTrue();
|
||||
|
||||
await().atMost(Duration.ofSeconds(30))
|
||||
.untilTrue(logMatcher.withRegex(endsWith("Hello")).matches());
|
||||
}
|
||||
|
||||
@Test
|
||||
void json() {
|
||||
ClientResponse response = webClient()
|
||||
.post()
|
||||
.uri("http://localhost:" + port)
|
||||
.contentType(MediaType.APPLICATION_JSON)
|
||||
.body(Mono.just("{\"Hello\":\"world\"}"), String.class)
|
||||
.exchange()
|
||||
.block();
|
||||
assertThat(response.statusCode().is2xxSuccessful()).isTrue();
|
||||
await().atMost(Duration.ofSeconds(30))
|
||||
.untilTrue(logMatcher.withRegex(".*\\{\"Hello\":\"world\"\\}").matches());
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,50 @@
|
||||
/*
|
||||
* Copyright 2020-2020 the original author or authors.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* https://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.springframework.cloud.stream.apps.integration.test.source;
|
||||
|
||||
import java.time.Duration;
|
||||
import java.util.Collections;
|
||||
|
||||
import org.junit.jupiter.api.Test;
|
||||
import org.testcontainers.containers.DockerComposeContainer;
|
||||
import org.testcontainers.containers.wait.strategy.Wait;
|
||||
import org.testcontainers.junit.jupiter.Container;
|
||||
|
||||
import org.springframework.cloud.stream.apps.integration.test.AbstractStreamApplicationTests;
|
||||
import org.springframework.cloud.stream.apps.integration.test.LogMatcher;
|
||||
|
||||
import static org.awaitility.Awaitility.await;
|
||||
import static org.springframework.cloud.stream.apps.integration.test.LogMatcher.contains;
|
||||
|
||||
public class JdbcSourceTests extends AbstractStreamApplicationTests {
|
||||
|
||||
private static LogMatcher logMatcher = new LogMatcher();
|
||||
|
||||
@Container
|
||||
private static final DockerComposeContainer environment = new DockerComposeContainer(
|
||||
kafka(),
|
||||
resolveTemplate("source/jdbc-source-tests.yml",
|
||||
Collections.singletonMap("init.sql", resourceAsFile("init.sql"))))
|
||||
.withLogConsumer("log-sink", logMatcher)
|
||||
.waitingFor("jdbc-source", Wait.forLogMessage(contains("Started JdbcSource"), 1)
|
||||
.withStartupTimeout(Duration.ofMinutes(2)));
|
||||
|
||||
@Test
|
||||
void test() {
|
||||
await().atMost(Duration.ofSeconds(30)).untilTrue(logMatcher.withRegex(contains("Bart Simpson")).matches());
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,109 @@
|
||||
/*
|
||||
* Copyright 2020-2020 the original author or authors.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* https://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.springframework.cloud.stream.apps.integration.test.source;
|
||||
|
||||
import com.amazonaws.services.s3.AmazonS3;
|
||||
import java.io.BufferedReader;
|
||||
import java.io.File;
|
||||
import java.io.IOException;
|
||||
import java.io.InputStream;
|
||||
import java.io.InputStreamReader;
|
||||
|
||||
import com.amazonaws.AmazonClientException;
|
||||
import com.amazonaws.AmazonServiceException;
|
||||
import com.amazonaws.ClientConfiguration;
|
||||
import com.amazonaws.auth.AWSCredentials;
|
||||
import com.amazonaws.auth.AWSStaticCredentialsProvider;
|
||||
import com.amazonaws.auth.BasicAWSCredentials;
|
||||
import com.amazonaws.client.builder.AwsClientBuilder;
|
||||
import com.amazonaws.regions.Regions;
|
||||
import com.amazonaws.services.s3.AmazonS3ClientBuilder;
|
||||
import com.amazonaws.services.s3.model.GetObjectRequest;
|
||||
import com.amazonaws.services.s3.model.PutObjectRequest;
|
||||
import com.amazonaws.services.s3.model.S3Object;
|
||||
|
||||
import org.springframework.core.io.ClassPathResource;
|
||||
|
||||
public class S3Sample {
|
||||
private static String bucketName = "testbucket";
|
||||
|
||||
private static String keyName = "hosts";
|
||||
|
||||
private static String uploadFileName = "/etc/hosts";
|
||||
|
||||
private static int minioPort = 33474;
|
||||
|
||||
public static void main(String[] args) throws IOException {
|
||||
AWSCredentials credentials = new BasicAWSCredentials("minio", "minio123");
|
||||
ClientConfiguration clientConfiguration = new ClientConfiguration();
|
||||
// clientConfiguration.setSignerOverride("AWSS3V4SignerType");
|
||||
AmazonS3 s3Client = AmazonS3ClientBuilder
|
||||
.standard()
|
||||
.withEndpointConfiguration(
|
||||
new AwsClientBuilder.EndpointConfiguration("http://localhost:" + minioPort,
|
||||
Regions.US_EAST_1.name()))
|
||||
.withPathStyleAccessEnabled(true)
|
||||
.withClientConfiguration(clientConfiguration)
|
||||
.withCredentials(new AWSStaticCredentialsProvider(credentials))
|
||||
.build();
|
||||
|
||||
try {
|
||||
s3Client.createBucket(bucketName);
|
||||
System.out.println("Uploading a new object to S3 from a file\n");
|
||||
File file = new ClassPathResource("minio/data").getFile();
|
||||
// Upload file
|
||||
s3Client.putObject(new PutObjectRequest(bucketName, keyName, file));
|
||||
|
||||
// Download file
|
||||
GetObjectRequest rangeObjectRequest = new GetObjectRequest(bucketName, keyName);
|
||||
S3Object objectPortion = s3Client.getObject(rangeObjectRequest);
|
||||
System.out.println("Printing bytes retrieved:");
|
||||
displayTextInputStream(objectPortion.getObjectContent());
|
||||
}
|
||||
catch (AmazonServiceException ase) {
|
||||
System.out.println("Caught an AmazonServiceException, which " + "means your request made it "
|
||||
+ "to Amazon S3, but was rejected with an error response" + " for some reason.");
|
||||
System.out.println("Error Message: " + ase.getMessage());
|
||||
System.out.println("HTTP Status Code: " + ase.getStatusCode());
|
||||
System.out.println("AWS Error Code: " + ase.getErrorCode());
|
||||
System.out.println("Error Type: " + ase.getErrorType());
|
||||
System.out.println("Request ID: " + ase.getRequestId());
|
||||
|
||||
}
|
||||
catch (AmazonClientException ace) {
|
||||
System.out.println("Caught an AmazonClientException, which " + "means the client encountered "
|
||||
+ "an internal error while trying to "
|
||||
+ "communicate with S3, " + "such as not being able to access the network.");
|
||||
System.out.println("Error Message: " + ace.getMessage());
|
||||
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
private static void displayTextInputStream(InputStream input) throws IOException {
|
||||
// Read one text line at a time and display.
|
||||
BufferedReader reader = new BufferedReader(new InputStreamReader(input));
|
||||
while (true) {
|
||||
String line = reader.readLine();
|
||||
if (line == null) {
|
||||
break;
|
||||
}
|
||||
System.out.println(" " + line);
|
||||
}
|
||||
System.out.println();
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,103 @@
|
||||
/*
|
||||
* Copyright 2020-2020 the original author or authors.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* https://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.springframework.cloud.stream.apps.integration.test.source;
|
||||
|
||||
import java.time.Duration;
|
||||
import java.util.function.Consumer;
|
||||
|
||||
import com.amazonaws.ClientConfiguration;
|
||||
import com.amazonaws.auth.AWSCredentials;
|
||||
import com.amazonaws.auth.AWSStaticCredentialsProvider;
|
||||
import com.amazonaws.auth.BasicAWSCredentials;
|
||||
import com.amazonaws.client.builder.AwsClientBuilder;
|
||||
import com.amazonaws.regions.Regions;
|
||||
import com.amazonaws.services.s3.AmazonS3;
|
||||
import com.amazonaws.services.s3.AmazonS3ClientBuilder;
|
||||
import com.amazonaws.services.s3.model.PutObjectRequest;
|
||||
import com.github.dockerjava.api.command.CreateContainerCmd;
|
||||
import org.junit.jupiter.api.BeforeAll;
|
||||
import org.junit.jupiter.api.Test;
|
||||
import org.testcontainers.containers.DockerComposeContainer;
|
||||
import org.testcontainers.containers.GenericContainer;
|
||||
import org.testcontainers.containers.wait.strategy.Wait;
|
||||
import org.testcontainers.junit.jupiter.Container;
|
||||
|
||||
import org.springframework.cloud.stream.apps.integration.test.AbstractStreamApplicationTests;
|
||||
import org.springframework.cloud.stream.apps.integration.test.LogMatcher;
|
||||
import org.springframework.cloud.stream.apps.integration.test.LogMatcher.LogListener;
|
||||
|
||||
import static org.awaitility.Awaitility.await;
|
||||
import static org.springframework.cloud.stream.apps.integration.test.AbstractStreamApplicationTests.AppLog.appLog;
|
||||
import static org.springframework.cloud.stream.apps.integration.test.FluentMap.fluentMap;
|
||||
import static org.springframework.cloud.stream.apps.integration.test.LogMatcher.contains;
|
||||
|
||||
public class S3SourceTests extends AbstractStreamApplicationTests {
|
||||
|
||||
private static AmazonS3 s3Client;
|
||||
|
||||
private static LogMatcher logMatcher = new LogMatcher();
|
||||
|
||||
@Container
|
||||
private static final GenericContainer minio = new GenericContainer("minio/minio:RELEASE.2020-09-05T07-14-49Z")
|
||||
.withExposedPorts(9000)
|
||||
.withEnv("MINIO_ACCESS_KEY", "minio")
|
||||
.withEnv("MINIO_SECRET_KEY", "minio123")
|
||||
.waitingFor(Wait.forHttp("/minio/health/live"))
|
||||
.withCreateContainerCmdModifier(
|
||||
(Consumer<CreateContainerCmd>) createContainerCmd -> createContainerCmd.withHostName("minio"))
|
||||
.withLogConsumer(appLog("minio"))
|
||||
.withCommand("minio", "server", "/data");
|
||||
|
||||
@BeforeAll
|
||||
static void init() {
|
||||
AWSCredentials credentials = new BasicAWSCredentials("minio", "minio123");
|
||||
ClientConfiguration clientConfiguration = new ClientConfiguration();
|
||||
s3Client = AmazonS3ClientBuilder
|
||||
.standard()
|
||||
.withEndpointConfiguration(
|
||||
new AwsClientBuilder.EndpointConfiguration("http://localhost:" + minio.getMappedPort(9000),
|
||||
Regions.US_EAST_1.name()))
|
||||
.withPathStyleAccessEnabled(true)
|
||||
.withClientConfiguration(clientConfiguration)
|
||||
.withCredentials(new AWSStaticCredentialsProvider(credentials))
|
||||
.build();
|
||||
|
||||
|
||||
}
|
||||
|
||||
@Container
|
||||
private final DockerComposeContainer environment = new DockerComposeContainer(
|
||||
kafka(),
|
||||
resolveTemplate("source/s3-source-tests.yml",
|
||||
fluentMap().withEntry("s3.local.dir", resourceAsFile("minio"))
|
||||
.withEntry("s3.endpoint.url",
|
||||
"http://minio:" + minio.getMappedPort(9000))
|
||||
.withEntry("extraHosts", "minio:" + localHostAddress())))
|
||||
.withLogConsumer("log-sink", logMatcher)
|
||||
.withLogConsumer("s3-source", logMatcher)
|
||||
.withLogConsumer("log-sink", appLog("logSink"));
|
||||
|
||||
@Test
|
||||
void test() {
|
||||
|
||||
await().atMost(Duration.ofMinutes(2)).untilTrue(logMatcher.withRegex(contains("Started S3Source")).matches());
|
||||
LogListener logListener = logMatcher.withRegex(contains("Bart Simpson"));
|
||||
s3Client.createBucket("bucket");
|
||||
s3Client.putObject(new PutObjectRequest("bucket", "test", resourceAsFile("minio/data")));
|
||||
await().atMost(Duration.ofSeconds(30)).untilTrue(logListener.matches());
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,21 @@
|
||||
version: '2.4'
|
||||
services:
|
||||
kafka-broker:
|
||||
image: confluentinc/cp-kafka:5.5.1
|
||||
expose:
|
||||
- "9092"
|
||||
environment:
|
||||
- KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://kafka-broker:9092
|
||||
- KAFKA_ADVERTISED_HOST_NAME=kafka-broker
|
||||
- KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181
|
||||
- KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR=1
|
||||
- KAFKA_MESSAGE_MAX_BYTES=2097152
|
||||
depends_on:
|
||||
- zookeeper
|
||||
|
||||
zookeeper:
|
||||
image: confluentinc/cp-zookeeper:5.5.1
|
||||
expose:
|
||||
- "2181"
|
||||
environment:
|
||||
- ZOOKEEPER_CLIENT_PORT=2181
|
||||
@@ -0,0 +1,10 @@
|
||||
CREATE DATABASE IF NOT EXISTS test;
|
||||
USE test;
|
||||
CREATE TABLE IF NOT EXISTS People (
|
||||
id INT NOT NULL AUTO_INCREMENT,
|
||||
name VARCHAR(255) NOT NULL,
|
||||
street VARCHAR(255) NOT NULL,
|
||||
city VARCHAR(255) NOT NULL,
|
||||
deleted CHAR(1) DEFAULT 'N',
|
||||
PRIMARY KEY (id));
|
||||
INSERT INTO People (name, street, city, deleted) VALUES ("Bart Simpson", "Main Street", "Springfield","N");
|
||||
@@ -0,0 +1,15 @@
|
||||
<configuration>
|
||||
<appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
|
||||
<encoder>
|
||||
<pattern>%d{HH:mm:ss.SSS} [%thread] %-5level %logger - %msg%n</pattern>
|
||||
</encoder>
|
||||
</appender>
|
||||
|
||||
<root level="info">
|
||||
<appender-ref ref="STDOUT"/>
|
||||
</root>
|
||||
|
||||
<logger name="org.testcontainers" level="INFO"/>
|
||||
<logger name="com.github.dockerjava" level="WARN"/>
|
||||
<logger name="org.springframework.cloud.stream.apps.integration.test" level="DEBUG"/>
|
||||
</configuration>
|
||||
1
stream-applications-integration-tests/src/test/resources/minio/data
Executable file
1
stream-applications-integration-tests/src/test/resources/minio/data
Executable file
@@ -0,0 +1 @@
|
||||
Bart Simpson
|
||||
@@ -0,0 +1,33 @@
|
||||
version: '2'
|
||||
services:
|
||||
|
||||
http-source:
|
||||
image: springcloudstream/http-source-kafka:{{stream.apps.version}}
|
||||
depends_on:
|
||||
- kafka-broker
|
||||
ports:
|
||||
- "{{port}}:{{port}}"
|
||||
environment:
|
||||
- SERVER_PORT={{port}}
|
||||
- SPRING_CLOUD_STREAM_BINDINGS_OUTPUT_DESTINATION=processor
|
||||
- SPRING_CLOUD_STREAM_KAFKA_BINDER_BROKERS=kafka-broker
|
||||
http-request-processor:
|
||||
image: springcloudstream/http-request-processor-kafka:{{stream.apps.version}}
|
||||
depends_on:
|
||||
- kafka-broker
|
||||
environment:
|
||||
- HTTP_REQUEST_URL_EXPRESSION='{{url}}'
|
||||
- HTTP_REQUEST_HTTP_METHOD_EXPRESSION='POST'
|
||||
- SPRING_CLOUD_STREAM_BINDINGS_INPUT_DESTINATION=processor
|
||||
- SPRING_CLOUD_STREAM_BINDINGS_OUTPUT_DESTINATION=log
|
||||
- SPRING_CLOUD_STREAM_KAFKA_BINDER_BROKERS=kafka-broker
|
||||
- SPRING_CLOUD_STREAM_BINDINGS_INPUT_GROUP=http-request-processor
|
||||
log-sink:
|
||||
image: springcloudstream/log-sink-kafka:{{stream.apps.version}}
|
||||
depends_on:
|
||||
- kafka-broker
|
||||
environment:
|
||||
- SPRING_CLOUD_STREAM_KAFKA_BINDER_BROKERS=kafka-broker
|
||||
- SPRING_CLOUD_STREAM_BINDINGS_INPUT_DESTINATION=log
|
||||
- SPRING_CLOUD_STREAM_BINDINGS_INPUT_GROUP=http-request-processor
|
||||
|
||||
@@ -0,0 +1,28 @@
|
||||
version: '2.4'
|
||||
services:
|
||||
|
||||
http-source:
|
||||
image: springcloudstream/http-source-kafka:{{stream.apps.version}}
|
||||
depends_on:
|
||||
- kafka-broker
|
||||
ports:
|
||||
- "{{port}}:{{port}}"
|
||||
environment:
|
||||
- SERVER_PORT={{port}}
|
||||
- SPRING_CLOUD_STREAM_BINDINGS_OUTPUT_DESTINATION=jdbc
|
||||
- SPRING_CLOUD_STREAM_KAFKA_BINDER_BROKERS=kafka-broker
|
||||
jdbc-sink:
|
||||
image: springcloudstream/jdbc-sink-kafka:{{stream.apps.version}}
|
||||
depends_on:
|
||||
- kafka-broker
|
||||
environment:
|
||||
- JDBC_CONSUMER_COLUMNS=name,city:address.city,street:address.street
|
||||
- JDBC_CONSUMER_TABLE_NAME=People
|
||||
- SPRING_DATASOURCE_PASSWORD={{password}}
|
||||
- SPRING_DATASOURCE_USERNAME={{user}}
|
||||
- SPRING_DATASOURCE_DRIVER_CLASS_NAME=org.mariadb.jdbc.Driver
|
||||
- SPRING_DATASOURCE_URL={{jdbc.url}}
|
||||
- SPRING_CLOUD_STREAM_KAFKA_BINDER_BROKERS=kafka-broker
|
||||
- SPRING_CLOUD_STREAM_BINDINGS_INPUT_DESTINATION=jdbc
|
||||
- SPRING_CLOUD_STREAM_BINDINGS_INPUT_GROUP=jdbc-sink
|
||||
|
||||
@@ -0,0 +1,22 @@
|
||||
version: '2.4'
|
||||
services:
|
||||
http-source:
|
||||
image: springcloudstream/http-source-kafka:{{stream.apps.version}}
|
||||
depends_on:
|
||||
- kafka-broker
|
||||
ports:
|
||||
- "{{port}}:{{port}}"
|
||||
environment:
|
||||
- SERVER_PORT={{port}}
|
||||
- SPRING_CLOUD_STREAM_BINDINGS_OUTPUT_DESTINATION=mongodb
|
||||
- SPRING_CLOUD_STREAM_KAFKA_BINDER_BROKERS=kafka-broker
|
||||
mongodb-sink:
|
||||
image: springcloudstream/mongodb-sink-kafka:{{stream.apps.version}}
|
||||
depends_on:
|
||||
- kafka-broker
|
||||
environment:
|
||||
- MONGO_DB_CONSUMER_COLLECTION=test
|
||||
- SPRING_DATA_MONGODB_URL={{mongodb.url}}
|
||||
- SPRING_CLOUD_STREAM_BINDINGS_INPUT_DESTINATION=mongodb
|
||||
- SPRING_CLOUD_STREAM_BINDINGS_INPUT_GROUP=mongodb-sink
|
||||
|
||||
@@ -0,0 +1,25 @@
|
||||
version: '2.4'
|
||||
services:
|
||||
|
||||
http-source:
|
||||
image: springcloudstream/http-source-kafka:{{stream.apps.version}}
|
||||
depends_on:
|
||||
- kafka-broker
|
||||
ports:
|
||||
- "{{port}}:{{port}}"
|
||||
environment:
|
||||
- SERVER_PORT={{port}}
|
||||
- SPRING_CLOUD_STREAM_BINDINGS_OUTPUT_DESTINATION=tcp
|
||||
- SPRING_CLOUD_STREAM_KAFKA_BINDER_BROKERS=kafka-broker
|
||||
tcp-sink:
|
||||
image: springcloudstream/tcp-sink-kafka:{{stream.apps.version}}
|
||||
depends_on:
|
||||
- kafka-broker
|
||||
environment:
|
||||
- TCP_CONSUMER_HOST={{tcp.host}}
|
||||
- TCP_PORT={{tcp.port}}
|
||||
- TCP_CONSUMER_ENCODER=CRLF
|
||||
- SPRING_CLOUD_STREAM_KAFKA_BINDER_BROKERS=kafka-broker
|
||||
- SPRING_CLOUD_STREAM_BINDINGS_INPUT_DESTINATION=tcp
|
||||
- SPRING_CLOUD_STREAM_BINDINGS_INPUT_GROUP=tcp-sink
|
||||
|
||||
@@ -0,0 +1,24 @@
|
||||
version: '2.4'
|
||||
services:
|
||||
|
||||
geode-source:
|
||||
image: springcloudstream/geode-source-kafka:{{stream.apps.version}}
|
||||
depends_on:
|
||||
- kafka-broker
|
||||
environment:
|
||||
- GEODE_POOL_CONNECT_TYPE=server
|
||||
- GEODE_REGION_REGION_NAME={{geode.region}}
|
||||
- GEODE_POOL_HOST_ADDRESSES={{geode.host-addresses}}
|
||||
- SPRING_CLOUD_STREAM_BINDINGS_OUTPUT_DESTINATION=log
|
||||
- SPRING_CLOUD_STREAM_KAFKA_BINDER_BROKERS=kafka-broker
|
||||
extra_hosts:
|
||||
- {{extraHosts}}
|
||||
log-sink:
|
||||
image: springcloudstream/log-sink-kafka:{{stream.apps.version}}
|
||||
depends_on:
|
||||
- kafka-broker
|
||||
environment:
|
||||
- SPRING_CLOUD_STREAM_KAFKA_BINDER_BROKERS=kafka-broker
|
||||
- SPRING_CLOUD_STREAM_BINDINGS_INPUT_DESTINATION=log
|
||||
- SPRING_CLOUD_STREAM_BINDINGS_INPUT_GROUP=geode
|
||||
|
||||
@@ -0,0 +1,22 @@
|
||||
version: '2.4'
|
||||
services:
|
||||
|
||||
http-source:
|
||||
image: springcloudstream/http-source-kafka:{{stream.apps.version}}
|
||||
depends_on:
|
||||
- kafka-broker
|
||||
ports:
|
||||
- "{{port}}:{{port}}"
|
||||
environment:
|
||||
- SERVER_PORT={{port}}
|
||||
- SPRING_CLOUD_STREAM_BINDINGS_OUTPUT_DESTINATION=log
|
||||
- SPRING_CLOUD_STREAM_KAFKA_BINDER_BROKERS=kafka-broker
|
||||
log-sink:
|
||||
image: springcloudstream/log-sink-kafka:{{stream.apps.version}}
|
||||
depends_on:
|
||||
- kafka-broker
|
||||
environment:
|
||||
- SPRING_CLOUD_STREAM_KAFKA_BINDER_BROKERS=kafka-broker
|
||||
- SPRING_CLOUD_STREAM_BINDINGS_INPUT_DESTINATION=log
|
||||
- SPRING_CLOUD_STREAM_BINDINGS_INPUT_GROUP=http
|
||||
|
||||
@@ -0,0 +1,36 @@
|
||||
version: '2.4'
|
||||
services:
|
||||
mysql:
|
||||
image: mysql:5.7
|
||||
command: --init-file /init.sql
|
||||
volumes:
|
||||
- {{init.sql}}:/init.sql
|
||||
environment:
|
||||
MYSQL_USER: root
|
||||
MYSQL_ROOT_PASSWORD: secret
|
||||
expose:
|
||||
- 3306
|
||||
|
||||
jdbc-source:
|
||||
image: springcloudstream/jdbc-source-kafka:{{stream.apps.version}}
|
||||
depends_on:
|
||||
- kafka-broker
|
||||
- mysql
|
||||
environment:
|
||||
- JDBC_SUPPLIER_QUERY=SELECT * FROM People WHERE deleted='N'
|
||||
- JDBC_SUPPLIER_UPDATE=UPDATE People SET deleted='Y' WHERE id=:id
|
||||
- SPRING_DATASOURCE_PASSWORD=secret
|
||||
- SPRING_DATASOURCE_USERNAME=root
|
||||
- SPRING_DATASOURCE_DRIVER_CLASS_NAME=org.mariadb.jdbc.Driver
|
||||
- SPRING_DATASOURCE_URL=jdbc:mysql://mysql:3306/test
|
||||
- SPRING_CLOUD_STREAM_BINDINGS_OUTPUT_DESTINATION=log
|
||||
- SPRING_CLOUD_STREAM_KAFKA_BINDER_BROKERS=kafka-broker
|
||||
log-sink:
|
||||
image: springcloudstream/log-sink-kafka:{{stream.apps.version}}
|
||||
depends_on:
|
||||
- kafka-broker
|
||||
environment:
|
||||
- SPRING_CLOUD_STREAM_KAFKA_BINDER_BROKERS=kafka-broker
|
||||
- SPRING_CLOUD_STREAM_BINDINGS_INPUT_DESTINATION=log
|
||||
- SPRING_CLOUD_STREAM_BINDINGS_INPUT_GROUP=jdbc
|
||||
|
||||
@@ -0,0 +1,27 @@
|
||||
version: '2.4'
|
||||
services:
|
||||
s3-source:
|
||||
image: springcloudstream/s3-source-kafka:{{stream.apps.version}}
|
||||
depends_on:
|
||||
- kafka-broker
|
||||
environment:
|
||||
- FILE_CONSUMER_MODE=lines
|
||||
- S3_COMMON_ENDPOINT_URL={{s3.endpoint.url}}
|
||||
- S3_COMMON_PATH_STYLE_ACCESS=true
|
||||
- S3_SUPPLIER_REMOTE_DIR=bucket
|
||||
- CLOUD_AWS_STACK_AUTO=false
|
||||
- CLOUD_AWS_CREDENTIALS_ACCESS_KEY=minio
|
||||
- CLOUD_AWS_CREDENTIALS_SECRET_KEY=minio123
|
||||
- CLOUD_AWS_REGION_STATIC=us-east-1
|
||||
- SPRING_CLOUD_STREAM_BINDINGS_OUTPUT_DESTINATION=log
|
||||
- SPRING_CLOUD_STREAM_KAFKA_BINDER_BROKERS=kafka-broker
|
||||
extra_hosts:
|
||||
- {{extraHosts}}
|
||||
log-sink:
|
||||
image: springcloudstream/log-sink-kafka:{{stream.apps.version}}
|
||||
depends_on:
|
||||
- kafka-broker
|
||||
environment:
|
||||
- SPRING_CLOUD_STREAM_KAFKA_BINDER_BROKERS=kafka-broker
|
||||
- SPRING_CLOUD_STREAM_BINDINGS_INPUT_DESTINATION=log
|
||||
- SPRING_CLOUD_STREAM_BINDINGS_INPUT_GROUP=http
|
||||
@@ -0,0 +1,17 @@
|
||||
version: '2.4'
|
||||
services:
|
||||
time-source:
|
||||
image: springcloudstream/time-source-kafka:{{stream.apps.version}}
|
||||
depends_on:
|
||||
- kafka-broker
|
||||
environment:
|
||||
- SPRING_CLOUD_STREAM_BINDINGS_OUTPUT_DESTINATION=log
|
||||
- SPRING_CLOUD_STREAM_KAFKA_BINDER_BROKERS=kafka-broker
|
||||
log-sink:
|
||||
image: springcloudstream/log-sink-kafka:{{stream.apps.version}}
|
||||
depends_on:
|
||||
- kafka-broker
|
||||
environment:
|
||||
- SPRING_CLOUD_STREAM_KAFKA_BINDER_BROKERS=kafka-broker
|
||||
- SPRING_CLOUD_STREAM_BINDINGS_INPUT_DESTINATION=log
|
||||
- SPRING_CLOUD_STREAM_BINDINGS_INPUT_GROUP=ticktock
|
||||
Reference in New Issue
Block a user