diff --git a/kafka-samples/README.adoc b/kafka-samples/README.adoc new file mode 100644 index 0000000..f72849a --- /dev/null +++ b/kafka-samples/README.adoc @@ -0,0 +1,214 @@ +# Kafka Streams application samples + + +## Kafka Streams word count application as a processor + + +When you have a streaming data pipeline that uses Kafka Streams, it can be used as a Processor application in the Spring Cloud Data Flow streaming pipeline. In the following example, you will see how Kafka Streams application can be registered as a Spring Cloud Data Flow `processor` application and subsequently in streaming data pipeline. + +To demonstrate this use case, we have a source application `http-ingest` which is an extension (with some additional functionalities needed for our examples) of the existing out-of-the-box `http` source application. + +``` +cd http-ingest +./mvnw clean package +``` + +At this step, the artifact `http-ingest-1.0.0.BUILD-SNAPSHOT.jar` is available. + +You can register this artifact as the Spring Cloud Data Flow `Source` application as follows, +From the Spring Cloud Data Flow shell, + +``` +dataflow:>app register --name http-ingest --type source --uri file:///spring-cloud-dataflow-samples/kafka-samples/http-ingest/target/http-ingest-1.0.0.BUILD-SNAPSHOT.jar +Successfully registered application 'source:http-ingest' +``` + +The application `kstreams-word-count` is a Kafka Streams application written using Spring Cloud Stream framework. + +``` +cd kstreams-word-count +./mvnw clean package +``` + +After this step, the artifact `kstreams-word-count-1.0.0.BUILD-SNAPSHOT.jar` is available. + +You can register this artifact as the Spring Cloud Data Flow `Processor` application as follows: + +From the Spring Cloud Data Flow shell, + +``` +dataflow:>app register --name kstream-word-count --type processor --uri file:///spring-cloud-dataflow-samples/kafka-samples/kstreams-word-count/target/kstreams-word-count-1.0.0.BUILD-SNAPSHOT.jar +Successfully registered application 'processor:kstream-word-count' +``` + +Let’s register the out-of-the-box `log` application to display the results from the `kstream-word-count` processor (if this was not registered already from the previous app import). + +``` +dataflow:>app register --name log --type sink --uri maven://org.springframework.cloud.stream.app:log-sink-kafka:2.1.1.RELEASE +Successfully registered application 'sink:log' + +``` + +Create the following stream that listens at HTTP web endpoint `http://localhost:9100` and sends the data to the `kstream-word-count` processor registered in the above step. The KStream processor’s output is then pipelined to the `log` sink application. + +``` +dataflow:>stream create kstream-wc-sample --definition "http-ingest --server.port=9100 | kstream-word-count | log" +Created new stream 'kstream-wc-sample' + +dataflow:>stream deploy kstream-wc-sample --properties "deployer.log.local.inheritLogging=true" +Deployment request has been sent for stream 'kstream-wc-sample' + +dataflow:>http post --target "http://localhost:9100" --data "Baby shark, doo doo doo doo doo doo" +> POST (text/plain) http://localhost:9100 Baby shark, doo doo doo doo doo doo +> 202 ACCEPTED + +``` + +You can see the log sink application now has: + +2019-03-18 15:40:46.195 INFO 34974 --- [container-0-C-1] log-sink : {"word":"baby","count":1,"start":"2019-03-18T10:10:00.000+0000","end":"2019-03-18T10:10:30.000+0000"} +2019-03-18 15:40:46.198 INFO 34974 --- [container-0-C-1] log-sink : {"word":"shark","count":1,"start":"2019-03-18T10:10:00.000+0000","end":"2019-03-18T10:10:30.000+0000"} +2019-03-18 15:40:46.199 INFO 34974 --- [container-0-C-1] log-sink : {"word":"doo","count":6,"start":"2019-03-18T10:10:00.000+0000","end":"2019-03-18T10:10:30.000+0000"} + +## Kafka Streams application that has multiple inputs and single output + + +Let's use the `http-ingest` application to create a couple of streaming pipelines that produce the input data (user regions and user clicks). + +To build `http-ingest` application: + +``` +cd http-ingest +./mvnw clean package +``` + +You can register this artifact as the Spring Cloud Data Flow `Source` application as follows, +From the Spring Cloud Data Flow shell, + +``` +dataflow:>app register --name http-ingest --type source --uri file:///spring-cloud-dataflow-samples/kafka-samples/http-ingest/target/http-ingest-1.0.0.BUILD-SNAPSHOT.jar +Successfully registered application 'source:http-ingest' +``` + +Create a stream that ingests user regions: + +``` +stream create ingest-user-regions --definition "http-ingest --server.port=9000 --http.mapped-request-headers=username --spring.cloud.stream.kafka.bindings.output.producer.messageKeyExpression=headers['username'] > :userRegions" --deploy + +``` + +The above stream uses the named destination to send the user regions HTTP events into the Kafka topic named `userRegions`. We use the http header `username` to extract and set it as the `key` for the Kafka message and the user region String is extracted from the http payload String. + + This sample application also showcases how we can make use function composition support in the streaming application. + +The `http-ingest` has a function bean definition that looks like: + +``` +@Bean +public Function sendAsUserClicks() { + return value -> Long.parseLong(value); +} + +``` + +When this function bean is enabled, the incoming `String` value is converted to `Long` value. + +When sending user clicks count as `Long` we can enable this function for the `http-ingest` while the same application can be used for sending the user region `String` value. + +Creating a stream that ingests user clicks: + +``` +stream create ingest-user-clicks --definition "http-ingest --server.port=9001 --mapped-request-headers=username --spring.cloud.stream.kafka.bindings.output.producer.messageKeyExpression=headers['username'] --spring.cloud.stream.kafka.binder.configuration.value.serializer=org.apache.kafka.common.serialization.LongSerializer --spring.cloud.stream.function.definition=sendAsUserClicks > :userClicks" --deploy + +``` + +The above stream uses the named destination to send the user click HTTP events into the Kafka topic named `userClicks`. +We use the http header `username` to extract and set it as the `key` for the Kafka message and the click count is extracted from the http payload. The function bean `sendAsUserClicks` is enabled using the `spring.cloud.stream.function.definition` property to send the http payload String as Long and using the value serializer as LongSerializer. + +Since the Kafka Streams processor has multiple inputs (one for user click events and another one for user regions events), this application needs to be deployed as `app` type and you need to explicitly configure the bindings to the appropriate Kafka topics and other related configurations. + +To build this application: + +``` +cd kstreams-join-user-clicks-and-region +./mvnw clean package +``` + +Register this Kafka Streams application as `app` type: + +``` +dataflow:> app register --name join-user-clicks-and-regions --type app --uri file:///spring-cloud-dataflow-samples/kafka-samples/kstreams-join-user-clicks-and-region/target/kstreams-join-user-clicks-and-region-1.0.0.BUILD-SNAPSHOT.jar + +``` + +We also have a demo application `log-user-clicks-per-region` that logs the result of the Kafka Streams application. Since the `app` type is not compatible with other stream application types `source`, `sink` and `processor`, this logger application also needs to registered as `app` type to work with the above KStream application. + +To build this application: + +``` +cd kstreams-log-user-clicks-per-region +./mvnw clean package +``` + +``` +dataflow:> app register --name log-user-clicks-per-region --type app --uri file:///spring-cloud-dataflow-samples/kafka-samples/kstreams-log-user-clicks-per-region/target/kstreams-log-user-clicks-per-region-1.0.0.BUILD-SNAPSHOT.jar + +``` + +Let’s create the stream that pipes the Kafka Streams application’s output into the logger input. + +``` +stream create compute-user-clicks-per-region --definition "join-user-clicks-and-regions || log-user-clicks-per-region" + +stream deploy compute-user-clicks-per-region --properties "deployer.log-user-clicks-per-region.local.inheritLogging=true" +``` + +Now, you can confirm all the three streams (ingest-user-regions, ingest-user-clicks and compute-user-clicks-per-region) are deployed successfully using `stream list` command from Spring Cloud Data Flow shell. + +You can send the following sample user regions using cURL commands: + +The `http-ingest` application in the `ingest-user-regions` stream accepts user regions data at `http://localhost:9000` + +``` +curl -X POST http://localhost:9000 -H "username: Glenn" -d "americas" -H "Content-Type: text/plain" +curl -X POST http://localhost:9000 -H "username: Soby" -d "americas" -H "Content-Type: text/plain" +curl -X POST http://localhost:9000 -H "username: Janne" -d "europe" -H "Content-Type: text/plain" +curl -X POST http://localhost:9000 -H "username: Ilaya" -d "americas" -H "Content-Type: text/plain" +curl -X POST http://localhost:9000 -H "username: Mark" -d "americas" -H "Content-Type: text/plain" +curl -X POST http://localhost:9000 -H "username: Sabby" -d "americas" -H "Content-Type: text/plain" +curl -X POST http://localhost:9000 -H "username: Gunnar" -d "americas" -H "Content-Type: text/plain" +curl -X POST http://localhost:9000 -H "username: Ilaya" -d "asia" -H "Content-Type: text/plain" +curl -X POST http://localhost:9000 -H "username: Chris" -d "americas" -H "Content-Type: text/plain" +curl -X POST http://localhost:9000 -H "username: Damien" -d "europe" -H "Content-Type: text/plain" +curl -X POST http://localhost:9000 -H "username: Michael" -d "americas" -H "Content-Type: text/plain" +curl -X POST http://localhost:9000 -H "username: Christian" -d "europe" -H "Content-Type: text/plain" +curl -X POST http://localhost:9000 -H "username: Oleg" -d "europe" -H "Content-Type: text/plain" +``` + +The `http-ingest` application in the `ingest-user-clicks` stream accepts user clicks data at `http://localhost:9001` + +``` +curl -X POST http://localhost:9001 -H "username: Glenn" -d 9 -H "Content-Type: text/plain" +curl -X POST http://localhost:9001 -H "username: Soby" -d 15 -H "Content-Type: text/plain" +curl -X POST http://localhost:9001 -H "username: Janne" -d 10 -H "Content-Type: text/plain" +curl -X POST http://localhost:9001 -H "username: Mark" -d 7 -H "Content-Type: text/plain" +curl -X POST http://localhost:9001 -H "username: Sabby" -d 20 -H "Content-Type: text/plain" +curl -X POST http://localhost:9001 -H "username: Gunnar" -d 18 -H "Content-Type: text/plain" +curl -X POST http://localhost:9001 -H "username: Ilaya" -d 10 -H "Content-Type: text/plain" +curl -X POST http://localhost:9001 -H "username: Chris" -d 5 -H "Content-Type: text/plain" +curl -X POST http://localhost:9001 -H "username: Damien" -d 21 -H "Content-Type: text/plain" +curl -X POST http://localhost:9001 -H "username: Michael" -d 10 -H "Content-Type: text/plain" +curl -X POST http://localhost:9001 -H "username: Christian" -d 12 -H "Content-Type: text/plain" +curl -X POST http://localhost:9001 -H "username: Oleg" -d 10 -H "Content-Type: text/plain" + +``` + +Once the above data is published, you will see the KStream application outputs the processed result into the logger application and it has the following result: + +``` +2019-03-15 15:50:39.251 INFO 49790 --- [container-0-C-1] ksPerRegion$Logger$1 : europe : 53 +2019-03-15 15:50:39.252 INFO 49790 --- [container-0-C-1] ksPerRegion$Logger$1 : asia : 10 +2019-03-15 15:50:39.252 INFO 49790 --- [container-0-C-1] ksPerRegion$Logger$1 : americas : 84 +``` + +You can keep publishing some click data and see the results at the logger application. diff --git a/kafka-samples/http-ingest/.mvn/wrapper/maven-wrapper.jar b/kafka-samples/http-ingest/.mvn/wrapper/maven-wrapper.jar new file mode 100644 index 0000000..5fd4d50 Binary files /dev/null and b/kafka-samples/http-ingest/.mvn/wrapper/maven-wrapper.jar differ diff --git a/kafka-samples/http-ingest/.mvn/wrapper/maven-wrapper.properties b/kafka-samples/http-ingest/.mvn/wrapper/maven-wrapper.properties new file mode 100644 index 0000000..eb91947 --- /dev/null +++ b/kafka-samples/http-ingest/.mvn/wrapper/maven-wrapper.properties @@ -0,0 +1 @@ +distributionUrl=https://repo1.maven.org/maven2/org/apache/maven/apache-maven/3.3.3/apache-maven-3.3.3-bin.zip \ No newline at end of file diff --git a/kafka-samples/http-ingest/README.adoc b/kafka-samples/http-ingest/README.adoc new file mode 100644 index 0000000..601e53c --- /dev/null +++ b/kafka-samples/http-ingest/README.adoc @@ -0,0 +1,20 @@ +# HTTP Ingest Source + +This source application is an extension of out-of-the-box HTTP source application. + +The `http-ingest` has a function bean definition that looks like: + +``` +@Bean +public Function sendAsUserClicks() { + return value -> Long.parseLong(value); +} + +``` + +When this function bean is enabled, the incoming `String` value is converted to `Long` value. +When sending user clicks count as `Long` we can enable this function for the `http-ingest` while the same application can be used for sending the user region `String` value. + +To build this application: + +./mvnw clean package diff --git a/kafka-samples/http-ingest/mvnw b/kafka-samples/http-ingest/mvnw new file mode 100755 index 0000000..a1ba1bf --- /dev/null +++ b/kafka-samples/http-ingest/mvnw @@ -0,0 +1,233 @@ +#!/bin/sh +# ---------------------------------------------------------------------------- +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# ---------------------------------------------------------------------------- + +# ---------------------------------------------------------------------------- +# Maven2 Start Up Batch script +# +# Required ENV vars: +# ------------------ +# JAVA_HOME - location of a JDK home dir +# +# Optional ENV vars +# ----------------- +# M2_HOME - location of maven2's installed home dir +# MAVEN_OPTS - parameters passed to the Java VM when running Maven +# e.g. to debug Maven itself, use +# set MAVEN_OPTS=-Xdebug -Xrunjdwp:transport=dt_socket,server=y,suspend=y,address=8000 +# MAVEN_SKIP_RC - flag to disable loading of mavenrc files +# ---------------------------------------------------------------------------- + +if [ -z "$MAVEN_SKIP_RC" ] ; then + + if [ -f /etc/mavenrc ] ; then + . /etc/mavenrc + fi + + if [ -f "$HOME/.mavenrc" ] ; then + . "$HOME/.mavenrc" + fi + +fi + +# OS specific support. $var _must_ be set to either true or false. +cygwin=false; +darwin=false; +mingw=false +case "`uname`" in + CYGWIN*) cygwin=true ;; + MINGW*) mingw=true;; + Darwin*) darwin=true + # + # Look for the Apple JDKs first to preserve the existing behaviour, and then look + # for the new JDKs provided by Oracle. + # + if [ -z "$JAVA_HOME" ] && [ -L /System/Library/Frameworks/JavaVM.framework/Versions/CurrentJDK ] ; then + # + # Apple JDKs + # + export JAVA_HOME=/System/Library/Frameworks/JavaVM.framework/Versions/CurrentJDK/Home + fi + + if [ -z "$JAVA_HOME" ] && [ -L /System/Library/Java/JavaVirtualMachines/CurrentJDK ] ; then + # + # Apple JDKs + # + export JAVA_HOME=/System/Library/Java/JavaVirtualMachines/CurrentJDK/Contents/Home + fi + + if [ -z "$JAVA_HOME" ] && [ -L "/Library/Java/JavaVirtualMachines/CurrentJDK" ] ; then + # + # Oracle JDKs + # + export JAVA_HOME=/Library/Java/JavaVirtualMachines/CurrentJDK/Contents/Home + fi + + if [ -z "$JAVA_HOME" ] && [ -x "/usr/libexec/java_home" ]; then + # + # Apple JDKs + # + export JAVA_HOME=`/usr/libexec/java_home` + fi + ;; +esac + +if [ -z "$JAVA_HOME" ] ; then + if [ -r /etc/gentoo-release ] ; then + JAVA_HOME=`java-config --jre-home` + fi +fi + +if [ -z "$M2_HOME" ] ; then + ## resolve links - $0 may be a link to maven's home + PRG="$0" + + # need this for relative symlinks + while [ -h "$PRG" ] ; do + ls=`ls -ld "$PRG"` + link=`expr "$ls" : '.*-> \(.*\)$'` + if expr "$link" : '/.*' > /dev/null; then + PRG="$link" + else + PRG="`dirname "$PRG"`/$link" + fi + done + + saveddir=`pwd` + + M2_HOME=`dirname "$PRG"`/.. + + # make it fully qualified + M2_HOME=`cd "$M2_HOME" && pwd` + + cd "$saveddir" + # echo Using m2 at $M2_HOME +fi + +# For Cygwin, ensure paths are in UNIX format before anything is touched +if $cygwin ; then + [ -n "$M2_HOME" ] && + M2_HOME=`cygpath --unix "$M2_HOME"` + [ -n "$JAVA_HOME" ] && + JAVA_HOME=`cygpath --unix "$JAVA_HOME"` + [ -n "$CLASSPATH" ] && + CLASSPATH=`cygpath --path --unix "$CLASSPATH"` +fi + +# For Migwn, ensure paths are in UNIX format before anything is touched +if $mingw ; then + [ -n "$M2_HOME" ] && + M2_HOME="`(cd "$M2_HOME"; pwd)`" + [ -n "$JAVA_HOME" ] && + JAVA_HOME="`(cd "$JAVA_HOME"; pwd)`" + # TODO classpath? +fi + +if [ -z "$JAVA_HOME" ]; then + javaExecutable="`which javac`" + if [ -n "$javaExecutable" ] && ! [ "`expr \"$javaExecutable\" : '\([^ ]*\)'`" = "no" ]; then + # readlink(1) is not available as standard on Solaris 10. + readLink=`which readlink` + if [ ! `expr "$readLink" : '\([^ ]*\)'` = "no" ]; then + if $darwin ; then + javaHome="`dirname \"$javaExecutable\"`" + javaExecutable="`cd \"$javaHome\" && pwd -P`/javac" + else + javaExecutable="`readlink -f \"$javaExecutable\"`" + fi + javaHome="`dirname \"$javaExecutable\"`" + javaHome=`expr "$javaHome" : '\(.*\)/bin'` + JAVA_HOME="$javaHome" + export JAVA_HOME + fi + fi +fi + +if [ -z "$JAVACMD" ] ; then + if [ -n "$JAVA_HOME" ] ; then + if [ -x "$JAVA_HOME/jre/sh/java" ] ; then + # IBM's JDK on AIX uses strange locations for the executables + JAVACMD="$JAVA_HOME/jre/sh/java" + else + JAVACMD="$JAVA_HOME/bin/java" + fi + else + JAVACMD="`which java`" + fi +fi + +if [ ! -x "$JAVACMD" ] ; then + echo "Error: JAVA_HOME is not defined correctly." >&2 + echo " We cannot execute $JAVACMD" >&2 + exit 1 +fi + +if [ -z "$JAVA_HOME" ] ; then + echo "Warning: JAVA_HOME environment variable is not set." +fi + +CLASSWORLDS_LAUNCHER=org.codehaus.plexus.classworlds.launcher.Launcher + +# For Cygwin, switch paths to Windows format before running java +if $cygwin; then + [ -n "$M2_HOME" ] && + M2_HOME=`cygpath --path --windows "$M2_HOME"` + [ -n "$JAVA_HOME" ] && + JAVA_HOME=`cygpath --path --windows "$JAVA_HOME"` + [ -n "$CLASSPATH" ] && + CLASSPATH=`cygpath --path --windows "$CLASSPATH"` +fi + +# traverses directory structure from process work directory to filesystem root +# first directory with .mvn subdirectory is considered project base directory +find_maven_basedir() { + local basedir=$(pwd) + local wdir=$(pwd) + while [ "$wdir" != '/' ] ; do + if [ -d "$wdir"/.mvn ] ; then + basedir=$wdir + break + fi + wdir=$(cd "$wdir/.."; pwd) + done + echo "${basedir}" +} + +# concatenates all lines of a file +concat_lines() { + if [ -f "$1" ]; then + echo "$(tr -s '\n' ' ' < "$1")" + fi +} + +export MAVEN_PROJECTBASEDIR=${MAVEN_BASEDIR:-$(find_maven_basedir)} +MAVEN_OPTS="$(concat_lines "$MAVEN_PROJECTBASEDIR/.mvn/jvm.config") $MAVEN_OPTS" + +# Provide a "standardized" way to retrieve the CLI args that will +# work with both Windows and non-Windows executions. +MAVEN_CMD_LINE_ARGS="$MAVEN_CONFIG $@" +export MAVEN_CMD_LINE_ARGS + +WRAPPER_LAUNCHER=org.apache.maven.wrapper.MavenWrapperMain + +exec "$JAVACMD" \ + $MAVEN_OPTS \ + -classpath "$MAVEN_PROJECTBASEDIR/.mvn/wrapper/maven-wrapper.jar" \ + "-Dmaven.home=${M2_HOME}" "-Dmaven.multiModuleProjectDirectory=${MAVEN_PROJECTBASEDIR}" \ + ${WRAPPER_LAUNCHER} "$@" diff --git a/kafka-samples/http-ingest/mvnw.cmd b/kafka-samples/http-ingest/mvnw.cmd new file mode 100644 index 0000000..2b934e8 --- /dev/null +++ b/kafka-samples/http-ingest/mvnw.cmd @@ -0,0 +1,145 @@ +@REM ---------------------------------------------------------------------------- +@REM Licensed to the Apache Software Foundation (ASF) under one +@REM or more contributor license agreements. See the NOTICE file +@REM distributed with this work for additional information +@REM regarding copyright ownership. The ASF licenses this file +@REM to you under the Apache License, Version 2.0 (the +@REM "License"); you may not use this file except in compliance +@REM with the License. You may obtain a copy of the License at +@REM +@REM http://www.apache.org/licenses/LICENSE-2.0 +@REM +@REM Unless required by applicable law or agreed to in writing, +@REM software distributed under the License is distributed on an +@REM "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +@REM KIND, either express or implied. See the License for the +@REM specific language governing permissions and limitations +@REM under the License. +@REM ---------------------------------------------------------------------------- + +@REM ---------------------------------------------------------------------------- +@REM Maven2 Start Up Batch script +@REM +@REM Required ENV vars: +@REM JAVA_HOME - location of a JDK home dir +@REM +@REM Optional ENV vars +@REM M2_HOME - location of maven2's installed home dir +@REM MAVEN_BATCH_ECHO - set to 'on' to enable the echoing of the batch commands +@REM MAVEN_BATCH_PAUSE - set to 'on' to wait for a key stroke before ending +@REM MAVEN_OPTS - parameters passed to the Java VM when running Maven +@REM e.g. to debug Maven itself, use +@REM set MAVEN_OPTS=-Xdebug -Xrunjdwp:transport=dt_socket,server=y,suspend=y,address=8000 +@REM MAVEN_SKIP_RC - flag to disable loading of mavenrc files +@REM ---------------------------------------------------------------------------- + +@REM Begin all REM lines with '@' in case MAVEN_BATCH_ECHO is 'on' +@echo off +@REM enable echoing my setting MAVEN_BATCH_ECHO to 'on' +@if "%MAVEN_BATCH_ECHO%" == "on" echo %MAVEN_BATCH_ECHO% + +@REM set %HOME% to equivalent of $HOME +if "%HOME%" == "" (set "HOME=%HOMEDRIVE%%HOMEPATH%") + +@REM Execute a user defined script before this one +if not "%MAVEN_SKIP_RC%" == "" goto skipRcPre +@REM check for pre script, once with legacy .bat ending and once with .cmd ending +if exist "%HOME%\mavenrc_pre.bat" call "%HOME%\mavenrc_pre.bat" +if exist "%HOME%\mavenrc_pre.cmd" call "%HOME%\mavenrc_pre.cmd" +:skipRcPre + +@setlocal + +set ERROR_CODE=0 + +@REM To isolate internal variables from possible post scripts, we use another setlocal +@setlocal + +@REM ==== START VALIDATION ==== +if not "%JAVA_HOME%" == "" goto OkJHome + +echo. +echo Error: JAVA_HOME not found in your environment. >&2 +echo Please set the JAVA_HOME variable in your environment to match the >&2 +echo location of your Java installation. >&2 +echo. +goto error + +:OkJHome +if exist "%JAVA_HOME%\bin\java.exe" goto init + +echo. +echo Error: JAVA_HOME is set to an invalid directory. >&2 +echo JAVA_HOME = "%JAVA_HOME%" >&2 +echo Please set the JAVA_HOME variable in your environment to match the >&2 +echo location of your Java installation. >&2 +echo. +goto error + +@REM ==== END VALIDATION ==== + +:init + +set MAVEN_CMD_LINE_ARGS=%* + +@REM Find the project base dir, i.e. the directory that contains the folder ".mvn". +@REM Fallback to current working directory if not found. + +set MAVEN_PROJECTBASEDIR=%MAVEN_BASEDIR% +IF NOT "%MAVEN_PROJECTBASEDIR%"=="" goto endDetectBaseDir + +set EXEC_DIR=%CD% +set WDIR=%EXEC_DIR% +:findBaseDir +IF EXIST "%WDIR%"\.mvn goto baseDirFound +cd .. +IF "%WDIR%"=="%CD%" goto baseDirNotFound +set WDIR=%CD% +goto findBaseDir + +:baseDirFound +set MAVEN_PROJECTBASEDIR=%WDIR% +cd "%EXEC_DIR%" +goto endDetectBaseDir + +:baseDirNotFound +set MAVEN_PROJECTBASEDIR=%EXEC_DIR% +cd "%EXEC_DIR%" + +:endDetectBaseDir + +IF NOT EXIST "%MAVEN_PROJECTBASEDIR%\.mvn\jvm.config" goto endReadAdditionalConfig + +@setlocal EnableExtensions EnableDelayedExpansion +for /F "usebackq delims=" %%a in ("%MAVEN_PROJECTBASEDIR%\.mvn\jvm.config") do set JVM_CONFIG_MAVEN_PROPS=!JVM_CONFIG_MAVEN_PROPS! %%a +@endlocal & set JVM_CONFIG_MAVEN_PROPS=%JVM_CONFIG_MAVEN_PROPS% + +:endReadAdditionalConfig + +SET MAVEN_JAVA_EXE="%JAVA_HOME%\bin\java.exe" + +set WRAPPER_JAR="".\.mvn\wrapper\maven-wrapper.jar"" +set WRAPPER_LAUNCHER=org.apache.maven.wrapper.MavenWrapperMain + +%MAVEN_JAVA_EXE% %JVM_CONFIG_MAVEN_PROPS% %MAVEN_OPTS% %MAVEN_DEBUG_OPTS% -classpath %WRAPPER_JAR% "-Dmaven.multiModuleProjectDirectory=%MAVEN_PROJECTBASEDIR%" %WRAPPER_LAUNCHER% %MAVEN_CMD_LINE_ARGS% +if ERRORLEVEL 1 goto error +goto end + +:error +set ERROR_CODE=1 + +:end +@endlocal & set ERROR_CODE=%ERROR_CODE% + +if not "%MAVEN_SKIP_RC%" == "" goto skipRcPost +@REM check for post script, once with legacy .bat ending and once with .cmd ending +if exist "%HOME%\mavenrc_post.bat" call "%HOME%\mavenrc_post.bat" +if exist "%HOME%\mavenrc_post.cmd" call "%HOME%\mavenrc_post.cmd" +:skipRcPost + +@REM pause the script if MAVEN_BATCH_PAUSE is set to 'on' +if "%MAVEN_BATCH_PAUSE%" == "on" pause + +if "%MAVEN_TERMINATE_CMD%" == "on" exit %ERROR_CODE% + +exit /B %ERROR_CODE% \ No newline at end of file diff --git a/kafka-samples/http-ingest/pom.xml b/kafka-samples/http-ingest/pom.xml new file mode 100644 index 0000000..28f6291 --- /dev/null +++ b/kafka-samples/http-ingest/pom.xml @@ -0,0 +1,57 @@ + + + 4.0.0 + + org.springframework.boot + spring-boot-starter-parent + 2.1.3.RELEASE + + + org.springframework.cloud.stream.app + http-ingest + 1.0.0.BUILD-SNAPSHOT + http-source-rabbit + Http ingest application + + + + + org.springframework.cloud.stream.app + app-starters-core-dependencies + 2.1.0.RELEASE + pom + import + + + org.springframework.cloud.stream.app + http-app-dependencies + 2.1.0.RELEASE + pom + import + + + + + + + + org.springframework.cloud + spring-cloud-starter-stream-kafka + + + + org.springframework.cloud.stream.app + spring-cloud-starter-stream-source-http + + + + + + + org.springframework.boot + spring-boot-maven-plugin + + + + diff --git a/kafka-samples/http-ingest/src/main/java/org/springframework/cloud/stream/app/http/source/kafka/HttpIngest.java b/kafka-samples/http-ingest/src/main/java/org/springframework/cloud/stream/app/http/source/kafka/HttpIngest.java new file mode 100644 index 0000000..5b40326 --- /dev/null +++ b/kafka-samples/http-ingest/src/main/java/org/springframework/cloud/stream/app/http/source/kafka/HttpIngest.java @@ -0,0 +1,39 @@ +/* + * Copyright 2019 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.cloud.stream.app.http.source.kafka; + +import java.util.function.Function; + +import org.springframework.boot.SpringApplication; +import org.springframework.boot.autoconfigure.SpringBootApplication; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Import; + + +@SpringBootApplication +@Import(org.springframework.cloud.stream.app.http.source.HttpSourceConfiguration.class) +public class HttpIngest { + + @Bean + public Function sendAsUserClicks() { + return value -> Long.parseLong(value); + } + + public static void main(String[] args) { + SpringApplication.run(HttpIngest.class, args); + } +} diff --git a/kafka-samples/http-ingest/src/main/resources/application.yml b/kafka-samples/http-ingest/src/main/resources/application.yml new file mode 100644 index 0000000..935d488 --- /dev/null +++ b/kafka-samples/http-ingest/src/main/resources/application.yml @@ -0,0 +1,6 @@ +spring.cloud.stream.kafka.binder.configuration.key: + serializer: org.apache.kafka.common.serialization.StringSerializer +spring.cloud.stream.kafka.binder.configuration.value: + serializer: org.apache.kafka.common.serialization.StringSerializer +spring.cloud.stream.bindings.output.producer: + useNativeEncoding: true diff --git a/kafka-samples/kstreams-join-user-clicks-and-region/.gitignore b/kafka-samples/kstreams-join-user-clicks-and-region/.gitignore new file mode 100644 index 0000000..2af7cef --- /dev/null +++ b/kafka-samples/kstreams-join-user-clicks-and-region/.gitignore @@ -0,0 +1,24 @@ +target/ +!.mvn/wrapper/maven-wrapper.jar + +### STS ### +.apt_generated +.classpath +.factorypath +.project +.settings +.springBeans + +### IntelliJ IDEA ### +.idea +*.iws +*.iml +*.ipr + +### NetBeans ### +nbproject/private/ +build/ +nbbuild/ +dist/ +nbdist/ +.nb-gradle/ \ No newline at end of file diff --git a/kafka-samples/kstreams-join-user-clicks-and-region/.mvn/wrapper/maven-wrapper.jar b/kafka-samples/kstreams-join-user-clicks-and-region/.mvn/wrapper/maven-wrapper.jar new file mode 100644 index 0000000..9cc84ea Binary files /dev/null and b/kafka-samples/kstreams-join-user-clicks-and-region/.mvn/wrapper/maven-wrapper.jar differ diff --git a/kafka-samples/kstreams-join-user-clicks-and-region/.mvn/wrapper/maven-wrapper.properties b/kafka-samples/kstreams-join-user-clicks-and-region/.mvn/wrapper/maven-wrapper.properties new file mode 100644 index 0000000..c315043 --- /dev/null +++ b/kafka-samples/kstreams-join-user-clicks-and-region/.mvn/wrapper/maven-wrapper.properties @@ -0,0 +1 @@ +distributionUrl=https://repo1.maven.org/maven2/org/apache/maven/apache-maven/3.5.0/apache-maven-3.5.0-bin.zip diff --git a/kafka-samples/kstreams-join-user-clicks-and-region/README.adoc b/kafka-samples/kstreams-join-user-clicks-and-region/README.adoc new file mode 100644 index 0000000..7c0b711 --- /dev/null +++ b/kafka-samples/kstreams-join-user-clicks-and-region/README.adoc @@ -0,0 +1,15 @@ +# Kafka Streams application that joins user click counts and regions + +This application has two inputs (user clicks and user regions) and one output (user clicks per region). +Since the Kafka Streams processor has multiple inputs (one for user click events and another one for user regions events), this application needs to be deployed as `app` type and you need to explicitly configure the bindings to the appropriate Kafka topics and other related configurations. + + +To build this application: + +./mvnw clean package + +Register this KStream application as `app` type: + +``` +dataflow:> app register --name join-user-clicks-and-regions --type app --uri file:///spring-cloud-dataflow-samples/kafka-samples/kstreams-join-user-clicks-and-region/target/kstreams-join-user-clicks-and-region-1.0.0.BUILD-SNAPSHOT.jar +``` diff --git a/kafka-samples/kstreams-join-user-clicks-and-region/mvnw b/kafka-samples/kstreams-join-user-clicks-and-region/mvnw new file mode 100755 index 0000000..5bf251c --- /dev/null +++ b/kafka-samples/kstreams-join-user-clicks-and-region/mvnw @@ -0,0 +1,225 @@ +#!/bin/sh +# ---------------------------------------------------------------------------- +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# ---------------------------------------------------------------------------- + +# ---------------------------------------------------------------------------- +# Maven2 Start Up Batch script +# +# Required ENV vars: +# ------------------ +# JAVA_HOME - location of a JDK home dir +# +# Optional ENV vars +# ----------------- +# M2_HOME - location of maven2's installed home dir +# MAVEN_OPTS - parameters passed to the Java VM when running Maven +# e.g. to debug Maven itself, use +# set MAVEN_OPTS=-Xdebug -Xrunjdwp:transport=dt_socket,server=y,suspend=y,address=8000 +# MAVEN_SKIP_RC - flag to disable loading of mavenrc files +# ---------------------------------------------------------------------------- + +if [ -z "$MAVEN_SKIP_RC" ] ; then + + if [ -f /etc/mavenrc ] ; then + . /etc/mavenrc + fi + + if [ -f "$HOME/.mavenrc" ] ; then + . "$HOME/.mavenrc" + fi + +fi + +# OS specific support. $var _must_ be set to either true or false. +cygwin=false; +darwin=false; +mingw=false +case "`uname`" in + CYGWIN*) cygwin=true ;; + MINGW*) mingw=true;; + Darwin*) darwin=true + # Use /usr/libexec/java_home if available, otherwise fall back to /Library/Java/Home + # See https://developer.apple.com/library/mac/qa/qa1170/_index.html + if [ -z "$JAVA_HOME" ]; then + if [ -x "/usr/libexec/java_home" ]; then + export JAVA_HOME="`/usr/libexec/java_home`" + else + export JAVA_HOME="/Library/Java/Home" + fi + fi + ;; +esac + +if [ -z "$JAVA_HOME" ] ; then + if [ -r /etc/gentoo-release ] ; then + JAVA_HOME=`java-config --jre-home` + fi +fi + +if [ -z "$M2_HOME" ] ; then + ## resolve links - $0 may be a link to maven's home + PRG="$0" + + # need this for relative symlinks + while [ -h "$PRG" ] ; do + ls=`ls -ld "$PRG"` + link=`expr "$ls" : '.*-> \(.*\)$'` + if expr "$link" : '/.*' > /dev/null; then + PRG="$link" + else + PRG="`dirname "$PRG"`/$link" + fi + done + + saveddir=`pwd` + + M2_HOME=`dirname "$PRG"`/.. + + # make it fully qualified + M2_HOME=`cd "$M2_HOME" && pwd` + + cd "$saveddir" + # echo Using m2 at $M2_HOME +fi + +# For Cygwin, ensure paths are in UNIX format before anything is touched +if $cygwin ; then + [ -n "$M2_HOME" ] && + M2_HOME=`cygpath --unix "$M2_HOME"` + [ -n "$JAVA_HOME" ] && + JAVA_HOME=`cygpath --unix "$JAVA_HOME"` + [ -n "$CLASSPATH" ] && + CLASSPATH=`cygpath --path --unix "$CLASSPATH"` +fi + +# For Migwn, ensure paths are in UNIX format before anything is touched +if $mingw ; then + [ -n "$M2_HOME" ] && + M2_HOME="`(cd "$M2_HOME"; pwd)`" + [ -n "$JAVA_HOME" ] && + JAVA_HOME="`(cd "$JAVA_HOME"; pwd)`" + # TODO classpath? +fi + +if [ -z "$JAVA_HOME" ]; then + javaExecutable="`which javac`" + if [ -n "$javaExecutable" ] && ! [ "`expr \"$javaExecutable\" : '\([^ ]*\)'`" = "no" ]; then + # readlink(1) is not available as standard on Solaris 10. + readLink=`which readlink` + if [ ! `expr "$readLink" : '\([^ ]*\)'` = "no" ]; then + if $darwin ; then + javaHome="`dirname \"$javaExecutable\"`" + javaExecutable="`cd \"$javaHome\" && pwd -P`/javac" + else + javaExecutable="`readlink -f \"$javaExecutable\"`" + fi + javaHome="`dirname \"$javaExecutable\"`" + javaHome=`expr "$javaHome" : '\(.*\)/bin'` + JAVA_HOME="$javaHome" + export JAVA_HOME + fi + fi +fi + +if [ -z "$JAVACMD" ] ; then + if [ -n "$JAVA_HOME" ] ; then + if [ -x "$JAVA_HOME/jre/sh/java" ] ; then + # IBM's JDK on AIX uses strange locations for the executables + JAVACMD="$JAVA_HOME/jre/sh/java" + else + JAVACMD="$JAVA_HOME/bin/java" + fi + else + JAVACMD="`which java`" + fi +fi + +if [ ! -x "$JAVACMD" ] ; then + echo "Error: JAVA_HOME is not defined correctly." >&2 + echo " We cannot execute $JAVACMD" >&2 + exit 1 +fi + +if [ -z "$JAVA_HOME" ] ; then + echo "Warning: JAVA_HOME environment variable is not set." +fi + +CLASSWORLDS_LAUNCHER=org.codehaus.plexus.classworlds.launcher.Launcher + +# traverses directory structure from process work directory to filesystem root +# first directory with .mvn subdirectory is considered project base directory +find_maven_basedir() { + + if [ -z "$1" ] + then + echo "Path not specified to find_maven_basedir" + return 1 + fi + + basedir="$1" + wdir="$1" + while [ "$wdir" != '/' ] ; do + if [ -d "$wdir"/.mvn ] ; then + basedir=$wdir + break + fi + # workaround for JBEAP-8937 (on Solaris 10/Sparc) + if [ -d "${wdir}" ]; then + wdir=`cd "$wdir/.."; pwd` + fi + # end of workaround + done + echo "${basedir}" +} + +# concatenates all lines of a file +concat_lines() { + if [ -f "$1" ]; then + echo "$(tr -s '\n' ' ' < "$1")" + fi +} + +BASE_DIR=`find_maven_basedir "$(pwd)"` +if [ -z "$BASE_DIR" ]; then + exit 1; +fi + +export MAVEN_PROJECTBASEDIR=${MAVEN_BASEDIR:-"$BASE_DIR"} +echo $MAVEN_PROJECTBASEDIR +MAVEN_OPTS="$(concat_lines "$MAVEN_PROJECTBASEDIR/.mvn/jvm.config") $MAVEN_OPTS" + +# For Cygwin, switch paths to Windows format before running java +if $cygwin; then + [ -n "$M2_HOME" ] && + M2_HOME=`cygpath --path --windows "$M2_HOME"` + [ -n "$JAVA_HOME" ] && + JAVA_HOME=`cygpath --path --windows "$JAVA_HOME"` + [ -n "$CLASSPATH" ] && + CLASSPATH=`cygpath --path --windows "$CLASSPATH"` + [ -n "$MAVEN_PROJECTBASEDIR" ] && + MAVEN_PROJECTBASEDIR=`cygpath --path --windows "$MAVEN_PROJECTBASEDIR"` +fi + +WRAPPER_LAUNCHER=org.apache.maven.wrapper.MavenWrapperMain + +exec "$JAVACMD" \ + $MAVEN_OPTS \ + -classpath "$MAVEN_PROJECTBASEDIR/.mvn/wrapper/maven-wrapper.jar" \ + "-Dmaven.home=${M2_HOME}" "-Dmaven.multiModuleProjectDirectory=${MAVEN_PROJECTBASEDIR}" \ + ${WRAPPER_LAUNCHER} $MAVEN_CONFIG "$@" diff --git a/kafka-samples/kstreams-join-user-clicks-and-region/mvnw.cmd b/kafka-samples/kstreams-join-user-clicks-and-region/mvnw.cmd new file mode 100644 index 0000000..019bd74 --- /dev/null +++ b/kafka-samples/kstreams-join-user-clicks-and-region/mvnw.cmd @@ -0,0 +1,143 @@ +@REM ---------------------------------------------------------------------------- +@REM Licensed to the Apache Software Foundation (ASF) under one +@REM or more contributor license agreements. See the NOTICE file +@REM distributed with this work for additional information +@REM regarding copyright ownership. The ASF licenses this file +@REM to you under the Apache License, Version 2.0 (the +@REM "License"); you may not use this file except in compliance +@REM with the License. You may obtain a copy of the License at +@REM +@REM http://www.apache.org/licenses/LICENSE-2.0 +@REM +@REM Unless required by applicable law or agreed to in writing, +@REM software distributed under the License is distributed on an +@REM "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +@REM KIND, either express or implied. See the License for the +@REM specific language governing permissions and limitations +@REM under the License. +@REM ---------------------------------------------------------------------------- + +@REM ---------------------------------------------------------------------------- +@REM Maven2 Start Up Batch script +@REM +@REM Required ENV vars: +@REM JAVA_HOME - location of a JDK home dir +@REM +@REM Optional ENV vars +@REM M2_HOME - location of maven2's installed home dir +@REM MAVEN_BATCH_ECHO - set to 'on' to enable the echoing of the batch commands +@REM MAVEN_BATCH_PAUSE - set to 'on' to wait for a key stroke before ending +@REM MAVEN_OPTS - parameters passed to the Java VM when running Maven +@REM e.g. to debug Maven itself, use +@REM set MAVEN_OPTS=-Xdebug -Xrunjdwp:transport=dt_socket,server=y,suspend=y,address=8000 +@REM MAVEN_SKIP_RC - flag to disable loading of mavenrc files +@REM ---------------------------------------------------------------------------- + +@REM Begin all REM lines with '@' in case MAVEN_BATCH_ECHO is 'on' +@echo off +@REM enable echoing my setting MAVEN_BATCH_ECHO to 'on' +@if "%MAVEN_BATCH_ECHO%" == "on" echo %MAVEN_BATCH_ECHO% + +@REM set %HOME% to equivalent of $HOME +if "%HOME%" == "" (set "HOME=%HOMEDRIVE%%HOMEPATH%") + +@REM Execute a user defined script before this one +if not "%MAVEN_SKIP_RC%" == "" goto skipRcPre +@REM check for pre script, once with legacy .bat ending and once with .cmd ending +if exist "%HOME%\mavenrc_pre.bat" call "%HOME%\mavenrc_pre.bat" +if exist "%HOME%\mavenrc_pre.cmd" call "%HOME%\mavenrc_pre.cmd" +:skipRcPre + +@setlocal + +set ERROR_CODE=0 + +@REM To isolate internal variables from possible post scripts, we use another setlocal +@setlocal + +@REM ==== START VALIDATION ==== +if not "%JAVA_HOME%" == "" goto OkJHome + +echo. +echo Error: JAVA_HOME not found in your environment. >&2 +echo Please set the JAVA_HOME variable in your environment to match the >&2 +echo location of your Java installation. >&2 +echo. +goto error + +:OkJHome +if exist "%JAVA_HOME%\bin\java.exe" goto init + +echo. +echo Error: JAVA_HOME is set to an invalid directory. >&2 +echo JAVA_HOME = "%JAVA_HOME%" >&2 +echo Please set the JAVA_HOME variable in your environment to match the >&2 +echo location of your Java installation. >&2 +echo. +goto error + +@REM ==== END VALIDATION ==== + +:init + +@REM Find the project base dir, i.e. the directory that contains the folder ".mvn". +@REM Fallback to current working directory if not found. + +set MAVEN_PROJECTBASEDIR=%MAVEN_BASEDIR% +IF NOT "%MAVEN_PROJECTBASEDIR%"=="" goto endDetectBaseDir + +set EXEC_DIR=%CD% +set WDIR=%EXEC_DIR% +:findBaseDir +IF EXIST "%WDIR%"\.mvn goto baseDirFound +cd .. +IF "%WDIR%"=="%CD%" goto baseDirNotFound +set WDIR=%CD% +goto findBaseDir + +:baseDirFound +set MAVEN_PROJECTBASEDIR=%WDIR% +cd "%EXEC_DIR%" +goto endDetectBaseDir + +:baseDirNotFound +set MAVEN_PROJECTBASEDIR=%EXEC_DIR% +cd "%EXEC_DIR%" + +:endDetectBaseDir + +IF NOT EXIST "%MAVEN_PROJECTBASEDIR%\.mvn\jvm.config" goto endReadAdditionalConfig + +@setlocal EnableExtensions EnableDelayedExpansion +for /F "usebackq delims=" %%a in ("%MAVEN_PROJECTBASEDIR%\.mvn\jvm.config") do set JVM_CONFIG_MAVEN_PROPS=!JVM_CONFIG_MAVEN_PROPS! %%a +@endlocal & set JVM_CONFIG_MAVEN_PROPS=%JVM_CONFIG_MAVEN_PROPS% + +:endReadAdditionalConfig + +SET MAVEN_JAVA_EXE="%JAVA_HOME%\bin\java.exe" + +set WRAPPER_JAR="%MAVEN_PROJECTBASEDIR%\.mvn\wrapper\maven-wrapper.jar" +set WRAPPER_LAUNCHER=org.apache.maven.wrapper.MavenWrapperMain + +%MAVEN_JAVA_EXE% %JVM_CONFIG_MAVEN_PROPS% %MAVEN_OPTS% %MAVEN_DEBUG_OPTS% -classpath %WRAPPER_JAR% "-Dmaven.multiModuleProjectDirectory=%MAVEN_PROJECTBASEDIR%" %WRAPPER_LAUNCHER% %MAVEN_CONFIG% %* +if ERRORLEVEL 1 goto error +goto end + +:error +set ERROR_CODE=1 + +:end +@endlocal & set ERROR_CODE=%ERROR_CODE% + +if not "%MAVEN_SKIP_RC%" == "" goto skipRcPost +@REM check for post script, once with legacy .bat ending and once with .cmd ending +if exist "%HOME%\mavenrc_post.bat" call "%HOME%\mavenrc_post.bat" +if exist "%HOME%\mavenrc_post.cmd" call "%HOME%\mavenrc_post.cmd" +:skipRcPost + +@REM pause the script if MAVEN_BATCH_PAUSE is set to 'on' +if "%MAVEN_BATCH_PAUSE%" == "on" pause + +if "%MAVEN_TERMINATE_CMD%" == "on" exit %ERROR_CODE% + +exit /B %ERROR_CODE% diff --git a/kafka-samples/kstreams-join-user-clicks-and-region/pom.xml b/kafka-samples/kstreams-join-user-clicks-and-region/pom.xml new file mode 100644 index 0000000..e711a3e --- /dev/null +++ b/kafka-samples/kstreams-join-user-clicks-and-region/pom.xml @@ -0,0 +1,56 @@ + + + 4.0.0 + + + org.springframework.boot + spring-boot-starter-parent + 2.1.3.RELEASE + + + org.springframework.cloud + kstreams-join-user-clicks-and-region + 1.0.0.BUILD-SNAPSHOT + jar + + kstreams-join-user-clicks-and-region + Demo project for Kafka Streams Table Join + + + + + org.springframework.cloud.stream.app + app-starters-core-dependencies + 2.1.0.RELEASE + pom + import + + + + + + + + + org.springframework.cloud + spring-cloud-stream-binder-kafka-streams + + + + + org.springframework.boot + spring-boot-starter-web + + + + + + + org.springframework.boot + spring-boot-maven-plugin + + + + + diff --git a/kafka-samples/kstreams-join-user-clicks-and-region/src/main/java/kafka/streams/table/join/KafkaStreamsTableJoin.java b/kafka-samples/kstreams-join-user-clicks-and-region/src/main/java/kafka/streams/table/join/KafkaStreamsTableJoin.java new file mode 100644 index 0000000..2b91f03 --- /dev/null +++ b/kafka-samples/kstreams-join-user-clicks-and-region/src/main/java/kafka/streams/table/join/KafkaStreamsTableJoin.java @@ -0,0 +1,95 @@ +/* + * Copyright 2018-2019 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package kafka.streams.table.join; + +import org.apache.kafka.common.serialization.Serdes; +import org.apache.kafka.streams.KeyValue; +import org.apache.kafka.streams.kstream.Joined; +import org.apache.kafka.streams.kstream.KStream; +import org.apache.kafka.streams.kstream.KTable; +import org.apache.kafka.streams.kstream.Serialized; +import org.springframework.boot.SpringApplication; +import org.springframework.boot.autoconfigure.SpringBootApplication; +import org.springframework.cloud.stream.annotation.EnableBinding; +import org.springframework.cloud.stream.annotation.Input; +import org.springframework.cloud.stream.annotation.StreamListener; +import org.springframework.cloud.stream.binder.kafka.streams.annotations.KafkaStreamsProcessor; +import org.springframework.messaging.handler.annotation.SendTo; + +@SpringBootApplication +public class KafkaStreamsTableJoin { + + public static void main(String[] args) { + SpringApplication.run(KafkaStreamsTableJoin.class, args); + } + + @EnableBinding(KStreamProcessorX.class) + public static class KStreamToTableJoinApplication { + + + @StreamListener + @SendTo("output") + public KStream process(@Input("userClicksInput") KStream userClicksStream, + @Input("userRegionsTableInput") KTable userRegionsTable) { + + return userClicksStream + .leftJoin(userRegionsTable, + (clicks, region) -> new RegionWithClicks(region == null ? "UNKNOWN" : region, clicks), + Joined.with(Serdes.String(), Serdes.Long(), null)) + .map((user, regionWithClicks) -> new KeyValue<>(regionWithClicks.getRegion(), regionWithClicks.getClicks())) + .groupByKey(Serialized.with(Serdes.String(), Serdes.Long())) + .reduce((firstClicks, secondClicks) -> firstClicks + secondClicks) + .toStream(); + } + } + + + interface KStreamProcessorX extends KafkaStreamsProcessor { + + @Input("userRegionsTableInput") + KTable inputKTable(); + + @Input("userClicksInput") + KStream userClicks(); + } + + private static final class RegionWithClicks { + + private final String region; + private final long clicks; + + public RegionWithClicks(String region, long clicks) { + if (region == null || region.isEmpty()) { + throw new IllegalArgumentException("region must be set"); + } + if (clicks < 0) { + throw new IllegalArgumentException("clicks must not be negative"); + } + this.region = region; + this.clicks = clicks; + } + + public String getRegion() { + return region; + } + + public long getClicks() { + return clicks; + } + + } +} diff --git a/kafka-samples/kstreams-join-user-clicks-and-region/src/main/resources/application.yml b/kafka-samples/kstreams-join-user-clicks-and-region/src/main/resources/application.yml new file mode 100644 index 0000000..bc24cd3 --- /dev/null +++ b/kafka-samples/kstreams-join-user-clicks-and-region/src/main/resources/application.yml @@ -0,0 +1,28 @@ +spring.application.name: kstream-join-user-clicks-and-regions +spring.cloud.stream.bindings.userClicksInput: + destination: userClicks + consumer: + useNativeDecoding: true +spring.cloud.stream.bindings.userRegionsTableInput: + destination: userRegions + consumer: + useNativeDecoding: true +spring.cloud.stream.bindings.output: + destination: userClickRegions + producer: + useNativeEncoding: true +spring.cloud.stream.kafka.streams.bindings.userClicksInput: + consumer: + keySerde: org.apache.kafka.common.serialization.Serdes$StringSerde + valueSerde: org.apache.kafka.common.serialization.Serdes$LongSerde +spring.cloud.stream.kafka.streams.bindings.userRegionsTableInput: + consumer: + keySerde: org.apache.kafka.common.serialization.Serdes$StringSerde + valueSerde: org.apache.kafka.common.serialization.Serdes$StringSerde +spring.cloud.stream.kafka.streams.bindings.output: + producer: + keySerde: org.apache.kafka.common.serialization.Serdes$StringSerde + valueSerde: org.apache.kafka.common.serialization.Serdes$LongSerde +spring.cloud.stream.kafka.streams.binder: + configuration: + commit.interval.ms: 1000 \ No newline at end of file diff --git a/kafka-samples/kstreams-join-user-clicks-and-region/src/main/resources/logback.xml b/kafka-samples/kstreams-join-user-clicks-and-region/src/main/resources/logback.xml new file mode 100644 index 0000000..870ac9e --- /dev/null +++ b/kafka-samples/kstreams-join-user-clicks-and-region/src/main/resources/logback.xml @@ -0,0 +1,12 @@ + + + + + %d{ISO8601} %5p %t %c{2}:%L - %m%n + + + + + + + \ No newline at end of file diff --git a/kafka-samples/kstreams-log-user-clicks-per-region/.mvn/wrapper/maven-wrapper.jar b/kafka-samples/kstreams-log-user-clicks-per-region/.mvn/wrapper/maven-wrapper.jar new file mode 100644 index 0000000..5fd4d50 Binary files /dev/null and b/kafka-samples/kstreams-log-user-clicks-per-region/.mvn/wrapper/maven-wrapper.jar differ diff --git a/kafka-samples/kstreams-log-user-clicks-per-region/.mvn/wrapper/maven-wrapper.properties b/kafka-samples/kstreams-log-user-clicks-per-region/.mvn/wrapper/maven-wrapper.properties new file mode 100644 index 0000000..eb91947 --- /dev/null +++ b/kafka-samples/kstreams-log-user-clicks-per-region/.mvn/wrapper/maven-wrapper.properties @@ -0,0 +1 @@ +distributionUrl=https://repo1.maven.org/maven2/org/apache/maven/apache-maven/3.3.3/apache-maven-3.3.3-bin.zip \ No newline at end of file diff --git a/kafka-samples/kstreams-log-user-clicks-per-region/README.adoc b/kafka-samples/kstreams-log-user-clicks-per-region/README.adoc new file mode 100644 index 0000000..0d4695e --- /dev/null +++ b/kafka-samples/kstreams-log-user-clicks-per-region/README.adoc @@ -0,0 +1,14 @@ +# Logger sink that logs the output from the user clicks per region Kafka Streams application's output + +This sink application is a modification of out-of-the-box `log` application. + + +To build this application: + +./mvnw clean package + +To register this application from Spring Cloud Data Flow shell: + +``` +dataflow:> app register --name log-user-clicks-per-region --type app --uri file:///spring-cloud-dataflow-samples/kafka-samples/kstreams-log-user-clicks-per-region/target/kstreams-log-user-clicks-per-region-1.0.0.BUILD-SNAPSHOT.jar +``` diff --git a/kafka-samples/kstreams-log-user-clicks-per-region/mvnw b/kafka-samples/kstreams-log-user-clicks-per-region/mvnw new file mode 100755 index 0000000..a1ba1bf --- /dev/null +++ b/kafka-samples/kstreams-log-user-clicks-per-region/mvnw @@ -0,0 +1,233 @@ +#!/bin/sh +# ---------------------------------------------------------------------------- +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# ---------------------------------------------------------------------------- + +# ---------------------------------------------------------------------------- +# Maven2 Start Up Batch script +# +# Required ENV vars: +# ------------------ +# JAVA_HOME - location of a JDK home dir +# +# Optional ENV vars +# ----------------- +# M2_HOME - location of maven2's installed home dir +# MAVEN_OPTS - parameters passed to the Java VM when running Maven +# e.g. to debug Maven itself, use +# set MAVEN_OPTS=-Xdebug -Xrunjdwp:transport=dt_socket,server=y,suspend=y,address=8000 +# MAVEN_SKIP_RC - flag to disable loading of mavenrc files +# ---------------------------------------------------------------------------- + +if [ -z "$MAVEN_SKIP_RC" ] ; then + + if [ -f /etc/mavenrc ] ; then + . /etc/mavenrc + fi + + if [ -f "$HOME/.mavenrc" ] ; then + . "$HOME/.mavenrc" + fi + +fi + +# OS specific support. $var _must_ be set to either true or false. +cygwin=false; +darwin=false; +mingw=false +case "`uname`" in + CYGWIN*) cygwin=true ;; + MINGW*) mingw=true;; + Darwin*) darwin=true + # + # Look for the Apple JDKs first to preserve the existing behaviour, and then look + # for the new JDKs provided by Oracle. + # + if [ -z "$JAVA_HOME" ] && [ -L /System/Library/Frameworks/JavaVM.framework/Versions/CurrentJDK ] ; then + # + # Apple JDKs + # + export JAVA_HOME=/System/Library/Frameworks/JavaVM.framework/Versions/CurrentJDK/Home + fi + + if [ -z "$JAVA_HOME" ] && [ -L /System/Library/Java/JavaVirtualMachines/CurrentJDK ] ; then + # + # Apple JDKs + # + export JAVA_HOME=/System/Library/Java/JavaVirtualMachines/CurrentJDK/Contents/Home + fi + + if [ -z "$JAVA_HOME" ] && [ -L "/Library/Java/JavaVirtualMachines/CurrentJDK" ] ; then + # + # Oracle JDKs + # + export JAVA_HOME=/Library/Java/JavaVirtualMachines/CurrentJDK/Contents/Home + fi + + if [ -z "$JAVA_HOME" ] && [ -x "/usr/libexec/java_home" ]; then + # + # Apple JDKs + # + export JAVA_HOME=`/usr/libexec/java_home` + fi + ;; +esac + +if [ -z "$JAVA_HOME" ] ; then + if [ -r /etc/gentoo-release ] ; then + JAVA_HOME=`java-config --jre-home` + fi +fi + +if [ -z "$M2_HOME" ] ; then + ## resolve links - $0 may be a link to maven's home + PRG="$0" + + # need this for relative symlinks + while [ -h "$PRG" ] ; do + ls=`ls -ld "$PRG"` + link=`expr "$ls" : '.*-> \(.*\)$'` + if expr "$link" : '/.*' > /dev/null; then + PRG="$link" + else + PRG="`dirname "$PRG"`/$link" + fi + done + + saveddir=`pwd` + + M2_HOME=`dirname "$PRG"`/.. + + # make it fully qualified + M2_HOME=`cd "$M2_HOME" && pwd` + + cd "$saveddir" + # echo Using m2 at $M2_HOME +fi + +# For Cygwin, ensure paths are in UNIX format before anything is touched +if $cygwin ; then + [ -n "$M2_HOME" ] && + M2_HOME=`cygpath --unix "$M2_HOME"` + [ -n "$JAVA_HOME" ] && + JAVA_HOME=`cygpath --unix "$JAVA_HOME"` + [ -n "$CLASSPATH" ] && + CLASSPATH=`cygpath --path --unix "$CLASSPATH"` +fi + +# For Migwn, ensure paths are in UNIX format before anything is touched +if $mingw ; then + [ -n "$M2_HOME" ] && + M2_HOME="`(cd "$M2_HOME"; pwd)`" + [ -n "$JAVA_HOME" ] && + JAVA_HOME="`(cd "$JAVA_HOME"; pwd)`" + # TODO classpath? +fi + +if [ -z "$JAVA_HOME" ]; then + javaExecutable="`which javac`" + if [ -n "$javaExecutable" ] && ! [ "`expr \"$javaExecutable\" : '\([^ ]*\)'`" = "no" ]; then + # readlink(1) is not available as standard on Solaris 10. + readLink=`which readlink` + if [ ! `expr "$readLink" : '\([^ ]*\)'` = "no" ]; then + if $darwin ; then + javaHome="`dirname \"$javaExecutable\"`" + javaExecutable="`cd \"$javaHome\" && pwd -P`/javac" + else + javaExecutable="`readlink -f \"$javaExecutable\"`" + fi + javaHome="`dirname \"$javaExecutable\"`" + javaHome=`expr "$javaHome" : '\(.*\)/bin'` + JAVA_HOME="$javaHome" + export JAVA_HOME + fi + fi +fi + +if [ -z "$JAVACMD" ] ; then + if [ -n "$JAVA_HOME" ] ; then + if [ -x "$JAVA_HOME/jre/sh/java" ] ; then + # IBM's JDK on AIX uses strange locations for the executables + JAVACMD="$JAVA_HOME/jre/sh/java" + else + JAVACMD="$JAVA_HOME/bin/java" + fi + else + JAVACMD="`which java`" + fi +fi + +if [ ! -x "$JAVACMD" ] ; then + echo "Error: JAVA_HOME is not defined correctly." >&2 + echo " We cannot execute $JAVACMD" >&2 + exit 1 +fi + +if [ -z "$JAVA_HOME" ] ; then + echo "Warning: JAVA_HOME environment variable is not set." +fi + +CLASSWORLDS_LAUNCHER=org.codehaus.plexus.classworlds.launcher.Launcher + +# For Cygwin, switch paths to Windows format before running java +if $cygwin; then + [ -n "$M2_HOME" ] && + M2_HOME=`cygpath --path --windows "$M2_HOME"` + [ -n "$JAVA_HOME" ] && + JAVA_HOME=`cygpath --path --windows "$JAVA_HOME"` + [ -n "$CLASSPATH" ] && + CLASSPATH=`cygpath --path --windows "$CLASSPATH"` +fi + +# traverses directory structure from process work directory to filesystem root +# first directory with .mvn subdirectory is considered project base directory +find_maven_basedir() { + local basedir=$(pwd) + local wdir=$(pwd) + while [ "$wdir" != '/' ] ; do + if [ -d "$wdir"/.mvn ] ; then + basedir=$wdir + break + fi + wdir=$(cd "$wdir/.."; pwd) + done + echo "${basedir}" +} + +# concatenates all lines of a file +concat_lines() { + if [ -f "$1" ]; then + echo "$(tr -s '\n' ' ' < "$1")" + fi +} + +export MAVEN_PROJECTBASEDIR=${MAVEN_BASEDIR:-$(find_maven_basedir)} +MAVEN_OPTS="$(concat_lines "$MAVEN_PROJECTBASEDIR/.mvn/jvm.config") $MAVEN_OPTS" + +# Provide a "standardized" way to retrieve the CLI args that will +# work with both Windows and non-Windows executions. +MAVEN_CMD_LINE_ARGS="$MAVEN_CONFIG $@" +export MAVEN_CMD_LINE_ARGS + +WRAPPER_LAUNCHER=org.apache.maven.wrapper.MavenWrapperMain + +exec "$JAVACMD" \ + $MAVEN_OPTS \ + -classpath "$MAVEN_PROJECTBASEDIR/.mvn/wrapper/maven-wrapper.jar" \ + "-Dmaven.home=${M2_HOME}" "-Dmaven.multiModuleProjectDirectory=${MAVEN_PROJECTBASEDIR}" \ + ${WRAPPER_LAUNCHER} "$@" diff --git a/kafka-samples/kstreams-log-user-clicks-per-region/mvnw.cmd b/kafka-samples/kstreams-log-user-clicks-per-region/mvnw.cmd new file mode 100644 index 0000000..2b934e8 --- /dev/null +++ b/kafka-samples/kstreams-log-user-clicks-per-region/mvnw.cmd @@ -0,0 +1,145 @@ +@REM ---------------------------------------------------------------------------- +@REM Licensed to the Apache Software Foundation (ASF) under one +@REM or more contributor license agreements. See the NOTICE file +@REM distributed with this work for additional information +@REM regarding copyright ownership. The ASF licenses this file +@REM to you under the Apache License, Version 2.0 (the +@REM "License"); you may not use this file except in compliance +@REM with the License. You may obtain a copy of the License at +@REM +@REM http://www.apache.org/licenses/LICENSE-2.0 +@REM +@REM Unless required by applicable law or agreed to in writing, +@REM software distributed under the License is distributed on an +@REM "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +@REM KIND, either express or implied. See the License for the +@REM specific language governing permissions and limitations +@REM under the License. +@REM ---------------------------------------------------------------------------- + +@REM ---------------------------------------------------------------------------- +@REM Maven2 Start Up Batch script +@REM +@REM Required ENV vars: +@REM JAVA_HOME - location of a JDK home dir +@REM +@REM Optional ENV vars +@REM M2_HOME - location of maven2's installed home dir +@REM MAVEN_BATCH_ECHO - set to 'on' to enable the echoing of the batch commands +@REM MAVEN_BATCH_PAUSE - set to 'on' to wait for a key stroke before ending +@REM MAVEN_OPTS - parameters passed to the Java VM when running Maven +@REM e.g. to debug Maven itself, use +@REM set MAVEN_OPTS=-Xdebug -Xrunjdwp:transport=dt_socket,server=y,suspend=y,address=8000 +@REM MAVEN_SKIP_RC - flag to disable loading of mavenrc files +@REM ---------------------------------------------------------------------------- + +@REM Begin all REM lines with '@' in case MAVEN_BATCH_ECHO is 'on' +@echo off +@REM enable echoing my setting MAVEN_BATCH_ECHO to 'on' +@if "%MAVEN_BATCH_ECHO%" == "on" echo %MAVEN_BATCH_ECHO% + +@REM set %HOME% to equivalent of $HOME +if "%HOME%" == "" (set "HOME=%HOMEDRIVE%%HOMEPATH%") + +@REM Execute a user defined script before this one +if not "%MAVEN_SKIP_RC%" == "" goto skipRcPre +@REM check for pre script, once with legacy .bat ending and once with .cmd ending +if exist "%HOME%\mavenrc_pre.bat" call "%HOME%\mavenrc_pre.bat" +if exist "%HOME%\mavenrc_pre.cmd" call "%HOME%\mavenrc_pre.cmd" +:skipRcPre + +@setlocal + +set ERROR_CODE=0 + +@REM To isolate internal variables from possible post scripts, we use another setlocal +@setlocal + +@REM ==== START VALIDATION ==== +if not "%JAVA_HOME%" == "" goto OkJHome + +echo. +echo Error: JAVA_HOME not found in your environment. >&2 +echo Please set the JAVA_HOME variable in your environment to match the >&2 +echo location of your Java installation. >&2 +echo. +goto error + +:OkJHome +if exist "%JAVA_HOME%\bin\java.exe" goto init + +echo. +echo Error: JAVA_HOME is set to an invalid directory. >&2 +echo JAVA_HOME = "%JAVA_HOME%" >&2 +echo Please set the JAVA_HOME variable in your environment to match the >&2 +echo location of your Java installation. >&2 +echo. +goto error + +@REM ==== END VALIDATION ==== + +:init + +set MAVEN_CMD_LINE_ARGS=%* + +@REM Find the project base dir, i.e. the directory that contains the folder ".mvn". +@REM Fallback to current working directory if not found. + +set MAVEN_PROJECTBASEDIR=%MAVEN_BASEDIR% +IF NOT "%MAVEN_PROJECTBASEDIR%"=="" goto endDetectBaseDir + +set EXEC_DIR=%CD% +set WDIR=%EXEC_DIR% +:findBaseDir +IF EXIST "%WDIR%"\.mvn goto baseDirFound +cd .. +IF "%WDIR%"=="%CD%" goto baseDirNotFound +set WDIR=%CD% +goto findBaseDir + +:baseDirFound +set MAVEN_PROJECTBASEDIR=%WDIR% +cd "%EXEC_DIR%" +goto endDetectBaseDir + +:baseDirNotFound +set MAVEN_PROJECTBASEDIR=%EXEC_DIR% +cd "%EXEC_DIR%" + +:endDetectBaseDir + +IF NOT EXIST "%MAVEN_PROJECTBASEDIR%\.mvn\jvm.config" goto endReadAdditionalConfig + +@setlocal EnableExtensions EnableDelayedExpansion +for /F "usebackq delims=" %%a in ("%MAVEN_PROJECTBASEDIR%\.mvn\jvm.config") do set JVM_CONFIG_MAVEN_PROPS=!JVM_CONFIG_MAVEN_PROPS! %%a +@endlocal & set JVM_CONFIG_MAVEN_PROPS=%JVM_CONFIG_MAVEN_PROPS% + +:endReadAdditionalConfig + +SET MAVEN_JAVA_EXE="%JAVA_HOME%\bin\java.exe" + +set WRAPPER_JAR="".\.mvn\wrapper\maven-wrapper.jar"" +set WRAPPER_LAUNCHER=org.apache.maven.wrapper.MavenWrapperMain + +%MAVEN_JAVA_EXE% %JVM_CONFIG_MAVEN_PROPS% %MAVEN_OPTS% %MAVEN_DEBUG_OPTS% -classpath %WRAPPER_JAR% "-Dmaven.multiModuleProjectDirectory=%MAVEN_PROJECTBASEDIR%" %WRAPPER_LAUNCHER% %MAVEN_CMD_LINE_ARGS% +if ERRORLEVEL 1 goto error +goto end + +:error +set ERROR_CODE=1 + +:end +@endlocal & set ERROR_CODE=%ERROR_CODE% + +if not "%MAVEN_SKIP_RC%" == "" goto skipRcPost +@REM check for post script, once with legacy .bat ending and once with .cmd ending +if exist "%HOME%\mavenrc_post.bat" call "%HOME%\mavenrc_post.bat" +if exist "%HOME%\mavenrc_post.cmd" call "%HOME%\mavenrc_post.cmd" +:skipRcPost + +@REM pause the script if MAVEN_BATCH_PAUSE is set to 'on' +if "%MAVEN_BATCH_PAUSE%" == "on" pause + +if "%MAVEN_TERMINATE_CMD%" == "on" exit %ERROR_CODE% + +exit /B %ERROR_CODE% \ No newline at end of file diff --git a/kafka-samples/kstreams-log-user-clicks-per-region/pom.xml b/kafka-samples/kstreams-log-user-clicks-per-region/pom.xml new file mode 100644 index 0000000..f88048f --- /dev/null +++ b/kafka-samples/kstreams-log-user-clicks-per-region/pom.xml @@ -0,0 +1,50 @@ + + + 4.0.0 + + org.springframework.boot + spring-boot-starter-parent + 2.1.3.RELEASE + + + org.springframework.cloud.stream.app + kstreams-log-user-clicks-per-region + 1.0.0.BUILD-SNAPSHOT + kstreams-log-user-clicks-per-region + + + + + org.springframework.cloud.stream.app + app-starters-core-dependencies + 2.1.0.RELEASE + pom + import + + + + + + + + org.springframework.cloud + spring-cloud-starter-stream-kafka + + + + org.springframework.boot + spring-boot-starter-web + + + + + + + org.springframework.boot + spring-boot-maven-plugin + + + + + diff --git a/kafka-samples/kstreams-log-user-clicks-per-region/src/main/java/org/springframework/cloud/stream/app/log/UserClicksPerRegion.java b/kafka-samples/kstreams-log-user-clicks-per-region/src/main/java/org/springframework/cloud/stream/app/log/UserClicksPerRegion.java new file mode 100644 index 0000000..127efc7 --- /dev/null +++ b/kafka-samples/kstreams-log-user-clicks-per-region/src/main/java/org/springframework/cloud/stream/app/log/UserClicksPerRegion.java @@ -0,0 +1,56 @@ +/* + * Copyright 2015-2016 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.cloud.stream.app.log; + +import org.springframework.boot.SpringApplication; +import org.springframework.boot.autoconfigure.SpringBootApplication; +import org.springframework.cloud.stream.annotation.EnableBinding; +import org.springframework.cloud.stream.messaging.Sink; +import org.springframework.context.annotation.Bean; +import org.springframework.integration.annotation.ServiceActivator; +import org.springframework.integration.handler.LoggingHandler; +import org.springframework.integration.support.MutableMessage; +import org.springframework.messaging.Message; + +@SpringBootApplication +public class UserClicksPerRegion { + + @EnableBinding(Sink.class) + public static class Logger { + + @Bean + @ServiceActivator(inputChannel = Sink.INPUT) + public LoggingHandler logSinkHandler() { + LoggingHandler loggingHandler = new LoggingHandler(LoggingHandler.Level.INFO) { + + @Override + protected void handleMessageInternal(Message message) throws Exception { + Long userClicksPerRegion = (Long) message.getPayload(); + String userRegion = (String) message.getHeaders().get("kafka_receivedMessageKey"); + message = new MutableMessage<>(userRegion + " : " + userClicksPerRegion.toString()); + super.handleMessageInternal(message); + } + }; + return loggingHandler; + } + + } + + public static void main(String[] args) { + SpringApplication.run(UserClicksPerRegion.class, args); + } +} diff --git a/kafka-samples/kstreams-log-user-clicks-per-region/src/main/resources/application.yml b/kafka-samples/kstreams-log-user-clicks-per-region/src/main/resources/application.yml new file mode 100644 index 0000000..d65f7ac --- /dev/null +++ b/kafka-samples/kstreams-log-user-clicks-per-region/src/main/resources/application.yml @@ -0,0 +1,8 @@ +spring.cloud.stream.bindings.input: + destination: userClickRegions + consumer: + useNativeEncoding: true +spring.cloud.stream.kafka.binder.configuration.key: + deserializer: org.apache.kafka.common.serialization.StringDeserializer +spring.cloud.stream.kafka.binder.configuration.value: + deserializer: org.apache.kafka.common.serialization.LongDeserializer diff --git a/kafka-samples/kstreams-word-count/.gitignore b/kafka-samples/kstreams-word-count/.gitignore new file mode 100644 index 0000000..2af7cef --- /dev/null +++ b/kafka-samples/kstreams-word-count/.gitignore @@ -0,0 +1,24 @@ +target/ +!.mvn/wrapper/maven-wrapper.jar + +### STS ### +.apt_generated +.classpath +.factorypath +.project +.settings +.springBeans + +### IntelliJ IDEA ### +.idea +*.iws +*.iml +*.ipr + +### NetBeans ### +nbproject/private/ +build/ +nbbuild/ +dist/ +nbdist/ +.nb-gradle/ \ No newline at end of file diff --git a/kafka-samples/kstreams-word-count/.mvn/wrapper/maven-wrapper.jar b/kafka-samples/kstreams-word-count/.mvn/wrapper/maven-wrapper.jar new file mode 100644 index 0000000..9cc84ea Binary files /dev/null and b/kafka-samples/kstreams-word-count/.mvn/wrapper/maven-wrapper.jar differ diff --git a/kafka-samples/kstreams-word-count/.mvn/wrapper/maven-wrapper.properties b/kafka-samples/kstreams-word-count/.mvn/wrapper/maven-wrapper.properties new file mode 100644 index 0000000..c315043 --- /dev/null +++ b/kafka-samples/kstreams-word-count/.mvn/wrapper/maven-wrapper.properties @@ -0,0 +1 @@ +distributionUrl=https://repo1.maven.org/maven2/org/apache/maven/apache-maven/3.5.0/apache-maven-3.5.0-bin.zip diff --git a/kafka-samples/kstreams-word-count/mvnw b/kafka-samples/kstreams-word-count/mvnw new file mode 100755 index 0000000..5bf251c --- /dev/null +++ b/kafka-samples/kstreams-word-count/mvnw @@ -0,0 +1,225 @@ +#!/bin/sh +# ---------------------------------------------------------------------------- +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# ---------------------------------------------------------------------------- + +# ---------------------------------------------------------------------------- +# Maven2 Start Up Batch script +# +# Required ENV vars: +# ------------------ +# JAVA_HOME - location of a JDK home dir +# +# Optional ENV vars +# ----------------- +# M2_HOME - location of maven2's installed home dir +# MAVEN_OPTS - parameters passed to the Java VM when running Maven +# e.g. to debug Maven itself, use +# set MAVEN_OPTS=-Xdebug -Xrunjdwp:transport=dt_socket,server=y,suspend=y,address=8000 +# MAVEN_SKIP_RC - flag to disable loading of mavenrc files +# ---------------------------------------------------------------------------- + +if [ -z "$MAVEN_SKIP_RC" ] ; then + + if [ -f /etc/mavenrc ] ; then + . /etc/mavenrc + fi + + if [ -f "$HOME/.mavenrc" ] ; then + . "$HOME/.mavenrc" + fi + +fi + +# OS specific support. $var _must_ be set to either true or false. +cygwin=false; +darwin=false; +mingw=false +case "`uname`" in + CYGWIN*) cygwin=true ;; + MINGW*) mingw=true;; + Darwin*) darwin=true + # Use /usr/libexec/java_home if available, otherwise fall back to /Library/Java/Home + # See https://developer.apple.com/library/mac/qa/qa1170/_index.html + if [ -z "$JAVA_HOME" ]; then + if [ -x "/usr/libexec/java_home" ]; then + export JAVA_HOME="`/usr/libexec/java_home`" + else + export JAVA_HOME="/Library/Java/Home" + fi + fi + ;; +esac + +if [ -z "$JAVA_HOME" ] ; then + if [ -r /etc/gentoo-release ] ; then + JAVA_HOME=`java-config --jre-home` + fi +fi + +if [ -z "$M2_HOME" ] ; then + ## resolve links - $0 may be a link to maven's home + PRG="$0" + + # need this for relative symlinks + while [ -h "$PRG" ] ; do + ls=`ls -ld "$PRG"` + link=`expr "$ls" : '.*-> \(.*\)$'` + if expr "$link" : '/.*' > /dev/null; then + PRG="$link" + else + PRG="`dirname "$PRG"`/$link" + fi + done + + saveddir=`pwd` + + M2_HOME=`dirname "$PRG"`/.. + + # make it fully qualified + M2_HOME=`cd "$M2_HOME" && pwd` + + cd "$saveddir" + # echo Using m2 at $M2_HOME +fi + +# For Cygwin, ensure paths are in UNIX format before anything is touched +if $cygwin ; then + [ -n "$M2_HOME" ] && + M2_HOME=`cygpath --unix "$M2_HOME"` + [ -n "$JAVA_HOME" ] && + JAVA_HOME=`cygpath --unix "$JAVA_HOME"` + [ -n "$CLASSPATH" ] && + CLASSPATH=`cygpath --path --unix "$CLASSPATH"` +fi + +# For Migwn, ensure paths are in UNIX format before anything is touched +if $mingw ; then + [ -n "$M2_HOME" ] && + M2_HOME="`(cd "$M2_HOME"; pwd)`" + [ -n "$JAVA_HOME" ] && + JAVA_HOME="`(cd "$JAVA_HOME"; pwd)`" + # TODO classpath? +fi + +if [ -z "$JAVA_HOME" ]; then + javaExecutable="`which javac`" + if [ -n "$javaExecutable" ] && ! [ "`expr \"$javaExecutable\" : '\([^ ]*\)'`" = "no" ]; then + # readlink(1) is not available as standard on Solaris 10. + readLink=`which readlink` + if [ ! `expr "$readLink" : '\([^ ]*\)'` = "no" ]; then + if $darwin ; then + javaHome="`dirname \"$javaExecutable\"`" + javaExecutable="`cd \"$javaHome\" && pwd -P`/javac" + else + javaExecutable="`readlink -f \"$javaExecutable\"`" + fi + javaHome="`dirname \"$javaExecutable\"`" + javaHome=`expr "$javaHome" : '\(.*\)/bin'` + JAVA_HOME="$javaHome" + export JAVA_HOME + fi + fi +fi + +if [ -z "$JAVACMD" ] ; then + if [ -n "$JAVA_HOME" ] ; then + if [ -x "$JAVA_HOME/jre/sh/java" ] ; then + # IBM's JDK on AIX uses strange locations for the executables + JAVACMD="$JAVA_HOME/jre/sh/java" + else + JAVACMD="$JAVA_HOME/bin/java" + fi + else + JAVACMD="`which java`" + fi +fi + +if [ ! -x "$JAVACMD" ] ; then + echo "Error: JAVA_HOME is not defined correctly." >&2 + echo " We cannot execute $JAVACMD" >&2 + exit 1 +fi + +if [ -z "$JAVA_HOME" ] ; then + echo "Warning: JAVA_HOME environment variable is not set." +fi + +CLASSWORLDS_LAUNCHER=org.codehaus.plexus.classworlds.launcher.Launcher + +# traverses directory structure from process work directory to filesystem root +# first directory with .mvn subdirectory is considered project base directory +find_maven_basedir() { + + if [ -z "$1" ] + then + echo "Path not specified to find_maven_basedir" + return 1 + fi + + basedir="$1" + wdir="$1" + while [ "$wdir" != '/' ] ; do + if [ -d "$wdir"/.mvn ] ; then + basedir=$wdir + break + fi + # workaround for JBEAP-8937 (on Solaris 10/Sparc) + if [ -d "${wdir}" ]; then + wdir=`cd "$wdir/.."; pwd` + fi + # end of workaround + done + echo "${basedir}" +} + +# concatenates all lines of a file +concat_lines() { + if [ -f "$1" ]; then + echo "$(tr -s '\n' ' ' < "$1")" + fi +} + +BASE_DIR=`find_maven_basedir "$(pwd)"` +if [ -z "$BASE_DIR" ]; then + exit 1; +fi + +export MAVEN_PROJECTBASEDIR=${MAVEN_BASEDIR:-"$BASE_DIR"} +echo $MAVEN_PROJECTBASEDIR +MAVEN_OPTS="$(concat_lines "$MAVEN_PROJECTBASEDIR/.mvn/jvm.config") $MAVEN_OPTS" + +# For Cygwin, switch paths to Windows format before running java +if $cygwin; then + [ -n "$M2_HOME" ] && + M2_HOME=`cygpath --path --windows "$M2_HOME"` + [ -n "$JAVA_HOME" ] && + JAVA_HOME=`cygpath --path --windows "$JAVA_HOME"` + [ -n "$CLASSPATH" ] && + CLASSPATH=`cygpath --path --windows "$CLASSPATH"` + [ -n "$MAVEN_PROJECTBASEDIR" ] && + MAVEN_PROJECTBASEDIR=`cygpath --path --windows "$MAVEN_PROJECTBASEDIR"` +fi + +WRAPPER_LAUNCHER=org.apache.maven.wrapper.MavenWrapperMain + +exec "$JAVACMD" \ + $MAVEN_OPTS \ + -classpath "$MAVEN_PROJECTBASEDIR/.mvn/wrapper/maven-wrapper.jar" \ + "-Dmaven.home=${M2_HOME}" "-Dmaven.multiModuleProjectDirectory=${MAVEN_PROJECTBASEDIR}" \ + ${WRAPPER_LAUNCHER} $MAVEN_CONFIG "$@" diff --git a/kafka-samples/kstreams-word-count/mvnw.cmd b/kafka-samples/kstreams-word-count/mvnw.cmd new file mode 100644 index 0000000..019bd74 --- /dev/null +++ b/kafka-samples/kstreams-word-count/mvnw.cmd @@ -0,0 +1,143 @@ +@REM ---------------------------------------------------------------------------- +@REM Licensed to the Apache Software Foundation (ASF) under one +@REM or more contributor license agreements. See the NOTICE file +@REM distributed with this work for additional information +@REM regarding copyright ownership. The ASF licenses this file +@REM to you under the Apache License, Version 2.0 (the +@REM "License"); you may not use this file except in compliance +@REM with the License. You may obtain a copy of the License at +@REM +@REM http://www.apache.org/licenses/LICENSE-2.0 +@REM +@REM Unless required by applicable law or agreed to in writing, +@REM software distributed under the License is distributed on an +@REM "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +@REM KIND, either express or implied. See the License for the +@REM specific language governing permissions and limitations +@REM under the License. +@REM ---------------------------------------------------------------------------- + +@REM ---------------------------------------------------------------------------- +@REM Maven2 Start Up Batch script +@REM +@REM Required ENV vars: +@REM JAVA_HOME - location of a JDK home dir +@REM +@REM Optional ENV vars +@REM M2_HOME - location of maven2's installed home dir +@REM MAVEN_BATCH_ECHO - set to 'on' to enable the echoing of the batch commands +@REM MAVEN_BATCH_PAUSE - set to 'on' to wait for a key stroke before ending +@REM MAVEN_OPTS - parameters passed to the Java VM when running Maven +@REM e.g. to debug Maven itself, use +@REM set MAVEN_OPTS=-Xdebug -Xrunjdwp:transport=dt_socket,server=y,suspend=y,address=8000 +@REM MAVEN_SKIP_RC - flag to disable loading of mavenrc files +@REM ---------------------------------------------------------------------------- + +@REM Begin all REM lines with '@' in case MAVEN_BATCH_ECHO is 'on' +@echo off +@REM enable echoing my setting MAVEN_BATCH_ECHO to 'on' +@if "%MAVEN_BATCH_ECHO%" == "on" echo %MAVEN_BATCH_ECHO% + +@REM set %HOME% to equivalent of $HOME +if "%HOME%" == "" (set "HOME=%HOMEDRIVE%%HOMEPATH%") + +@REM Execute a user defined script before this one +if not "%MAVEN_SKIP_RC%" == "" goto skipRcPre +@REM check for pre script, once with legacy .bat ending and once with .cmd ending +if exist "%HOME%\mavenrc_pre.bat" call "%HOME%\mavenrc_pre.bat" +if exist "%HOME%\mavenrc_pre.cmd" call "%HOME%\mavenrc_pre.cmd" +:skipRcPre + +@setlocal + +set ERROR_CODE=0 + +@REM To isolate internal variables from possible post scripts, we use another setlocal +@setlocal + +@REM ==== START VALIDATION ==== +if not "%JAVA_HOME%" == "" goto OkJHome + +echo. +echo Error: JAVA_HOME not found in your environment. >&2 +echo Please set the JAVA_HOME variable in your environment to match the >&2 +echo location of your Java installation. >&2 +echo. +goto error + +:OkJHome +if exist "%JAVA_HOME%\bin\java.exe" goto init + +echo. +echo Error: JAVA_HOME is set to an invalid directory. >&2 +echo JAVA_HOME = "%JAVA_HOME%" >&2 +echo Please set the JAVA_HOME variable in your environment to match the >&2 +echo location of your Java installation. >&2 +echo. +goto error + +@REM ==== END VALIDATION ==== + +:init + +@REM Find the project base dir, i.e. the directory that contains the folder ".mvn". +@REM Fallback to current working directory if not found. + +set MAVEN_PROJECTBASEDIR=%MAVEN_BASEDIR% +IF NOT "%MAVEN_PROJECTBASEDIR%"=="" goto endDetectBaseDir + +set EXEC_DIR=%CD% +set WDIR=%EXEC_DIR% +:findBaseDir +IF EXIST "%WDIR%"\.mvn goto baseDirFound +cd .. +IF "%WDIR%"=="%CD%" goto baseDirNotFound +set WDIR=%CD% +goto findBaseDir + +:baseDirFound +set MAVEN_PROJECTBASEDIR=%WDIR% +cd "%EXEC_DIR%" +goto endDetectBaseDir + +:baseDirNotFound +set MAVEN_PROJECTBASEDIR=%EXEC_DIR% +cd "%EXEC_DIR%" + +:endDetectBaseDir + +IF NOT EXIST "%MAVEN_PROJECTBASEDIR%\.mvn\jvm.config" goto endReadAdditionalConfig + +@setlocal EnableExtensions EnableDelayedExpansion +for /F "usebackq delims=" %%a in ("%MAVEN_PROJECTBASEDIR%\.mvn\jvm.config") do set JVM_CONFIG_MAVEN_PROPS=!JVM_CONFIG_MAVEN_PROPS! %%a +@endlocal & set JVM_CONFIG_MAVEN_PROPS=%JVM_CONFIG_MAVEN_PROPS% + +:endReadAdditionalConfig + +SET MAVEN_JAVA_EXE="%JAVA_HOME%\bin\java.exe" + +set WRAPPER_JAR="%MAVEN_PROJECTBASEDIR%\.mvn\wrapper\maven-wrapper.jar" +set WRAPPER_LAUNCHER=org.apache.maven.wrapper.MavenWrapperMain + +%MAVEN_JAVA_EXE% %JVM_CONFIG_MAVEN_PROPS% %MAVEN_OPTS% %MAVEN_DEBUG_OPTS% -classpath %WRAPPER_JAR% "-Dmaven.multiModuleProjectDirectory=%MAVEN_PROJECTBASEDIR%" %WRAPPER_LAUNCHER% %MAVEN_CONFIG% %* +if ERRORLEVEL 1 goto error +goto end + +:error +set ERROR_CODE=1 + +:end +@endlocal & set ERROR_CODE=%ERROR_CODE% + +if not "%MAVEN_SKIP_RC%" == "" goto skipRcPost +@REM check for post script, once with legacy .bat ending and once with .cmd ending +if exist "%HOME%\mavenrc_post.bat" call "%HOME%\mavenrc_post.bat" +if exist "%HOME%\mavenrc_post.cmd" call "%HOME%\mavenrc_post.cmd" +:skipRcPost + +@REM pause the script if MAVEN_BATCH_PAUSE is set to 'on' +if "%MAVEN_BATCH_PAUSE%" == "on" pause + +if "%MAVEN_TERMINATE_CMD%" == "on" exit %ERROR_CODE% + +exit /B %ERROR_CODE% diff --git a/kafka-samples/kstreams-word-count/pom.xml b/kafka-samples/kstreams-word-count/pom.xml new file mode 100644 index 0000000..b391163 --- /dev/null +++ b/kafka-samples/kstreams-word-count/pom.xml @@ -0,0 +1,55 @@ + + + 4.0.0 + + + org.springframework.boot + spring-boot-starter-parent + 2.1.3.RELEASE + + + org.springframework.cloud + kstreams-word-count + 1.0.0.BUILD-SNAPSHOT + jar + + kstreams-word-count + Demo project for Kafka Streams word count sample + + + + + org.springframework.cloud.stream.app + app-starters-core-dependencies + 2.1.0.RELEASE + pom + import + + + + + + + + org.springframework.cloud + spring-cloud-stream-binder-kafka-streams + + + + + org.springframework.boot + spring-boot-starter-web + + + + + + + org.springframework.boot + spring-boot-maven-plugin + + + + + diff --git a/kafka-samples/kstreams-word-count/src/main/java/kafka/streams/word/count/KafkaStreamsWordCountApplication.java b/kafka-samples/kstreams-word-count/src/main/java/kafka/streams/word/count/KafkaStreamsWordCountApplication.java new file mode 100644 index 0000000..8746d12 --- /dev/null +++ b/kafka-samples/kstreams-word-count/src/main/java/kafka/streams/word/count/KafkaStreamsWordCountApplication.java @@ -0,0 +1,109 @@ +/* + * Copyright 2018-2019 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package kafka.streams.word.count; + +import org.apache.kafka.common.serialization.Serdes; +import org.apache.kafka.streams.KeyValue; +import org.apache.kafka.streams.kstream.KStream; +import org.apache.kafka.streams.kstream.Materialized; +import org.apache.kafka.streams.kstream.Serialized; +import org.apache.kafka.streams.kstream.TimeWindows; +import org.springframework.boot.SpringApplication; +import org.springframework.boot.autoconfigure.SpringBootApplication; +import org.springframework.cloud.stream.annotation.EnableBinding; +import org.springframework.cloud.stream.annotation.StreamListener; +import org.springframework.cloud.stream.binder.kafka.streams.annotations.KafkaStreamsProcessor; +import org.springframework.messaging.handler.annotation.SendTo; + +import java.util.Arrays; +import java.util.Date; + +@SpringBootApplication +public class KafkaStreamsWordCountApplication { + + public static void main(String[] args) { + SpringApplication.run(KafkaStreamsWordCountApplication.class, args); + } + + @EnableBinding(KafkaStreamsProcessor.class) + public static class WordCountProcessorApplication { + + @StreamListener("input") + @SendTo("output") + public KStream process(KStream input) { + + return input + .flatMapValues(value -> Arrays.asList(value.toLowerCase().split("\\W+"))) + .map((key, value) -> new KeyValue<>(value, value)) + .groupByKey(Serialized.with(Serdes.String(), Serdes.String())) + .windowedBy(TimeWindows.of(30000)) + .count(Materialized.as("WordCounts-1")) + .toStream() + .map((key, value) -> new KeyValue<>(null, new WordCount(key.key(), value, new Date(key.window().start()), new Date(key.window().end())))); + } + } + + static class WordCount { + + private String word; + + private long count; + + private Date start; + + private Date end; + + WordCount(String word, long count, Date start, Date end) { + this.word = word; + this.count = count; + this.start = start; + this.end = end; + } + + public String getWord() { + return word; + } + + public void setWord(String word) { + this.word = word; + } + + public long getCount() { + return count; + } + + public void setCount(long count) { + this.count = count; + } + + public Date getStart() { + return start; + } + + public void setStart(Date start) { + this.start = start; + } + + public Date getEnd() { + return end; + } + + public void setEnd(Date end) { + this.end = end; + } + } +} diff --git a/kafka-samples/kstreams-word-count/src/main/resources/application.yml b/kafka-samples/kstreams-word-count/src/main/resources/application.yml new file mode 100644 index 0000000..39dc432 --- /dev/null +++ b/kafka-samples/kstreams-word-count/src/main/resources/application.yml @@ -0,0 +1,5 @@ +spring.cloud.stream.kafka.streams: + binder.configuration: + default.key.serde: org.apache.kafka.common.serialization.Serdes$StringSerde + default.value.serde: org.apache.kafka.common.serialization.Serdes$StringSerde + bindings.input.consumer.application-id: kstream-word-count-sample \ No newline at end of file diff --git a/pom.xml b/pom.xml index 985eb43..ea13dfe 100644 --- a/pom.xml +++ b/pom.xml @@ -8,7 +8,7 @@ Spring Cloud Data Flow Samples ${basedir}/.. - 1.2.3.RELEASE + 2.0.0.RELEASE