diff --git a/.gitignore b/.gitignore index 214f0bc..c9a06a0 100644 --- a/.gitignore +++ b/.gitignore @@ -25,3 +25,4 @@ dump.rdb .apt_generated artifacts **/dependency-reduced-pom.xml +.idea diff --git a/stream-applications-integration-tests/.mvn/wrapper/MavenWrapperDownloader.java b/stream-applications-integration-tests/.mvn/wrapper/MavenWrapperDownloader.java new file mode 100644 index 0000000..0b1fd00 --- /dev/null +++ b/stream-applications-integration-tests/.mvn/wrapper/MavenWrapperDownloader.java @@ -0,0 +1,118 @@ +/* + * Copyright 2007-present the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import java.net.*; +import java.io.*; +import java.nio.channels.*; +import java.util.Properties; + +public class MavenWrapperDownloader { + + private static final String WRAPPER_VERSION = "0.5.6"; + /** + * Default URL to download the maven-wrapper.jar from, if no 'downloadUrl' is provided. + */ + private static final String DEFAULT_DOWNLOAD_URL = "https://repo.maven.apache.org/maven2/io/takari/maven-wrapper/" + + WRAPPER_VERSION + "/maven-wrapper-" + WRAPPER_VERSION + ".jar"; + + /** + * Path to the maven-wrapper.properties file, which might contain a downloadUrl property to + * use instead of the default one. + */ + private static final String MAVEN_WRAPPER_PROPERTIES_PATH = + ".mvn/wrapper/maven-wrapper.properties"; + + /** + * Path where the maven-wrapper.jar will be saved to. + */ + private static final String MAVEN_WRAPPER_JAR_PATH = + ".mvn/wrapper/maven-wrapper.jar"; + + /** + * Name of the property which should be used to override the default download url for the wrapper. + */ + private static final String PROPERTY_NAME_WRAPPER_URL = "wrapperUrl"; + + public static void main(String args[]) { + System.out.println("- Downloader started"); + File baseDirectory = new File(args[0]); + System.out.println("- Using base directory: " + baseDirectory.getAbsolutePath()); + + // If the maven-wrapper.properties exists, read it and check if it contains a custom + // wrapperUrl parameter. + File mavenWrapperPropertyFile = new File(baseDirectory, MAVEN_WRAPPER_PROPERTIES_PATH); + String url = DEFAULT_DOWNLOAD_URL; + if (mavenWrapperPropertyFile.exists()) { + FileInputStream mavenWrapperPropertyFileInputStream = null; + try { + mavenWrapperPropertyFileInputStream = new FileInputStream(mavenWrapperPropertyFile); + Properties mavenWrapperProperties = new Properties(); + mavenWrapperProperties.load(mavenWrapperPropertyFileInputStream); + url = mavenWrapperProperties.getProperty(PROPERTY_NAME_WRAPPER_URL, url); + } catch (IOException e) { + System.out.println("- ERROR loading '" + MAVEN_WRAPPER_PROPERTIES_PATH + "'"); + } finally { + try { + if (mavenWrapperPropertyFileInputStream != null) { + mavenWrapperPropertyFileInputStream.close(); + } + } catch (IOException e) { + // Ignore ... + } + } + } + System.out.println("- Downloading from: " + url); + + File outputFile = new File(baseDirectory.getAbsolutePath(), MAVEN_WRAPPER_JAR_PATH); + if (!outputFile.getParentFile().exists()) { + if (!outputFile.getParentFile().mkdirs()) { + System.out.println( + "- ERROR creating output directory '" + outputFile.getParentFile().getAbsolutePath() + "'"); + } + } + System.out.println("- Downloading to: " + outputFile.getAbsolutePath()); + try { + downloadFileFromURL(url, outputFile); + System.out.println("Done"); + System.exit(0); + } catch (Throwable e) { + System.out.println("- Error downloading"); + e.printStackTrace(); + System.exit(1); + } + } + + private static void downloadFileFromURL(String urlString, File destination) throws Exception { + if (System.getenv("MVNW_USERNAME") != null && System.getenv("MVNW_PASSWORD") != null) { + String username = System.getenv("MVNW_USERNAME"); + char[] password = System.getenv("MVNW_PASSWORD").toCharArray(); + Authenticator.setDefault(new Authenticator() { + @Override + protected PasswordAuthentication getPasswordAuthentication() { + return new PasswordAuthentication(username, password); + } + }); + } + URL website = new URL(urlString); + ReadableByteChannel rbc; + rbc = Channels.newChannel(website.openStream()); + FileOutputStream fos = new FileOutputStream(destination); + fos.getChannel().transferFrom(rbc, 0, Long.MAX_VALUE); + fos.close(); + rbc.close(); + } + +} diff --git a/stream-applications-integration-tests/.mvn/wrapper/maven-wrapper.jar b/stream-applications-integration-tests/.mvn/wrapper/maven-wrapper.jar new file mode 100644 index 0000000..2cc7d4a Binary files /dev/null and b/stream-applications-integration-tests/.mvn/wrapper/maven-wrapper.jar differ diff --git a/stream-applications-integration-tests/.mvn/wrapper/maven-wrapper.properties b/stream-applications-integration-tests/.mvn/wrapper/maven-wrapper.properties new file mode 100644 index 0000000..642d572 --- /dev/null +++ b/stream-applications-integration-tests/.mvn/wrapper/maven-wrapper.properties @@ -0,0 +1,2 @@ +distributionUrl=https://repo.maven.apache.org/maven2/org/apache/maven/apache-maven/3.6.3/apache-maven-3.6.3-bin.zip +wrapperUrl=https://repo.maven.apache.org/maven2/io/takari/maven-wrapper/0.5.6/maven-wrapper-0.5.6.jar diff --git a/stream-applications-integration-tests/README.adoc b/stream-applications-integration-tests/README.adoc new file mode 100644 index 0000000..5d72118 --- /dev/null +++ b/stream-applications-integration-tests/README.adoc @@ -0,0 +1,43 @@ += Stream Applications Integration Tests + +This contains integration tests for pre-packaged stream-applications for Docker using https://www.testcontainers.org/[TestContainers]. +These are end-to-end integration tests running apps and required resources, using docker-compose. +The goal is to have an end-to-end integration test for each pre-packaged application. +We don't aim to test different configuration options, as this is the responsibility of the stream applications. +One of the major benefits is to verify the built Docker images run correctly, especially when we introduce global changes, +such as upgrading the base JDK image, or other pervasive changes. + +== Test Strategy + +Each test uses a docker-compose file to configure a simple pipeline including the source, sink, or processor app under test. +Usually, a single message is sufficient. + +The tests use following patterns: + +=== Source +To test a source, we use a log sink and add a LogConsumer that returns a boolean value, eventually evaluating to `true` +when an expected regex match is detected. Then we create an event to trigger the source. + +=== Sink +To test a sink, we use the http source and post a message using WebClient. + +=== Processor +To test a processor, use an http source and a log sink. + +Typically, it is convenient to run the required external resource in a separate container, since it must be accessed by localhost(to provide data to a source or +verify data created by a sink), and by the containerized applications. + +=== Organization and Conventions +`processor`, `source`, and `sink` are leaf packages. +The docker-compose YAML corresponding each test are test resources, similarly organized. +So `.../source/JdbcSinkTests` corresponds to `source/jdbc-source-tests.yml`. + +=== Templating +The docker-compose files are Mustache templates to allow configuration of runtime values, such as ports. +The Docker tag is required configuration, so the templates must always be processed. +The result is copied to a temporary file. + + + + + diff --git a/stream-applications-integration-tests/mvnw b/stream-applications-integration-tests/mvnw new file mode 100755 index 0000000..a16b543 --- /dev/null +++ b/stream-applications-integration-tests/mvnw @@ -0,0 +1,310 @@ +#!/bin/sh +# ---------------------------------------------------------------------------- +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# https://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# ---------------------------------------------------------------------------- + +# ---------------------------------------------------------------------------- +# Maven Start Up Batch script +# +# Required ENV vars: +# ------------------ +# JAVA_HOME - location of a JDK home dir +# +# Optional ENV vars +# ----------------- +# M2_HOME - location of maven2's installed home dir +# MAVEN_OPTS - parameters passed to the Java VM when running Maven +# e.g. to debug Maven itself, use +# set MAVEN_OPTS=-Xdebug -Xrunjdwp:transport=dt_socket,server=y,suspend=y,address=8000 +# MAVEN_SKIP_RC - flag to disable loading of mavenrc files +# ---------------------------------------------------------------------------- + +if [ -z "$MAVEN_SKIP_RC" ] ; then + + if [ -f /etc/mavenrc ] ; then + . /etc/mavenrc + fi + + if [ -f "$HOME/.mavenrc" ] ; then + . "$HOME/.mavenrc" + fi + +fi + +# OS specific support. $var _must_ be set to either true or false. +cygwin=false; +darwin=false; +mingw=false +case "`uname`" in + CYGWIN*) cygwin=true ;; + MINGW*) mingw=true;; + Darwin*) darwin=true + # Use /usr/libexec/java_home if available, otherwise fall back to /Library/Java/Home + # See https://developer.apple.com/library/mac/qa/qa1170/_index.html + if [ -z "$JAVA_HOME" ]; then + if [ -x "/usr/libexec/java_home" ]; then + export JAVA_HOME="`/usr/libexec/java_home`" + else + export JAVA_HOME="/Library/Java/Home" + fi + fi + ;; +esac + +if [ -z "$JAVA_HOME" ] ; then + if [ -r /etc/gentoo-release ] ; then + JAVA_HOME=`java-config --jre-home` + fi +fi + +if [ -z "$M2_HOME" ] ; then + ## resolve links - $0 may be a link to maven's home + PRG="$0" + + # need this for relative symlinks + while [ -h "$PRG" ] ; do + ls=`ls -ld "$PRG"` + link=`expr "$ls" : '.*-> \(.*\)$'` + if expr "$link" : '/.*' > /dev/null; then + PRG="$link" + else + PRG="`dirname "$PRG"`/$link" + fi + done + + saveddir=`pwd` + + M2_HOME=`dirname "$PRG"`/.. + + # make it fully qualified + M2_HOME=`cd "$M2_HOME" && pwd` + + cd "$saveddir" + # echo Using m2 at $M2_HOME +fi + +# For Cygwin, ensure paths are in UNIX format before anything is touched +if $cygwin ; then + [ -n "$M2_HOME" ] && + M2_HOME=`cygpath --unix "$M2_HOME"` + [ -n "$JAVA_HOME" ] && + JAVA_HOME=`cygpath --unix "$JAVA_HOME"` + [ -n "$CLASSPATH" ] && + CLASSPATH=`cygpath --path --unix "$CLASSPATH"` +fi + +# For Mingw, ensure paths are in UNIX format before anything is touched +if $mingw ; then + [ -n "$M2_HOME" ] && + M2_HOME="`(cd "$M2_HOME"; pwd)`" + [ -n "$JAVA_HOME" ] && + JAVA_HOME="`(cd "$JAVA_HOME"; pwd)`" +fi + +if [ -z "$JAVA_HOME" ]; then + javaExecutable="`which javac`" + if [ -n "$javaExecutable" ] && ! [ "`expr \"$javaExecutable\" : '\([^ ]*\)'`" = "no" ]; then + # readlink(1) is not available as standard on Solaris 10. + readLink=`which readlink` + if [ ! `expr "$readLink" : '\([^ ]*\)'` = "no" ]; then + if $darwin ; then + javaHome="`dirname \"$javaExecutable\"`" + javaExecutable="`cd \"$javaHome\" && pwd -P`/javac" + else + javaExecutable="`readlink -f \"$javaExecutable\"`" + fi + javaHome="`dirname \"$javaExecutable\"`" + javaHome=`expr "$javaHome" : '\(.*\)/bin'` + JAVA_HOME="$javaHome" + export JAVA_HOME + fi + fi +fi + +if [ -z "$JAVACMD" ] ; then + if [ -n "$JAVA_HOME" ] ; then + if [ -x "$JAVA_HOME/jre/sh/java" ] ; then + # IBM's JDK on AIX uses strange locations for the executables + JAVACMD="$JAVA_HOME/jre/sh/java" + else + JAVACMD="$JAVA_HOME/bin/java" + fi + else + JAVACMD="`which java`" + fi +fi + +if [ ! -x "$JAVACMD" ] ; then + echo "Error: JAVA_HOME is not defined correctly." >&2 + echo " We cannot execute $JAVACMD" >&2 + exit 1 +fi + +if [ -z "$JAVA_HOME" ] ; then + echo "Warning: JAVA_HOME environment variable is not set." +fi + +CLASSWORLDS_LAUNCHER=org.codehaus.plexus.classworlds.launcher.Launcher + +# traverses directory structure from process work directory to filesystem root +# first directory with .mvn subdirectory is considered project base directory +find_maven_basedir() { + + if [ -z "$1" ] + then + echo "Path not specified to find_maven_basedir" + return 1 + fi + + basedir="$1" + wdir="$1" + while [ "$wdir" != '/' ] ; do + if [ -d "$wdir"/.mvn ] ; then + basedir=$wdir + break + fi + # workaround for JBEAP-8937 (on Solaris 10/Sparc) + if [ -d "${wdir}" ]; then + wdir=`cd "$wdir/.."; pwd` + fi + # end of workaround + done + echo "${basedir}" +} + +# concatenates all lines of a file +concat_lines() { + if [ -f "$1" ]; then + echo "$(tr -s '\n' ' ' < "$1")" + fi +} + +BASE_DIR=`find_maven_basedir "$(pwd)"` +if [ -z "$BASE_DIR" ]; then + exit 1; +fi + +########################################################################################## +# Extension to allow automatically downloading the maven-wrapper.jar from Maven-central +# This allows using the maven wrapper in projects that prohibit checking in binary data. +########################################################################################## +if [ -r "$BASE_DIR/.mvn/wrapper/maven-wrapper.jar" ]; then + if [ "$MVNW_VERBOSE" = true ]; then + echo "Found .mvn/wrapper/maven-wrapper.jar" + fi +else + if [ "$MVNW_VERBOSE" = true ]; then + echo "Couldn't find .mvn/wrapper/maven-wrapper.jar, downloading it ..." + fi + if [ -n "$MVNW_REPOURL" ]; then + jarUrl="$MVNW_REPOURL/io/takari/maven-wrapper/0.5.6/maven-wrapper-0.5.6.jar" + else + jarUrl="https://repo.maven.apache.org/maven2/io/takari/maven-wrapper/0.5.6/maven-wrapper-0.5.6.jar" + fi + while IFS="=" read key value; do + case "$key" in (wrapperUrl) jarUrl="$value"; break ;; + esac + done < "$BASE_DIR/.mvn/wrapper/maven-wrapper.properties" + if [ "$MVNW_VERBOSE" = true ]; then + echo "Downloading from: $jarUrl" + fi + wrapperJarPath="$BASE_DIR/.mvn/wrapper/maven-wrapper.jar" + if $cygwin; then + wrapperJarPath=`cygpath --path --windows "$wrapperJarPath"` + fi + + if command -v wget > /dev/null; then + if [ "$MVNW_VERBOSE" = true ]; then + echo "Found wget ... using wget" + fi + if [ -z "$MVNW_USERNAME" ] || [ -z "$MVNW_PASSWORD" ]; then + wget "$jarUrl" -O "$wrapperJarPath" + else + wget --http-user=$MVNW_USERNAME --http-password=$MVNW_PASSWORD "$jarUrl" -O "$wrapperJarPath" + fi + elif command -v curl > /dev/null; then + if [ "$MVNW_VERBOSE" = true ]; then + echo "Found curl ... using curl" + fi + if [ -z "$MVNW_USERNAME" ] || [ -z "$MVNW_PASSWORD" ]; then + curl -o "$wrapperJarPath" "$jarUrl" -f + else + curl --user $MVNW_USERNAME:$MVNW_PASSWORD -o "$wrapperJarPath" "$jarUrl" -f + fi + + else + if [ "$MVNW_VERBOSE" = true ]; then + echo "Falling back to using Java to download" + fi + javaClass="$BASE_DIR/.mvn/wrapper/MavenWrapperDownloader.java" + # For Cygwin, switch paths to Windows format before running javac + if $cygwin; then + javaClass=`cygpath --path --windows "$javaClass"` + fi + if [ -e "$javaClass" ]; then + if [ ! -e "$BASE_DIR/.mvn/wrapper/MavenWrapperDownloader.class" ]; then + if [ "$MVNW_VERBOSE" = true ]; then + echo " - Compiling MavenWrapperDownloader.java ..." + fi + # Compiling the Java class + ("$JAVA_HOME/bin/javac" "$javaClass") + fi + if [ -e "$BASE_DIR/.mvn/wrapper/MavenWrapperDownloader.class" ]; then + # Running the downloader + if [ "$MVNW_VERBOSE" = true ]; then + echo " - Running MavenWrapperDownloader.java ..." + fi + ("$JAVA_HOME/bin/java" -cp .mvn/wrapper MavenWrapperDownloader "$MAVEN_PROJECTBASEDIR") + fi + fi + fi +fi +########################################################################################## +# End of extension +########################################################################################## + +export MAVEN_PROJECTBASEDIR=${MAVEN_BASEDIR:-"$BASE_DIR"} +if [ "$MVNW_VERBOSE" = true ]; then + echo $MAVEN_PROJECTBASEDIR +fi +MAVEN_OPTS="$(concat_lines "$MAVEN_PROJECTBASEDIR/.mvn/jvm.config") $MAVEN_OPTS" + +# For Cygwin, switch paths to Windows format before running java +if $cygwin; then + [ -n "$M2_HOME" ] && + M2_HOME=`cygpath --path --windows "$M2_HOME"` + [ -n "$JAVA_HOME" ] && + JAVA_HOME=`cygpath --path --windows "$JAVA_HOME"` + [ -n "$CLASSPATH" ] && + CLASSPATH=`cygpath --path --windows "$CLASSPATH"` + [ -n "$MAVEN_PROJECTBASEDIR" ] && + MAVEN_PROJECTBASEDIR=`cygpath --path --windows "$MAVEN_PROJECTBASEDIR"` +fi + +# Provide a "standardized" way to retrieve the CLI args that will +# work with both Windows and non-Windows executions. +MAVEN_CMD_LINE_ARGS="$MAVEN_CONFIG $@" +export MAVEN_CMD_LINE_ARGS + +WRAPPER_LAUNCHER=org.apache.maven.wrapper.MavenWrapperMain + +exec "$JAVACMD" \ + $MAVEN_OPTS \ + -classpath "$MAVEN_PROJECTBASEDIR/.mvn/wrapper/maven-wrapper.jar" \ + "-Dmaven.home=${M2_HOME}" "-Dmaven.multiModuleProjectDirectory=${MAVEN_PROJECTBASEDIR}" \ + ${WRAPPER_LAUNCHER} $MAVEN_CONFIG "$@" diff --git a/stream-applications-integration-tests/mvnw.cmd b/stream-applications-integration-tests/mvnw.cmd new file mode 100644 index 0000000..c8d4337 --- /dev/null +++ b/stream-applications-integration-tests/mvnw.cmd @@ -0,0 +1,182 @@ +@REM ---------------------------------------------------------------------------- +@REM Licensed to the Apache Software Foundation (ASF) under one +@REM or more contributor license agreements. See the NOTICE file +@REM distributed with this work for additional information +@REM regarding copyright ownership. The ASF licenses this file +@REM to you under the Apache License, Version 2.0 (the +@REM "License"); you may not use this file except in compliance +@REM with the License. You may obtain a copy of the License at +@REM +@REM https://www.apache.org/licenses/LICENSE-2.0 +@REM +@REM Unless required by applicable law or agreed to in writing, +@REM software distributed under the License is distributed on an +@REM "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +@REM KIND, either express or implied. See the License for the +@REM specific language governing permissions and limitations +@REM under the License. +@REM ---------------------------------------------------------------------------- + +@REM ---------------------------------------------------------------------------- +@REM Maven Start Up Batch script +@REM +@REM Required ENV vars: +@REM JAVA_HOME - location of a JDK home dir +@REM +@REM Optional ENV vars +@REM M2_HOME - location of maven2's installed home dir +@REM MAVEN_BATCH_ECHO - set to 'on' to enable the echoing of the batch commands +@REM MAVEN_BATCH_PAUSE - set to 'on' to wait for a keystroke before ending +@REM MAVEN_OPTS - parameters passed to the Java VM when running Maven +@REM e.g. to debug Maven itself, use +@REM set MAVEN_OPTS=-Xdebug -Xrunjdwp:transport=dt_socket,server=y,suspend=y,address=8000 +@REM MAVEN_SKIP_RC - flag to disable loading of mavenrc files +@REM ---------------------------------------------------------------------------- + +@REM Begin all REM lines with '@' in case MAVEN_BATCH_ECHO is 'on' +@echo off +@REM set title of command window +title %0 +@REM enable echoing by setting MAVEN_BATCH_ECHO to 'on' +@if "%MAVEN_BATCH_ECHO%" == "on" echo %MAVEN_BATCH_ECHO% + +@REM set %HOME% to equivalent of $HOME +if "%HOME%" == "" (set "HOME=%HOMEDRIVE%%HOMEPATH%") + +@REM Execute a user defined script before this one +if not "%MAVEN_SKIP_RC%" == "" goto skipRcPre +@REM check for pre script, once with legacy .bat ending and once with .cmd ending +if exist "%HOME%\mavenrc_pre.bat" call "%HOME%\mavenrc_pre.bat" +if exist "%HOME%\mavenrc_pre.cmd" call "%HOME%\mavenrc_pre.cmd" +:skipRcPre + +@setlocal + +set ERROR_CODE=0 + +@REM To isolate internal variables from possible post scripts, we use another setlocal +@setlocal + +@REM ==== START VALIDATION ==== +if not "%JAVA_HOME%" == "" goto OkJHome + +echo. +echo Error: JAVA_HOME not found in your environment. >&2 +echo Please set the JAVA_HOME variable in your environment to match the >&2 +echo location of your Java installation. >&2 +echo. +goto error + +:OkJHome +if exist "%JAVA_HOME%\bin\java.exe" goto init + +echo. +echo Error: JAVA_HOME is set to an invalid directory. >&2 +echo JAVA_HOME = "%JAVA_HOME%" >&2 +echo Please set the JAVA_HOME variable in your environment to match the >&2 +echo location of your Java installation. >&2 +echo. +goto error + +@REM ==== END VALIDATION ==== + +:init + +@REM Find the project base dir, i.e. the directory that contains the folder ".mvn". +@REM Fallback to current working directory if not found. + +set MAVEN_PROJECTBASEDIR=%MAVEN_BASEDIR% +IF NOT "%MAVEN_PROJECTBASEDIR%"=="" goto endDetectBaseDir + +set EXEC_DIR=%CD% +set WDIR=%EXEC_DIR% +:findBaseDir +IF EXIST "%WDIR%"\.mvn goto baseDirFound +cd .. +IF "%WDIR%"=="%CD%" goto baseDirNotFound +set WDIR=%CD% +goto findBaseDir + +:baseDirFound +set MAVEN_PROJECTBASEDIR=%WDIR% +cd "%EXEC_DIR%" +goto endDetectBaseDir + +:baseDirNotFound +set MAVEN_PROJECTBASEDIR=%EXEC_DIR% +cd "%EXEC_DIR%" + +:endDetectBaseDir + +IF NOT EXIST "%MAVEN_PROJECTBASEDIR%\.mvn\jvm.config" goto endReadAdditionalConfig + +@setlocal EnableExtensions EnableDelayedExpansion +for /F "usebackq delims=" %%a in ("%MAVEN_PROJECTBASEDIR%\.mvn\jvm.config") do set JVM_CONFIG_MAVEN_PROPS=!JVM_CONFIG_MAVEN_PROPS! %%a +@endlocal & set JVM_CONFIG_MAVEN_PROPS=%JVM_CONFIG_MAVEN_PROPS% + +:endReadAdditionalConfig + +SET MAVEN_JAVA_EXE="%JAVA_HOME%\bin\java.exe" +set WRAPPER_JAR="%MAVEN_PROJECTBASEDIR%\.mvn\wrapper\maven-wrapper.jar" +set WRAPPER_LAUNCHER=org.apache.maven.wrapper.MavenWrapperMain + +set DOWNLOAD_URL="https://repo.maven.apache.org/maven2/io/takari/maven-wrapper/0.5.6/maven-wrapper-0.5.6.jar" + +FOR /F "tokens=1,2 delims==" %%A IN ("%MAVEN_PROJECTBASEDIR%\.mvn\wrapper\maven-wrapper.properties") DO ( + IF "%%A"=="wrapperUrl" SET DOWNLOAD_URL=%%B +) + +@REM Extension to allow automatically downloading the maven-wrapper.jar from Maven-central +@REM This allows using the maven wrapper in projects that prohibit checking in binary data. +if exist %WRAPPER_JAR% ( + if "%MVNW_VERBOSE%" == "true" ( + echo Found %WRAPPER_JAR% + ) +) else ( + if not "%MVNW_REPOURL%" == "" ( + SET DOWNLOAD_URL="%MVNW_REPOURL%/io/takari/maven-wrapper/0.5.6/maven-wrapper-0.5.6.jar" + ) + if "%MVNW_VERBOSE%" == "true" ( + echo Couldn't find %WRAPPER_JAR%, downloading it ... + echo Downloading from: %DOWNLOAD_URL% + ) + + powershell -Command "&{"^ + "$webclient = new-object System.Net.WebClient;"^ + "if (-not ([string]::IsNullOrEmpty('%MVNW_USERNAME%') -and [string]::IsNullOrEmpty('%MVNW_PASSWORD%'))) {"^ + "$webclient.Credentials = new-object System.Net.NetworkCredential('%MVNW_USERNAME%', '%MVNW_PASSWORD%');"^ + "}"^ + "[Net.ServicePointManager]::SecurityProtocol = [Net.SecurityProtocolType]::Tls12; $webclient.DownloadFile('%DOWNLOAD_URL%', '%WRAPPER_JAR%')"^ + "}" + if "%MVNW_VERBOSE%" == "true" ( + echo Finished downloading %WRAPPER_JAR% + ) +) +@REM End of extension + +@REM Provide a "standardized" way to retrieve the CLI args that will +@REM work with both Windows and non-Windows executions. +set MAVEN_CMD_LINE_ARGS=%* + +%MAVEN_JAVA_EXE% %JVM_CONFIG_MAVEN_PROPS% %MAVEN_OPTS% %MAVEN_DEBUG_OPTS% -classpath %WRAPPER_JAR% "-Dmaven.multiModuleProjectDirectory=%MAVEN_PROJECTBASEDIR%" %WRAPPER_LAUNCHER% %MAVEN_CONFIG% %* +if ERRORLEVEL 1 goto error +goto end + +:error +set ERROR_CODE=1 + +:end +@endlocal & set ERROR_CODE=%ERROR_CODE% + +if not "%MAVEN_SKIP_RC%" == "" goto skipRcPost +@REM check for post script, once with legacy .bat ending and once with .cmd ending +if exist "%HOME%\mavenrc_post.bat" call "%HOME%\mavenrc_post.bat" +if exist "%HOME%\mavenrc_post.cmd" call "%HOME%\mavenrc_post.cmd" +:skipRcPost + +@REM pause the script if MAVEN_BATCH_PAUSE is set to 'on' +if "%MAVEN_BATCH_PAUSE%" == "on" pause + +if "%MAVEN_TERMINATE_CMD%" == "on" exit %ERROR_CODE% + +exit /B %ERROR_CODE% diff --git a/stream-applications-integration-tests/pom.xml b/stream-applications-integration-tests/pom.xml new file mode 100644 index 0000000..b31e48f --- /dev/null +++ b/stream-applications-integration-tests/pom.xml @@ -0,0 +1,216 @@ + + + 4.0.0 + + org.springframework.boot + spring-boot-starter-parent + 2.3.3.RELEASE + + + org.springframework.cloud.stream.apps + demo + 1.0.0-SNAPSHOT + stream-applications-integration-tests + Integration Tests for stream applications + + + 1.8 + 1.0.0-SNAPSHOT + 2.6.2 + Hoxton.SR8 + 1.14.3 + 1.15 + 3.1.0 + false + true + true + true + 8.29 + https://raw.githubusercontent.com/spring-cloud/stream-applications/master/etc/checkstyle + + + ${checkstyle.location}/checkstyle-suppressions.xml + + + ${checkstyle.location}/nohttp-checkstyle.xml + + ${checkstyle.location}/checkstyle-suppressions.xml + + 0.0.2.RELEASE + true + 0.0.7 + 2.27.1 + 1.11.415 + + + + + org.springframework.boot + spring-boot-starter-webflux + + + org.springframework.boot + spring-boot-starter-test + test + + + org.junit.vintage + junit-vintage-engine + + + + + org.testcontainers + junit-jupiter + test + + + org.testcontainers + mariadb + test + + + org.testcontainers + mongodb + test + + + org.springframework.cloud.fn + function-test-support + ${java-functions.version} + test + + + org.springframework.data + spring-data-geode + test + + + com.amazonaws + aws-java-sdk-s3 + ${aws.version} + test + + + com.squareup.okhttp3 + mockwebserver + test + + + org.springframework.boot + spring-boot-starter-data-mongodb + + + org.mariadb.jdbc + mariadb-java-client + ${mariadb-client.version} + test + + + org.springframework.boot + spring-boot-starter-jdbc + test + + + com.samskivert + jmustache + ${jmustache.version} + test + + + org.awaitility + awaitility + test + + + junit + junit + + + + + + + + org.apache.maven.plugins + maven-checkstyle-plugin + ${maven-checkstyle-plugin.version} + + + com.puppycrawl.tools + checkstyle + ${puppycrawl-tools-checkstyle.version} + + + io.spring.javaformat + spring-javaformat-checkstyle + ${spring-javaformat-checkstyle.version} + + + io.spring.nohttp + nohttp-checkstyle + ${nohttp-checkstyle.version} + + + + + checkstyle-validation + validate + true + + ${disable.checks} + ${checkstyle.location}/checkstyle.xml + ${checkstyle.location}/checkstyle-header.txt + + checkstyle.build.directory=${project.build.directory} + checkstyle.suppressions.file=${checkstyle.suppressions.file} + checkstyle.additional.suppressions.file=${checkstyle.additional.suppressions.file} + + true + + ${maven-checkstyle-plugin.includeTestSourceDirectory} + + ${maven-checkstyle-plugin.failsOnError} + + + ${maven-checkstyle-plugin.failOnViolation} + + + + check + + + + no-http-checkstyle-validation + validate + true + + ${disable.nohttp.checks} + ${checkstyle.nohttp.file} + **/* + **/.idea/**/*,**/.git/**/*,**/target/**/*,**/*.log + ./ + + + check + + + + + + + + + + + org.testcontainers + testcontainers-bom + ${testcontainers.version} + pom + import + + + + + diff --git a/stream-applications-integration-tests/src/test/java/org/springframework/cloud/stream/apps/integration/test/AbstractStreamApplicationTests.java b/stream-applications-integration-tests/src/test/java/org/springframework/cloud/stream/apps/integration/test/AbstractStreamApplicationTests.java new file mode 100644 index 0000000..e0e42fe --- /dev/null +++ b/stream-applications-integration-tests/src/test/java/org/springframework/cloud/stream/apps/integration/test/AbstractStreamApplicationTests.java @@ -0,0 +1,130 @@ +/* + * Copyright 2020-2020 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.cloud.stream.apps.integration.test; + +import java.io.File; +import java.io.IOException; +import java.io.InputStreamReader; +import java.net.InetAddress; +import java.net.UnknownHostException; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.Paths; +import java.util.HashMap; +import java.util.Map; +import java.util.Objects; +import java.util.UUID; + +import com.samskivert.mustache.Mustache; +import com.samskivert.mustache.Template; +import org.slf4j.LoggerFactory; +import org.testcontainers.containers.output.Slf4jLogConsumer; +import org.testcontainers.junit.jupiter.Testcontainers; + +import org.springframework.core.io.ClassPathResource; +import org.springframework.util.SocketUtils; +import org.springframework.web.reactive.function.client.WebClient; + +@Testcontainers +public abstract class AbstractStreamApplicationTests { + + protected final static String STREAM_APPS_VERSION = "3.0.0-SNAPSHOT"; + + public static final String STREAM_APPS_VERSION_KEY = "stream.apps.version"; + + protected static Path tempDir; + + protected static File kafka() { + return resourceAsFile("compose-kafka.yml"); + } + + protected static File resourceAsFile(String path) { + try { + return new ClassPathResource(path).getFile(); + } + catch (IOException e) { + throw new IllegalStateException("Unable to access resource " + path); + } + } + + protected static String localHostAddress() { + try { + return InetAddress.getLocalHost().getHostAddress(); + } + catch (UnknownHostException e) { + throw new IllegalStateException(e.getMessage(), e); + } + } + + private static WebClient webClient = WebClient.builder().build(); + + protected static WebClient webClient() { + return webClient; + } + + // Junit TempDir does not work with DockerComposeContainer unless you mount it. + // Also doesn't work as @BeforeAll in this case. + static void initializeTempDir() throws IOException { + Path tempRoot = Paths.get(new ClassPathResource("/").getFile().getAbsolutePath()); + if (tempDir == null) { + tempDir = Files.createTempDirectory(tempRoot, UUID.randomUUID().toString()); + tempDir.toFile().deleteOnExit(); + } + } + + protected static int findAvailablePort() { + return SocketUtils.findAvailableTcpPort(10000, 20000); + } + + protected static File resolveTemplate(String templatePath, Map templateProperties) { + try { + initializeTempDir(); + try (InputStreamReader resourcesTemplateReader = new InputStreamReader( + Objects.requireNonNull(new ClassPathResource(templatePath).getInputStream()))) { + Template resourceTemplate = Mustache.compiler().escapeHTML(false).compile(resourcesTemplateReader); + Path temporaryFile = Files.createFile(tempDir.resolve(Paths.get(templatePath).getFileName())); + Files.write(temporaryFile, + resourceTemplate.execute(addGlobalProperties(templateProperties)).getBytes()).toFile(); + temporaryFile.toFile().deleteOnExit(); + return temporaryFile.toFile(); + } + } + catch (IOException e) { + throw new IllegalStateException(e.getMessage(), e); + } + } + + private static Map addGlobalProperties(Map templateProperties) { + if (templateProperties.containsKey(STREAM_APPS_VERSION)) { + return templateProperties; + } + Map enriched = new HashMap<>(templateProperties); + enriched.put(STREAM_APPS_VERSION_KEY, STREAM_APPS_VERSION); + return enriched; + } + + public static class AppLog extends Slf4jLogConsumer { + public static AppLog appLog(String appName) { + return new AppLog(appName); + } + + AppLog(String appName) { + super(LoggerFactory.getLogger(appName)); + } + } + +} diff --git a/stream-applications-integration-tests/src/test/java/org/springframework/cloud/stream/apps/integration/test/FluentMap.java b/stream-applications-integration-tests/src/test/java/org/springframework/cloud/stream/apps/integration/test/FluentMap.java new file mode 100644 index 0000000..7df6fcd --- /dev/null +++ b/stream-applications-integration-tests/src/test/java/org/springframework/cloud/stream/apps/integration/test/FluentMap.java @@ -0,0 +1,30 @@ +/* + * Copyright 2020-2020 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.cloud.stream.apps.integration.test; + +import java.util.LinkedHashMap; + +public class FluentMap extends LinkedHashMap { + public static FluentMap fluentMap() { + return new FluentMap<>(); + } + + public FluentMap withEntry(K key, V value) { + put(key, value); + return this; + } +} diff --git a/stream-applications-integration-tests/src/test/java/org/springframework/cloud/stream/apps/integration/test/LogMatcher.java b/stream-applications-integration-tests/src/test/java/org/springframework/cloud/stream/apps/integration/test/LogMatcher.java new file mode 100644 index 0000000..d46adda --- /dev/null +++ b/stream-applications-integration-tests/src/test/java/org/springframework/cloud/stream/apps/integration/test/LogMatcher.java @@ -0,0 +1,75 @@ +/* + * Copyright 2020-2020 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.cloud.stream.apps.integration.test; + +import java.util.LinkedList; +import java.util.List; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.function.Consumer; +import java.util.regex.Pattern; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.testcontainers.containers.output.OutputFrame; + +public class LogMatcher implements Consumer { + private static Logger logger = LoggerFactory.getLogger(LogMatcher.class); + + private List> listeners = new LinkedList<>(); + + public static String contains(String string) { + return ".*" + string + ".*"; + } + + public static String endsWith(String string) { + return ".*" + string; + } + + @Override + public void accept(OutputFrame outputFrame) { + listeners.forEach(m -> m.accept(outputFrame.getUtf8String())); + } + + public LogListener withRegex(String regex) { + LogListener logListener = new LogListener(regex); + listeners.add(logListener); + return logListener; + } + + public class LogListener implements Consumer { + private AtomicBoolean matched = new AtomicBoolean(); + + private final Pattern pattern; + + LogListener(String regex) { + pattern = Pattern.compile(regex); + } + + @Override + public void accept(String s) { + logger.trace(this + "matching " + s.trim() + " using pattern " + pattern.pattern()); + if (pattern.matcher(s.trim()).matches()) { + logger.debug(" MATCHED " + s.trim()); + matched.set(true); + } + } + + public AtomicBoolean matches() { + return matched; + } + } +} diff --git a/stream-applications-integration-tests/src/test/java/org/springframework/cloud/stream/apps/integration/test/TickTockTests.java b/stream-applications-integration-tests/src/test/java/org/springframework/cloud/stream/apps/integration/test/TickTockTests.java new file mode 100644 index 0000000..785075d --- /dev/null +++ b/stream-applications-integration-tests/src/test/java/org/springframework/cloud/stream/apps/integration/test/TickTockTests.java @@ -0,0 +1,44 @@ +/* + * Copyright 2020-2020 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.cloud.stream.apps.integration.test; + +import java.time.Duration; +import java.util.Collections; +import java.util.regex.Pattern; + +import org.junit.jupiter.api.Test; +import org.testcontainers.containers.DockerComposeContainer; +import org.testcontainers.containers.wait.strategy.Wait; +import org.testcontainers.junit.jupiter.Container; + +import static org.assertj.core.api.Assertions.assertThatCode; + +public class TickTockTests extends AbstractStreamApplicationTests { + // "MM/dd/yy HH:mm:ss"; + private final Pattern pattern = Pattern.compile(".*\\d{2}/\\d{2}/\\d{2}\\s+\\d{2}:\\d{2}:\\d{2}"); + + @Container + private final DockerComposeContainer environment = new DockerComposeContainer( + kafka(), + resolveTemplate("tick-tock-tests.yml", Collections.EMPTY_MAP)); + + @Test + void ticktock() { + assertThatCode(() -> environment.waitingFor("log-sink", Wait.forLogMessage(pattern.pattern(), 5) + .withStartupTimeout(Duration.ofMinutes(2)))).doesNotThrowAnyException(); + } +} diff --git a/stream-applications-integration-tests/src/test/java/org/springframework/cloud/stream/apps/integration/test/processor/HttpRequestProcessorTests.java b/stream-applications-integration-tests/src/test/java/org/springframework/cloud/stream/apps/integration/test/processor/HttpRequestProcessorTests.java new file mode 100644 index 0000000..a950cec --- /dev/null +++ b/stream-applications-integration-tests/src/test/java/org/springframework/cloud/stream/apps/integration/test/processor/HttpRequestProcessorTests.java @@ -0,0 +1,94 @@ +/* + * Copyright 2020-2020 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.cloud.stream.apps.integration.test.processor; + +import java.net.InetAddress; +import java.time.Duration; + +import okhttp3.mockwebserver.Dispatcher; +import okhttp3.mockwebserver.MockResponse; +import okhttp3.mockwebserver.MockWebServer; +import okhttp3.mockwebserver.RecordedRequest; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Test; +import org.testcontainers.containers.DockerComposeContainer; +import org.testcontainers.containers.wait.strategy.Wait; +import org.testcontainers.junit.jupiter.Container; +import reactor.core.publisher.Mono; + +import org.springframework.cloud.stream.apps.integration.test.AbstractStreamApplicationTests; +import org.springframework.cloud.stream.apps.integration.test.LogMatcher; +import org.springframework.http.HttpHeaders; +import org.springframework.http.HttpStatus; +import org.springframework.http.MediaType; +import org.springframework.web.reactive.function.client.ClientResponse; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.awaitility.Awaitility.await; +import static org.springframework.cloud.stream.apps.integration.test.AbstractStreamApplicationTests.AppLog.appLog; +import static org.springframework.cloud.stream.apps.integration.test.FluentMap.fluentMap; + +public class HttpRequestProcessorTests extends AbstractStreamApplicationTests { + private static MockWebServer server = new MockWebServer(); + + private static int serverPort = findAvailablePort(); + + private static String url = "http://" + localHostAddress() + ":" + serverPort; + + private static int sourcePort = findAvailablePort(); + + private static LogMatcher logMatcher = new LogMatcher(); + + @Container + private static final DockerComposeContainer environment = new DockerComposeContainer( + kafka(), + resolveTemplate("processor/http-request-processor-tests.yml", fluentMap() + .withEntry("port", sourcePort) + .withEntry("url", url))) + .withLogConsumer("log-sink", appLog("log-sink")) + .withLogConsumer("log-sink", logMatcher) + .withExposedService("http-source", sourcePort, + Wait.forListeningPort().withStartupTimeout(Duration.ofMinutes(2))); + + @BeforeAll + static void startServer() throws Exception { + server.start(InetAddress.getLocalHost(), serverPort); + } + + @Test + void get() { + server.setDispatcher(new Dispatcher() { + @Override + public MockResponse dispatch(RecordedRequest recordedRequest) { + return new MockResponse().setHeader(HttpHeaders.CONTENT_TYPE, MediaType.APPLICATION_JSON_VALUE) + .setBody("{\"response\":\"" + recordedRequest.getBody().readUtf8() + "\"}") + .setResponseCode(HttpStatus.OK.value()); + } + }); + ClientResponse response = webClient() + .post() + .uri("http://localhost:" + sourcePort) + .contentType(MediaType.TEXT_PLAIN) + .body(Mono.just("ping"), String.class) + .exchange() + .block(); + assertThat(response.statusCode().is2xxSuccessful()).isTrue(); + + await().atMost(Duration.ofSeconds(30)) + .untilTrue(logMatcher.withRegex(".*\\{\"response\":\"ping\"\\}").matches()); + } +} diff --git a/stream-applications-integration-tests/src/test/java/org/springframework/cloud/stream/apps/integration/test/sink/JdbcSinkTests.java b/stream-applications-integration-tests/src/test/java/org/springframework/cloud/stream/apps/integration/test/sink/JdbcSinkTests.java new file mode 100644 index 0000000..6a3728c --- /dev/null +++ b/stream-applications-integration-tests/src/test/java/org/springframework/cloud/stream/apps/integration/test/sink/JdbcSinkTests.java @@ -0,0 +1,98 @@ +/* + * Copyright 2020-2020 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.cloud.stream.apps.integration.test.sink; + +import java.time.Duration; + +import com.zaxxer.hikari.HikariDataSource; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Test; +import org.testcontainers.containers.DockerComposeContainer; +import org.testcontainers.containers.MariaDBContainer; +import org.testcontainers.containers.wait.strategy.Wait; +import org.testcontainers.junit.jupiter.Container; +import reactor.core.publisher.Mono; + +import org.springframework.cloud.stream.apps.integration.test.AbstractStreamApplicationTests; +import org.springframework.http.MediaType; +import org.springframework.jdbc.core.JdbcTemplate; +import org.springframework.web.reactive.function.client.ClientResponse; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.awaitility.Awaitility.await; +import static org.springframework.cloud.stream.apps.integration.test.AbstractStreamApplicationTests.AppLog.appLog; +import static org.springframework.cloud.stream.apps.integration.test.FluentMap.fluentMap; + +public class JdbcSinkTests extends AbstractStreamApplicationTests { + + private static int port = findAvailablePort(); + + private static JdbcTemplate jdbcTemplate; + + @Container + private static MariaDBContainer mariadbContainer = (MariaDBContainer) new MariaDBContainer() + .withDatabaseName("test") + .withPassword("password") + .withUsername("user") + .withInitScript("init.sql") + .withExposedPorts(3306) + .waitingFor(Wait.forListeningPort().withStartupTimeout(Duration.ofMinutes(2))); + + @Container + private DockerComposeContainer environment = new DockerComposeContainer( + kafka(), + resolveTemplate("sink/jdbc-sink-tests.yml", fluentMap() + .withEntry("jdbc.url", + mariadbContainer.getJdbcUrl().replace("localhost", + localHostAddress())) + .withEntry("user", mariadbContainer.getUsername()) + .withEntry("password", mariadbContainer.getPassword()) + .withEntry("port", port))) + .withLogConsumer("jdbc-sink", appLog("jdbc-sink")) + .withExposedService("http-source", port, + Wait.forListeningPort().withStartupTimeout(Duration.ofMinutes(2))); + + @BeforeAll + static void buildJdbcTemplate() { + HikariDataSource dataSource = new HikariDataSource(); + dataSource.setDriverClassName(mariadbContainer.getDriverClassName()); + dataSource.setUsername(mariadbContainer.getUsername()); + dataSource.setPassword(mariadbContainer.getPassword()); + dataSource.setJdbcUrl(mariadbContainer.getJdbcUrl()); + jdbcTemplate = new JdbcTemplate(dataSource); + jdbcTemplate.execute("DELETE FROM People"); + } + + @Test + void postData() { + String json = "{\"name\":\"My Name\",\"address\":{ \"city\": \"Big City\", \"street\": \"Narrow Alley\"}}"; + ClientResponse response = webClient() + .post() + .uri("http://localhost:" + port) + .contentType(MediaType.APPLICATION_JSON) + .body(Mono.just(json), String.class) + .exchange() + .block(); + assertThat(response.statusCode().is2xxSuccessful()).isTrue(); + + await().atMost(Duration.ofSeconds(30)) + .untilAsserted( + () -> assertThat(jdbcTemplate.queryForObject("SELECT COUNT(*) from People", Integer.class)) + .isOne()); + assertThat(jdbcTemplate.queryForObject("SELECT name from People", String.class)).isEqualTo("My Name"); + } +} diff --git a/stream-applications-integration-tests/src/test/java/org/springframework/cloud/stream/apps/integration/test/sink/MongoDBSinkTests.java b/stream-applications-integration-tests/src/test/java/org/springframework/cloud/stream/apps/integration/test/sink/MongoDBSinkTests.java new file mode 100644 index 0000000..f9c8263 --- /dev/null +++ b/stream-applications-integration-tests/src/test/java/org/springframework/cloud/stream/apps/integration/test/sink/MongoDBSinkTests.java @@ -0,0 +1,89 @@ +/* + * Copyright 2020-2020 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.cloud.stream.apps.integration.test.sink; + +import java.time.Duration; +import java.util.List; + +import org.bson.Document; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Test; +import org.testcontainers.containers.DockerComposeContainer; +import org.testcontainers.containers.MongoDBContainer; +import org.testcontainers.containers.wait.strategy.Wait; +import org.testcontainers.junit.jupiter.Container; +import reactor.core.publisher.Mono; + +import org.springframework.cloud.stream.apps.integration.test.AbstractStreamApplicationTests; +import org.springframework.data.mongodb.MongoDatabaseFactory; +import org.springframework.data.mongodb.core.MongoTemplate; +import org.springframework.data.mongodb.core.SimpleMongoClientDatabaseFactory; +import org.springframework.http.MediaType; +import org.springframework.web.reactive.function.client.ClientResponse; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.springframework.cloud.stream.apps.integration.test.AbstractStreamApplicationTests.AppLog.appLog; +import static org.springframework.cloud.stream.apps.integration.test.FluentMap.fluentMap; + +public class MongoDBSinkTests extends AbstractStreamApplicationTests { + + private static int port = findAvailablePort(); + + private static MongoTemplate mongoTemplate; + + @Container + private static MongoDBContainer mongoDBContainer = new MongoDBContainer() + .withExposedPorts(27017) + .withStartupTimeout(Duration.ofMinutes(2)); + + private static String mongoConnectionString() { + return String.format("mongodb://%s:%s/%s", localHostAddress(), mongoDBContainer.getMappedPort(27017), "test"); + } + + @BeforeAll + private static void buildMongoTemplate() { + MongoDatabaseFactory mongoDatabaseFactory = new SimpleMongoClientDatabaseFactory( + mongoConnectionString()); + mongoTemplate = new MongoTemplate(mongoDatabaseFactory); + } + + @Container + private DockerComposeContainer environment = new DockerComposeContainer( + kafka(), + resolveTemplate("sink/mongodb-sink-tests.yml", fluentMap() + .withEntry("mongodb.url", mongoConnectionString()) + .withEntry("port", port))) + .withLogConsumer("jdbc-sink", + appLog("jdbc-sink")) + .withExposedService("http-source", port, + Wait.forListeningPort().withStartupTimeout(Duration.ofMinutes(2))); + + @Test + void postData() { + String json = "{\"name\":\"My Name\",\"address\":{ \"city\": \"Big City\", \"street\": \"Narrow Alley\"}}"; + ClientResponse response = webClient() + .post() + .uri("http://localhost:" + port) + .contentType(MediaType.APPLICATION_JSON) + .body(Mono.just(json), String.class) + .exchange() + .block(Duration.ofSeconds(30)); + assertThat(response.statusCode().is2xxSuccessful()).isTrue(); + List docs = mongoTemplate.findAll(Document.class, "test"); + assertThat(docs).allMatch(document -> document.get("name", String.class).equals("My Name")); + } +} diff --git a/stream-applications-integration-tests/src/test/java/org/springframework/cloud/stream/apps/integration/test/sink/TcpSinkTests.java b/stream-applications-integration-tests/src/test/java/org/springframework/cloud/stream/apps/integration/test/sink/TcpSinkTests.java new file mode 100644 index 0000000..47e25e8 --- /dev/null +++ b/stream-applications-integration-tests/src/test/java/org/springframework/cloud/stream/apps/integration/test/sink/TcpSinkTests.java @@ -0,0 +1,93 @@ +/* + * Copyright 2020-2020 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.cloud.stream.apps.integration.test.sink; + +import java.io.BufferedReader; +import java.io.IOException; +import java.io.InputStreamReader; +import java.net.InetAddress; +import java.net.ServerSocket; +import java.net.Socket; +import java.time.Duration; +import java.util.concurrent.atomic.AtomicBoolean; + +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Test; +import org.testcontainers.containers.DockerComposeContainer; +import org.testcontainers.containers.wait.strategy.Wait; +import org.testcontainers.junit.jupiter.Container; +import reactor.core.publisher.Mono; + +import org.springframework.cloud.stream.apps.integration.test.AbstractStreamApplicationTests; +import org.springframework.http.MediaType; +import org.springframework.web.reactive.function.client.ClientResponse; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.awaitility.Awaitility.await; +import static org.springframework.cloud.stream.apps.integration.test.AbstractStreamApplicationTests.AppLog.appLog; +import static org.springframework.cloud.stream.apps.integration.test.FluentMap.fluentMap; + +public class TcpSinkTests extends AbstractStreamApplicationTests { + + private static final int port = findAvailablePort(); + + private static final int tcpPort = findAvailablePort(); + + private static Socket socket; + + private static final AtomicBoolean socketReady = new AtomicBoolean(); + + @Container + private static final DockerComposeContainer environment = new DockerComposeContainer( + kafka(), + resolveTemplate("sink/tcp-sink-tests.yml", fluentMap() + .withEntry("port", port) + .withEntry("tcp.port", tcpPort) + .withEntry("tcp.host", localHostAddress()))) + .withLogConsumer("tcp-sink", appLog("tcp-sink")) + .withExposedService("http-source", port, + Wait.forListeningPort().withStartupTimeout(Duration.ofMinutes(2))); + + @BeforeAll + static void startTcpServer() { + new Thread(() -> { + try { + socket = new ServerSocket(tcpPort, 50, InetAddress.getLocalHost()).accept(); + socketReady.set(true); + } + catch (IOException exception) { + exception.printStackTrace(); + } + }).start(); + } + + @Test + void postData() throws IOException { + String text = "Hello, world!"; + ClientResponse response = webClient() + .post() + .uri("http://localhost:" + port) + .contentType(MediaType.TEXT_PLAIN) + .body(Mono.just(text), String.class) + .exchange() + .block(); + assertThat(response.statusCode().is2xxSuccessful()).isTrue(); + await().atMost(Duration.ofSeconds(10)).untilTrue(socketReady); + BufferedReader reader = new BufferedReader(new InputStreamReader(socket.getInputStream())); + await().atMost(Duration.ofSeconds(10)).until(() -> reader.readLine().equals("Hello, world!")); + } +} diff --git a/stream-applications-integration-tests/src/test/java/org/springframework/cloud/stream/apps/integration/test/source/GeodeSourceTests.java b/stream-applications-integration-tests/src/test/java/org/springframework/cloud/stream/apps/integration/test/source/GeodeSourceTests.java new file mode 100644 index 0000000..84976c1 --- /dev/null +++ b/stream-applications-integration-tests/src/test/java/org/springframework/cloud/stream/apps/integration/test/source/GeodeSourceTests.java @@ -0,0 +1,110 @@ +/* + * Copyright 2020-2020 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.cloud.stream.apps.integration.test.source; + +import java.time.Duration; +import java.util.function.Consumer; + +import com.github.dockerjava.api.command.CreateContainerCmd; +import com.github.dockerjava.api.model.ExposedPort; +import com.github.dockerjava.api.model.HostConfig; +import com.github.dockerjava.api.model.PortBinding; +import com.github.dockerjava.api.model.Ports; +import org.apache.geode.cache.Region; +import org.apache.geode.cache.client.ClientCache; +import org.apache.geode.cache.client.ClientCacheFactory; +import org.apache.geode.cache.client.ClientRegionShortcut; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Test; +import org.testcontainers.containers.DockerComposeContainer; +import org.testcontainers.images.builder.ImageFromDockerfile; +import org.testcontainers.junit.jupiter.Container; + +import org.springframework.cloud.fn.test.support.geode.GeodeContainer; +import org.springframework.cloud.stream.apps.integration.test.AbstractStreamApplicationTests; +import org.springframework.cloud.stream.apps.integration.test.LogMatcher; + +import static org.awaitility.Awaitility.await; +import static org.springframework.cloud.stream.apps.integration.test.AbstractStreamApplicationTests.AppLog.appLog; +import static org.springframework.cloud.stream.apps.integration.test.FluentMap.fluentMap; + +public class GeodeSourceTests extends AbstractStreamApplicationTests { + + private static LogMatcher logMatcher = new LogMatcher(); + + private static LogMatcher geodeLogMatcher = new LogMatcher(); + + private static int locatorPort = findAvailablePort(); + + private static int cacheServerPort = findAvailablePort(); + + private static Region clientRegion; + + @Container + private static GeodeContainer geode = (GeodeContainer) new GeodeContainer(new ImageFromDockerfile() + .withFileFromClasspath("Dockerfile", "geode/Dockerfile") + .withBuildArg("CACHE_SERVER_PORT", String.valueOf(cacheServerPort)) + .withBuildArg("LOCATOR_PORT", String.valueOf(locatorPort)), + locatorPort, cacheServerPort) + .withCreateContainerCmdModifier( + (Consumer) createContainerCmd -> createContainerCmd + .withHostName("geode").withHostConfig(new HostConfig().withPortBindings( + new PortBinding(Ports.Binding.bindPort(cacheServerPort), + new ExposedPort(cacheServerPort)), + new PortBinding(Ports.Binding.bindPort(locatorPort), + new ExposedPort(locatorPort))))) + .withCommand("tail", "-f", "/dev/null") + .withStartupTimeout(Duration.ofMinutes(2)); + + @BeforeAll + static void init() { + //Not using locator is faster. + System.out.println(geode.execGfsh( + "start server --name=Server1 " + "--hostname-for-clients=geode" + " --server-port=" + + cacheServerPort + " --J=-Dgemfire.jmx-manager=true --J=-Dgemfire.jmx-manager-start=true") + .getStdout()); + System.out.println(geode.execGfsh("connect --jmx-manager=localhost[1099]", + "create region --name=myRegion --type=REPLICATE").getStdout()); + ClientCache clientCache = new ClientCacheFactory().addPoolServer("localhost", cacheServerPort) + .create(); + clientRegion = clientCache + .createClientRegionFactory(ClientRegionShortcut.PROXY) + .create("myRegion"); + } + + @Container + private DockerComposeContainer environment = new DockerComposeContainer( + kafka(), + resolveTemplate("source/geode-source-tests.yml", fluentMap() + .withEntry("geode.host-addresses", "geode:" + cacheServerPort) + .withEntry("extraHosts", "geode:" + localHostAddress()) + .withEntry("geode.region", "myRegion"))) + .withLogConsumer("log-sink", appLog("log-sink")) + .withLogConsumer("geode-source", geodeLogMatcher) + .withLogConsumer("log-sink", logMatcher) + .withLocalCompose(true); + + @Test + void test() { + LogMatcher.LogListener logListener = logMatcher.withRegex(LogMatcher.contains("world")); + await().atMost(Duration.ofMinutes(2)) + .untilTrue(geodeLogMatcher.withRegex(LogMatcher.contains("Started GeodeSource")).matches()); + clientRegion.put("hello", "world"); + await().atMost(Duration.ofSeconds(30)) + .untilTrue(logListener.matches()); + } +} diff --git a/stream-applications-integration-tests/src/test/java/org/springframework/cloud/stream/apps/integration/test/source/HttpSourceTests.java b/stream-applications-integration-tests/src/test/java/org/springframework/cloud/stream/apps/integration/test/source/HttpSourceTests.java new file mode 100644 index 0000000..8cb4eee --- /dev/null +++ b/stream-applications-integration-tests/src/test/java/org/springframework/cloud/stream/apps/integration/test/source/HttpSourceTests.java @@ -0,0 +1,81 @@ +/* + * Copyright 2020-2020 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.cloud.stream.apps.integration.test.source; + +import java.time.Duration; +import java.util.Collections; + +import org.junit.jupiter.api.Test; +import org.testcontainers.containers.DockerComposeContainer; +import org.testcontainers.containers.wait.strategy.Wait; +import org.testcontainers.junit.jupiter.Container; +import reactor.core.publisher.Mono; + +import org.springframework.cloud.stream.apps.integration.test.AbstractStreamApplicationTests; +import org.springframework.cloud.stream.apps.integration.test.LogMatcher; +import org.springframework.http.MediaType; +import org.springframework.web.reactive.function.client.ClientResponse; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.awaitility.Awaitility.await; +import static org.springframework.cloud.stream.apps.integration.test.AbstractStreamApplicationTests.AppLog.appLog; +import static org.springframework.cloud.stream.apps.integration.test.LogMatcher.endsWith; + +public class HttpSourceTests extends AbstractStreamApplicationTests { + + private static int port = findAvailablePort(); + + private static LogMatcher logMatcher = new LogMatcher(); + + @Container + private static final DockerComposeContainer environment = new DockerComposeContainer( + kafka(), + resolveTemplate("source/http-source-tests.yml", Collections.singletonMap("port", port))) + .withLogConsumer("log-sink", appLog("log-sink")) + .withLogConsumer("log-sink", logMatcher) + .withExposedService("http-source", port, + Wait.forListeningPort().withStartupTimeout(Duration.ofMinutes(2))); + + @Test + void plaintext() { + ClientResponse response = webClient() + .post() + .uri("http://localhost:" + port) + .contentType(MediaType.TEXT_PLAIN) + .body(Mono.just("Hello"), String.class) + .exchange() + .block(); + assertThat(response.statusCode().is2xxSuccessful()).isTrue(); + + await().atMost(Duration.ofSeconds(30)) + .untilTrue(logMatcher.withRegex(endsWith("Hello")).matches()); + } + + @Test + void json() { + ClientResponse response = webClient() + .post() + .uri("http://localhost:" + port) + .contentType(MediaType.APPLICATION_JSON) + .body(Mono.just("{\"Hello\":\"world\"}"), String.class) + .exchange() + .block(); + assertThat(response.statusCode().is2xxSuccessful()).isTrue(); + await().atMost(Duration.ofSeconds(30)) + .untilTrue(logMatcher.withRegex(".*\\{\"Hello\":\"world\"\\}").matches()); + } +} diff --git a/stream-applications-integration-tests/src/test/java/org/springframework/cloud/stream/apps/integration/test/source/JdbcSourceTests.java b/stream-applications-integration-tests/src/test/java/org/springframework/cloud/stream/apps/integration/test/source/JdbcSourceTests.java new file mode 100644 index 0000000..97e80f4 --- /dev/null +++ b/stream-applications-integration-tests/src/test/java/org/springframework/cloud/stream/apps/integration/test/source/JdbcSourceTests.java @@ -0,0 +1,50 @@ +/* + * Copyright 2020-2020 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.cloud.stream.apps.integration.test.source; + +import java.time.Duration; +import java.util.Collections; + +import org.junit.jupiter.api.Test; +import org.testcontainers.containers.DockerComposeContainer; +import org.testcontainers.containers.wait.strategy.Wait; +import org.testcontainers.junit.jupiter.Container; + +import org.springframework.cloud.stream.apps.integration.test.AbstractStreamApplicationTests; +import org.springframework.cloud.stream.apps.integration.test.LogMatcher; + +import static org.awaitility.Awaitility.await; +import static org.springframework.cloud.stream.apps.integration.test.LogMatcher.contains; + +public class JdbcSourceTests extends AbstractStreamApplicationTests { + + private static LogMatcher logMatcher = new LogMatcher(); + + @Container + private static final DockerComposeContainer environment = new DockerComposeContainer( + kafka(), + resolveTemplate("source/jdbc-source-tests.yml", + Collections.singletonMap("init.sql", resourceAsFile("init.sql")))) + .withLogConsumer("log-sink", logMatcher) + .waitingFor("jdbc-source", Wait.forLogMessage(contains("Started JdbcSource"), 1) + .withStartupTimeout(Duration.ofMinutes(2))); + + @Test + void test() { + await().atMost(Duration.ofSeconds(30)).untilTrue(logMatcher.withRegex(contains("Bart Simpson")).matches()); + } +} diff --git a/stream-applications-integration-tests/src/test/java/org/springframework/cloud/stream/apps/integration/test/source/S3Sample.java b/stream-applications-integration-tests/src/test/java/org/springframework/cloud/stream/apps/integration/test/source/S3Sample.java new file mode 100644 index 0000000..e0a5d9a --- /dev/null +++ b/stream-applications-integration-tests/src/test/java/org/springframework/cloud/stream/apps/integration/test/source/S3Sample.java @@ -0,0 +1,109 @@ +/* + * Copyright 2020-2020 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.cloud.stream.apps.integration.test.source; + +import com.amazonaws.services.s3.AmazonS3; +import java.io.BufferedReader; +import java.io.File; +import java.io.IOException; +import java.io.InputStream; +import java.io.InputStreamReader; + +import com.amazonaws.AmazonClientException; +import com.amazonaws.AmazonServiceException; +import com.amazonaws.ClientConfiguration; +import com.amazonaws.auth.AWSCredentials; +import com.amazonaws.auth.AWSStaticCredentialsProvider; +import com.amazonaws.auth.BasicAWSCredentials; +import com.amazonaws.client.builder.AwsClientBuilder; +import com.amazonaws.regions.Regions; +import com.amazonaws.services.s3.AmazonS3ClientBuilder; +import com.amazonaws.services.s3.model.GetObjectRequest; +import com.amazonaws.services.s3.model.PutObjectRequest; +import com.amazonaws.services.s3.model.S3Object; + +import org.springframework.core.io.ClassPathResource; + +public class S3Sample { + private static String bucketName = "testbucket"; + + private static String keyName = "hosts"; + + private static String uploadFileName = "/etc/hosts"; + + private static int minioPort = 33474; + + public static void main(String[] args) throws IOException { + AWSCredentials credentials = new BasicAWSCredentials("minio", "minio123"); + ClientConfiguration clientConfiguration = new ClientConfiguration(); + // clientConfiguration.setSignerOverride("AWSS3V4SignerType"); + AmazonS3 s3Client = AmazonS3ClientBuilder + .standard() + .withEndpointConfiguration( + new AwsClientBuilder.EndpointConfiguration("http://localhost:" + minioPort, + Regions.US_EAST_1.name())) + .withPathStyleAccessEnabled(true) + .withClientConfiguration(clientConfiguration) + .withCredentials(new AWSStaticCredentialsProvider(credentials)) + .build(); + + try { + s3Client.createBucket(bucketName); + System.out.println("Uploading a new object to S3 from a file\n"); + File file = new ClassPathResource("minio/data").getFile(); + // Upload file + s3Client.putObject(new PutObjectRequest(bucketName, keyName, file)); + + // Download file + GetObjectRequest rangeObjectRequest = new GetObjectRequest(bucketName, keyName); + S3Object objectPortion = s3Client.getObject(rangeObjectRequest); + System.out.println("Printing bytes retrieved:"); + displayTextInputStream(objectPortion.getObjectContent()); + } + catch (AmazonServiceException ase) { + System.out.println("Caught an AmazonServiceException, which " + "means your request made it " + + "to Amazon S3, but was rejected with an error response" + " for some reason."); + System.out.println("Error Message: " + ase.getMessage()); + System.out.println("HTTP Status Code: " + ase.getStatusCode()); + System.out.println("AWS Error Code: " + ase.getErrorCode()); + System.out.println("Error Type: " + ase.getErrorType()); + System.out.println("Request ID: " + ase.getRequestId()); + + } + catch (AmazonClientException ace) { + System.out.println("Caught an AmazonClientException, which " + "means the client encountered " + + "an internal error while trying to " + + "communicate with S3, " + "such as not being able to access the network."); + System.out.println("Error Message: " + ace.getMessage()); + + } + + } + + private static void displayTextInputStream(InputStream input) throws IOException { + // Read one text line at a time and display. + BufferedReader reader = new BufferedReader(new InputStreamReader(input)); + while (true) { + String line = reader.readLine(); + if (line == null) { + break; + } + System.out.println(" " + line); + } + System.out.println(); + } +} \ No newline at end of file diff --git a/stream-applications-integration-tests/src/test/java/org/springframework/cloud/stream/apps/integration/test/source/S3SourceTests.java b/stream-applications-integration-tests/src/test/java/org/springframework/cloud/stream/apps/integration/test/source/S3SourceTests.java new file mode 100644 index 0000000..27e6138 --- /dev/null +++ b/stream-applications-integration-tests/src/test/java/org/springframework/cloud/stream/apps/integration/test/source/S3SourceTests.java @@ -0,0 +1,103 @@ +/* + * Copyright 2020-2020 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.cloud.stream.apps.integration.test.source; + +import java.time.Duration; +import java.util.function.Consumer; + +import com.amazonaws.ClientConfiguration; +import com.amazonaws.auth.AWSCredentials; +import com.amazonaws.auth.AWSStaticCredentialsProvider; +import com.amazonaws.auth.BasicAWSCredentials; +import com.amazonaws.client.builder.AwsClientBuilder; +import com.amazonaws.regions.Regions; +import com.amazonaws.services.s3.AmazonS3; +import com.amazonaws.services.s3.AmazonS3ClientBuilder; +import com.amazonaws.services.s3.model.PutObjectRequest; +import com.github.dockerjava.api.command.CreateContainerCmd; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Test; +import org.testcontainers.containers.DockerComposeContainer; +import org.testcontainers.containers.GenericContainer; +import org.testcontainers.containers.wait.strategy.Wait; +import org.testcontainers.junit.jupiter.Container; + +import org.springframework.cloud.stream.apps.integration.test.AbstractStreamApplicationTests; +import org.springframework.cloud.stream.apps.integration.test.LogMatcher; +import org.springframework.cloud.stream.apps.integration.test.LogMatcher.LogListener; + +import static org.awaitility.Awaitility.await; +import static org.springframework.cloud.stream.apps.integration.test.AbstractStreamApplicationTests.AppLog.appLog; +import static org.springframework.cloud.stream.apps.integration.test.FluentMap.fluentMap; +import static org.springframework.cloud.stream.apps.integration.test.LogMatcher.contains; + +public class S3SourceTests extends AbstractStreamApplicationTests { + + private static AmazonS3 s3Client; + + private static LogMatcher logMatcher = new LogMatcher(); + + @Container + private static final GenericContainer minio = new GenericContainer("minio/minio:RELEASE.2020-09-05T07-14-49Z") + .withExposedPorts(9000) + .withEnv("MINIO_ACCESS_KEY", "minio") + .withEnv("MINIO_SECRET_KEY", "minio123") + .waitingFor(Wait.forHttp("/minio/health/live")) + .withCreateContainerCmdModifier( + (Consumer) createContainerCmd -> createContainerCmd.withHostName("minio")) + .withLogConsumer(appLog("minio")) + .withCommand("minio", "server", "/data"); + + @BeforeAll + static void init() { + AWSCredentials credentials = new BasicAWSCredentials("minio", "minio123"); + ClientConfiguration clientConfiguration = new ClientConfiguration(); + s3Client = AmazonS3ClientBuilder + .standard() + .withEndpointConfiguration( + new AwsClientBuilder.EndpointConfiguration("http://localhost:" + minio.getMappedPort(9000), + Regions.US_EAST_1.name())) + .withPathStyleAccessEnabled(true) + .withClientConfiguration(clientConfiguration) + .withCredentials(new AWSStaticCredentialsProvider(credentials)) + .build(); + + + } + + @Container + private final DockerComposeContainer environment = new DockerComposeContainer( + kafka(), + resolveTemplate("source/s3-source-tests.yml", + fluentMap().withEntry("s3.local.dir", resourceAsFile("minio")) + .withEntry("s3.endpoint.url", + "http://minio:" + minio.getMappedPort(9000)) + .withEntry("extraHosts", "minio:" + localHostAddress()))) + .withLogConsumer("log-sink", logMatcher) + .withLogConsumer("s3-source", logMatcher) + .withLogConsumer("log-sink", appLog("logSink")); + + @Test + void test() { + + await().atMost(Duration.ofMinutes(2)).untilTrue(logMatcher.withRegex(contains("Started S3Source")).matches()); + LogListener logListener = logMatcher.withRegex(contains("Bart Simpson")); + s3Client.createBucket("bucket"); + s3Client.putObject(new PutObjectRequest("bucket", "test", resourceAsFile("minio/data"))); + await().atMost(Duration.ofSeconds(30)).untilTrue(logListener.matches()); + } +} diff --git a/stream-applications-integration-tests/src/test/resources/compose-kafka.yml b/stream-applications-integration-tests/src/test/resources/compose-kafka.yml new file mode 100644 index 0000000..d69cc1f --- /dev/null +++ b/stream-applications-integration-tests/src/test/resources/compose-kafka.yml @@ -0,0 +1,21 @@ +version: '2.4' +services: + kafka-broker: + image: confluentinc/cp-kafka:5.5.1 + expose: + - "9092" + environment: + - KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://kafka-broker:9092 + - KAFKA_ADVERTISED_HOST_NAME=kafka-broker + - KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181 + - KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR=1 + - KAFKA_MESSAGE_MAX_BYTES=2097152 + depends_on: + - zookeeper + + zookeeper: + image: confluentinc/cp-zookeeper:5.5.1 + expose: + - "2181" + environment: + - ZOOKEEPER_CLIENT_PORT=2181 \ No newline at end of file diff --git a/stream-applications-integration-tests/src/test/resources/init.sql b/stream-applications-integration-tests/src/test/resources/init.sql new file mode 100644 index 0000000..c21c3ad --- /dev/null +++ b/stream-applications-integration-tests/src/test/resources/init.sql @@ -0,0 +1,10 @@ +CREATE DATABASE IF NOT EXISTS test; +USE test; +CREATE TABLE IF NOT EXISTS People ( + id INT NOT NULL AUTO_INCREMENT, + name VARCHAR(255) NOT NULL, + street VARCHAR(255) NOT NULL, + city VARCHAR(255) NOT NULL, + deleted CHAR(1) DEFAULT 'N', + PRIMARY KEY (id)); +INSERT INTO People (name, street, city, deleted) VALUES ("Bart Simpson", "Main Street", "Springfield","N"); \ No newline at end of file diff --git a/stream-applications-integration-tests/src/test/resources/logback-test.xml b/stream-applications-integration-tests/src/test/resources/logback-test.xml new file mode 100644 index 0000000..e6016be --- /dev/null +++ b/stream-applications-integration-tests/src/test/resources/logback-test.xml @@ -0,0 +1,15 @@ + + + + %d{HH:mm:ss.SSS} [%thread] %-5level %logger - %msg%n + + + + + + + + + + + \ No newline at end of file diff --git a/stream-applications-integration-tests/src/test/resources/minio/data b/stream-applications-integration-tests/src/test/resources/minio/data new file mode 100755 index 0000000..01c51bc --- /dev/null +++ b/stream-applications-integration-tests/src/test/resources/minio/data @@ -0,0 +1 @@ +Bart Simpson diff --git a/stream-applications-integration-tests/src/test/resources/processor/http-request-processor-tests.yml b/stream-applications-integration-tests/src/test/resources/processor/http-request-processor-tests.yml new file mode 100644 index 0000000..53a8173 --- /dev/null +++ b/stream-applications-integration-tests/src/test/resources/processor/http-request-processor-tests.yml @@ -0,0 +1,33 @@ +version: '2' +services: + + http-source: + image: springcloudstream/http-source-kafka:{{stream.apps.version}} + depends_on: + - kafka-broker + ports: + - "{{port}}:{{port}}" + environment: + - SERVER_PORT={{port}} + - SPRING_CLOUD_STREAM_BINDINGS_OUTPUT_DESTINATION=processor + - SPRING_CLOUD_STREAM_KAFKA_BINDER_BROKERS=kafka-broker + http-request-processor: + image: springcloudstream/http-request-processor-kafka:{{stream.apps.version}} + depends_on: + - kafka-broker + environment: + - HTTP_REQUEST_URL_EXPRESSION='{{url}}' + - HTTP_REQUEST_HTTP_METHOD_EXPRESSION='POST' + - SPRING_CLOUD_STREAM_BINDINGS_INPUT_DESTINATION=processor + - SPRING_CLOUD_STREAM_BINDINGS_OUTPUT_DESTINATION=log + - SPRING_CLOUD_STREAM_KAFKA_BINDER_BROKERS=kafka-broker + - SPRING_CLOUD_STREAM_BINDINGS_INPUT_GROUP=http-request-processor + log-sink: + image: springcloudstream/log-sink-kafka:{{stream.apps.version}} + depends_on: + - kafka-broker + environment: + - SPRING_CLOUD_STREAM_KAFKA_BINDER_BROKERS=kafka-broker + - SPRING_CLOUD_STREAM_BINDINGS_INPUT_DESTINATION=log + - SPRING_CLOUD_STREAM_BINDINGS_INPUT_GROUP=http-request-processor + diff --git a/stream-applications-integration-tests/src/test/resources/sink/jdbc-sink-tests.yml b/stream-applications-integration-tests/src/test/resources/sink/jdbc-sink-tests.yml new file mode 100644 index 0000000..7f4e628 --- /dev/null +++ b/stream-applications-integration-tests/src/test/resources/sink/jdbc-sink-tests.yml @@ -0,0 +1,28 @@ +version: '2.4' +services: + + http-source: + image: springcloudstream/http-source-kafka:{{stream.apps.version}} + depends_on: + - kafka-broker + ports: + - "{{port}}:{{port}}" + environment: + - SERVER_PORT={{port}} + - SPRING_CLOUD_STREAM_BINDINGS_OUTPUT_DESTINATION=jdbc + - SPRING_CLOUD_STREAM_KAFKA_BINDER_BROKERS=kafka-broker + jdbc-sink: + image: springcloudstream/jdbc-sink-kafka:{{stream.apps.version}} + depends_on: + - kafka-broker + environment: + - JDBC_CONSUMER_COLUMNS=name,city:address.city,street:address.street + - JDBC_CONSUMER_TABLE_NAME=People + - SPRING_DATASOURCE_PASSWORD={{password}} + - SPRING_DATASOURCE_USERNAME={{user}} + - SPRING_DATASOURCE_DRIVER_CLASS_NAME=org.mariadb.jdbc.Driver + - SPRING_DATASOURCE_URL={{jdbc.url}} + - SPRING_CLOUD_STREAM_KAFKA_BINDER_BROKERS=kafka-broker + - SPRING_CLOUD_STREAM_BINDINGS_INPUT_DESTINATION=jdbc + - SPRING_CLOUD_STREAM_BINDINGS_INPUT_GROUP=jdbc-sink + diff --git a/stream-applications-integration-tests/src/test/resources/sink/mongodb-sink-tests.yml b/stream-applications-integration-tests/src/test/resources/sink/mongodb-sink-tests.yml new file mode 100644 index 0000000..63746fb --- /dev/null +++ b/stream-applications-integration-tests/src/test/resources/sink/mongodb-sink-tests.yml @@ -0,0 +1,22 @@ +version: '2.4' +services: + http-source: + image: springcloudstream/http-source-kafka:{{stream.apps.version}} + depends_on: + - kafka-broker + ports: + - "{{port}}:{{port}}" + environment: + - SERVER_PORT={{port}} + - SPRING_CLOUD_STREAM_BINDINGS_OUTPUT_DESTINATION=mongodb + - SPRING_CLOUD_STREAM_KAFKA_BINDER_BROKERS=kafka-broker + mongodb-sink: + image: springcloudstream/mongodb-sink-kafka:{{stream.apps.version}} + depends_on: + - kafka-broker + environment: + - MONGO_DB_CONSUMER_COLLECTION=test + - SPRING_DATA_MONGODB_URL={{mongodb.url}} + - SPRING_CLOUD_STREAM_BINDINGS_INPUT_DESTINATION=mongodb + - SPRING_CLOUD_STREAM_BINDINGS_INPUT_GROUP=mongodb-sink + diff --git a/stream-applications-integration-tests/src/test/resources/sink/tcp-sink-tests.yml b/stream-applications-integration-tests/src/test/resources/sink/tcp-sink-tests.yml new file mode 100644 index 0000000..5acc983 --- /dev/null +++ b/stream-applications-integration-tests/src/test/resources/sink/tcp-sink-tests.yml @@ -0,0 +1,25 @@ +version: '2.4' +services: + + http-source: + image: springcloudstream/http-source-kafka:{{stream.apps.version}} + depends_on: + - kafka-broker + ports: + - "{{port}}:{{port}}" + environment: + - SERVER_PORT={{port}} + - SPRING_CLOUD_STREAM_BINDINGS_OUTPUT_DESTINATION=tcp + - SPRING_CLOUD_STREAM_KAFKA_BINDER_BROKERS=kafka-broker + tcp-sink: + image: springcloudstream/tcp-sink-kafka:{{stream.apps.version}} + depends_on: + - kafka-broker + environment: + - TCP_CONSUMER_HOST={{tcp.host}} + - TCP_PORT={{tcp.port}} + - TCP_CONSUMER_ENCODER=CRLF + - SPRING_CLOUD_STREAM_KAFKA_BINDER_BROKERS=kafka-broker + - SPRING_CLOUD_STREAM_BINDINGS_INPUT_DESTINATION=tcp + - SPRING_CLOUD_STREAM_BINDINGS_INPUT_GROUP=tcp-sink + diff --git a/stream-applications-integration-tests/src/test/resources/source/geode-source-tests.yml b/stream-applications-integration-tests/src/test/resources/source/geode-source-tests.yml new file mode 100644 index 0000000..93c1aec --- /dev/null +++ b/stream-applications-integration-tests/src/test/resources/source/geode-source-tests.yml @@ -0,0 +1,24 @@ +version: '2.4' +services: + + geode-source: + image: springcloudstream/geode-source-kafka:{{stream.apps.version}} + depends_on: + - kafka-broker + environment: + - GEODE_POOL_CONNECT_TYPE=server + - GEODE_REGION_REGION_NAME={{geode.region}} + - GEODE_POOL_HOST_ADDRESSES={{geode.host-addresses}} + - SPRING_CLOUD_STREAM_BINDINGS_OUTPUT_DESTINATION=log + - SPRING_CLOUD_STREAM_KAFKA_BINDER_BROKERS=kafka-broker + extra_hosts: + - {{extraHosts}} + log-sink: + image: springcloudstream/log-sink-kafka:{{stream.apps.version}} + depends_on: + - kafka-broker + environment: + - SPRING_CLOUD_STREAM_KAFKA_BINDER_BROKERS=kafka-broker + - SPRING_CLOUD_STREAM_BINDINGS_INPUT_DESTINATION=log + - SPRING_CLOUD_STREAM_BINDINGS_INPUT_GROUP=geode + diff --git a/stream-applications-integration-tests/src/test/resources/source/http-source-tests.yml b/stream-applications-integration-tests/src/test/resources/source/http-source-tests.yml new file mode 100644 index 0000000..2c217cb --- /dev/null +++ b/stream-applications-integration-tests/src/test/resources/source/http-source-tests.yml @@ -0,0 +1,22 @@ +version: '2.4' +services: + + http-source: + image: springcloudstream/http-source-kafka:{{stream.apps.version}} + depends_on: + - kafka-broker + ports: + - "{{port}}:{{port}}" + environment: + - SERVER_PORT={{port}} + - SPRING_CLOUD_STREAM_BINDINGS_OUTPUT_DESTINATION=log + - SPRING_CLOUD_STREAM_KAFKA_BINDER_BROKERS=kafka-broker + log-sink: + image: springcloudstream/log-sink-kafka:{{stream.apps.version}} + depends_on: + - kafka-broker + environment: + - SPRING_CLOUD_STREAM_KAFKA_BINDER_BROKERS=kafka-broker + - SPRING_CLOUD_STREAM_BINDINGS_INPUT_DESTINATION=log + - SPRING_CLOUD_STREAM_BINDINGS_INPUT_GROUP=http + diff --git a/stream-applications-integration-tests/src/test/resources/source/jdbc-source-tests.yml b/stream-applications-integration-tests/src/test/resources/source/jdbc-source-tests.yml new file mode 100644 index 0000000..1e1e2e9 --- /dev/null +++ b/stream-applications-integration-tests/src/test/resources/source/jdbc-source-tests.yml @@ -0,0 +1,36 @@ +version: '2.4' +services: + mysql: + image: mysql:5.7 + command: --init-file /init.sql + volumes: + - {{init.sql}}:/init.sql + environment: + MYSQL_USER: root + MYSQL_ROOT_PASSWORD: secret + expose: + - 3306 + + jdbc-source: + image: springcloudstream/jdbc-source-kafka:{{stream.apps.version}} + depends_on: + - kafka-broker + - mysql + environment: + - JDBC_SUPPLIER_QUERY=SELECT * FROM People WHERE deleted='N' + - JDBC_SUPPLIER_UPDATE=UPDATE People SET deleted='Y' WHERE id=:id + - SPRING_DATASOURCE_PASSWORD=secret + - SPRING_DATASOURCE_USERNAME=root + - SPRING_DATASOURCE_DRIVER_CLASS_NAME=org.mariadb.jdbc.Driver + - SPRING_DATASOURCE_URL=jdbc:mysql://mysql:3306/test + - SPRING_CLOUD_STREAM_BINDINGS_OUTPUT_DESTINATION=log + - SPRING_CLOUD_STREAM_KAFKA_BINDER_BROKERS=kafka-broker + log-sink: + image: springcloudstream/log-sink-kafka:{{stream.apps.version}} + depends_on: + - kafka-broker + environment: + - SPRING_CLOUD_STREAM_KAFKA_BINDER_BROKERS=kafka-broker + - SPRING_CLOUD_STREAM_BINDINGS_INPUT_DESTINATION=log + - SPRING_CLOUD_STREAM_BINDINGS_INPUT_GROUP=jdbc + diff --git a/stream-applications-integration-tests/src/test/resources/source/s3-source-tests.yml b/stream-applications-integration-tests/src/test/resources/source/s3-source-tests.yml new file mode 100644 index 0000000..28e4876 --- /dev/null +++ b/stream-applications-integration-tests/src/test/resources/source/s3-source-tests.yml @@ -0,0 +1,27 @@ +version: '2.4' +services: + s3-source: + image: springcloudstream/s3-source-kafka:{{stream.apps.version}} + depends_on: + - kafka-broker + environment: + - FILE_CONSUMER_MODE=lines + - S3_COMMON_ENDPOINT_URL={{s3.endpoint.url}} + - S3_COMMON_PATH_STYLE_ACCESS=true + - S3_SUPPLIER_REMOTE_DIR=bucket + - CLOUD_AWS_STACK_AUTO=false + - CLOUD_AWS_CREDENTIALS_ACCESS_KEY=minio + - CLOUD_AWS_CREDENTIALS_SECRET_KEY=minio123 + - CLOUD_AWS_REGION_STATIC=us-east-1 + - SPRING_CLOUD_STREAM_BINDINGS_OUTPUT_DESTINATION=log + - SPRING_CLOUD_STREAM_KAFKA_BINDER_BROKERS=kafka-broker + extra_hosts: + - {{extraHosts}} + log-sink: + image: springcloudstream/log-sink-kafka:{{stream.apps.version}} + depends_on: + - kafka-broker + environment: + - SPRING_CLOUD_STREAM_KAFKA_BINDER_BROKERS=kafka-broker + - SPRING_CLOUD_STREAM_BINDINGS_INPUT_DESTINATION=log + - SPRING_CLOUD_STREAM_BINDINGS_INPUT_GROUP=http \ No newline at end of file diff --git a/stream-applications-integration-tests/src/test/resources/tick-tock-tests.yml b/stream-applications-integration-tests/src/test/resources/tick-tock-tests.yml new file mode 100644 index 0000000..75a00bc --- /dev/null +++ b/stream-applications-integration-tests/src/test/resources/tick-tock-tests.yml @@ -0,0 +1,17 @@ +version: '2.4' +services: + time-source: + image: springcloudstream/time-source-kafka:{{stream.apps.version}} + depends_on: + - kafka-broker + environment: + - SPRING_CLOUD_STREAM_BINDINGS_OUTPUT_DESTINATION=log + - SPRING_CLOUD_STREAM_KAFKA_BINDER_BROKERS=kafka-broker + log-sink: + image: springcloudstream/log-sink-kafka:{{stream.apps.version}} + depends_on: + - kafka-broker + environment: + - SPRING_CLOUD_STREAM_KAFKA_BINDER_BROKERS=kafka-broker + - SPRING_CLOUD_STREAM_BINDINGS_INPUT_DESTINATION=log + - SPRING_CLOUD_STREAM_BINDINGS_INPUT_GROUP=ticktock