diff --git a/spring-cloud-function-samples/function-sample-azure-kafka-trigger/.gitignore b/spring-cloud-function-samples/function-sample-azure-kafka-trigger/.gitignore new file mode 100644 index 000000000..7ed0d6b67 --- /dev/null +++ b/spring-cloud-function-samples/function-sample-azure-kafka-trigger/.gitignore @@ -0,0 +1,32 @@ +target/ +!.mvn/wrapper/maven-wrapper.jar +!**/src/main/**/target/ +!**/src/test/**/target/ + +### STS ### +.apt_generated +.classpath +.factorypath +.project +.settings +.springBeans +.sts4-cache + +### IntelliJ IDEA ### +.idea +*.iws +*.iml +*.ipr + +### NetBeans ### +/nbproject/private/ +/nbbuild/ +/dist/ +/nbdist/ +/.nb-gradle/ +build/ +!**/src/main/**/build/ +!**/src/test/**/build/ + +### VS Code ### +.vscode/ diff --git a/spring-cloud-function-samples/function-sample-azure-kafka-trigger/README.md b/spring-cloud-function-samples/function-sample-azure-kafka-trigger/README.md new file mode 100644 index 000000000..5159253da --- /dev/null +++ b/spring-cloud-function-samples/function-sample-azure-kafka-trigger/README.md @@ -0,0 +1,104 @@ +# Azure Function with Kafka Trigger & Output Binding + +Spring Cloud Function example for implementing an Azure functions with [KafkaTrigger](https://learn.microsoft.com/en-us/azure/azure-functions/functions-bindings-kafka-trigger?tabs=in-process%2Cconfluent&pivots=programming-language-java) and [Kafka Binding](https://learn.microsoft.com/en-us/azure/azure-functions/functions-bindings-kafka-output?tabs=in-process%2Cconfluent&pivots=programming-language-java) support. + +The Azure function is triggered by messages sent on the `trigger` topic and in turn calls the `uppercase` SCF with the trigger payload. The SCF capitalizes the input json value fields and sends the result to an output Kafka topic called: `output`. + +## Running Locally + +First start a Kafka server locally. +The `./src/main/resources/docker-compose-demo.yaml` helps to start locally Zookeeper, Kafka and Kafka UI. + +``` +docker-compose -f ./src/main/resources/docker-compose-demo.yaml up +``` +You can reach the Kafka UI (Redpanda) dashboard on http://localhost:8080/topics + +![](./src/main/doc/kafka-ui-topics.png) + +The docker-compose pre-creates the `trigger` and `output` topics used by the function. + +Next build and run the Azure function: + +``` +./mvnw clean package +./mvnw azure-functions:run +``` + +From the Kafka UI, got to the `trigger` topic view (http://localhost:8080/topics/trigger), select `Actions/Publish Message` and submit a new JSON message: +```json +{ "foo" : "bar"} +``` + +![](./src/main/doc/kafka-publish-message.png) + + +Push the `Publish` button and let the function do its job and check the `output` topic (http://localhost:8080/topics/output) : + +![](./src/main/doc/output-topic.png) + +e.g.the `bar` is in uppercase: + +```json +{ "foo" : "BAR"} +``` + + + +## Running on Azure (TODO: WIP) + +Make sure you are logged in your Azure account. +``` +az login +``` + +Build and deploy + +``` +./mvnw clean package +./mvnw azure-functions:deploy +``` + +## Implementation + +Configure the [Kafka extension](https://learn.microsoft.com/en-us/azure/azure-functions/functions-bindings-kafka?tabs=in-process%2Cportal&pivots=programming-language-java#hostjson-settings) in the `host.json` settings: + +```json +{ + "functionTimeout": "00:05:00", + "version": "2.0", + "extensions": { + "kafka": { + "maxBatchSize": 64, + "SubscriberIntervalInSeconds": 1, + "ExecutorChannelCapacity": 1, + "ChannelFullRetryIntervalInMs": 50 + } + }, + "extensionBundle": { + "id": "Microsoft.Azure.Functions.ExtensionBundle", + "version": "[3.3.0, 4.0.0)" + } +} +``` + +Also to allow your functions to scale properly on the Premium plan when using Kafka triggers and bindings, you need to [enable runtime scale monitoring](https://learn.microsoft.com/en-us/azure/azure-functions/functions-bindings-kafka?tabs=in-process%2Cportal&pivots=programming-language-java#enable-runtime-scaling). + + +## Notes + +* Disable the `spring-boot-maven-plugin` in favor of the `azure-functions-maven-plugin`. +* Exclude the `org.springframework.boot:spring-boot-starter-logging` dependency from the `org.springframework.cloud:spring-cloud-function-adapter-azure`. +* In `local.settings.json` set the local values for the `%BrokerList`, `%ConfluentCloudUsername%` and the `%TriggerKafkaTopic%` trigger and binding variables: + +``` +{ + "IsEncrypted": false, + "Values": { +... + "BrokerList": "localhost:9092", + "ConfluentCloudUsername": "test", + "TriggerKafkaTopic": "trigger" + } +} +``` diff --git a/spring-cloud-function-samples/function-sample-azure-kafka-trigger/mvnw b/spring-cloud-function-samples/function-sample-azure-kafka-trigger/mvnw new file mode 100755 index 000000000..8a8fb2282 --- /dev/null +++ b/spring-cloud-function-samples/function-sample-azure-kafka-trigger/mvnw @@ -0,0 +1,316 @@ +#!/bin/sh +# ---------------------------------------------------------------------------- +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# https://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# ---------------------------------------------------------------------------- + +# ---------------------------------------------------------------------------- +# Maven Start Up Batch script +# +# Required ENV vars: +# ------------------ +# JAVA_HOME - location of a JDK home dir +# +# Optional ENV vars +# ----------------- +# M2_HOME - location of maven2's installed home dir +# MAVEN_OPTS - parameters passed to the Java VM when running Maven +# e.g. to debug Maven itself, use +# set MAVEN_OPTS=-Xdebug -Xrunjdwp:transport=dt_socket,server=y,suspend=y,address=8000 +# MAVEN_SKIP_RC - flag to disable loading of mavenrc files +# ---------------------------------------------------------------------------- + +if [ -z "$MAVEN_SKIP_RC" ] ; then + + if [ -f /usr/local/etc/mavenrc ] ; then + . /usr/local/etc/mavenrc + fi + + if [ -f /etc/mavenrc ] ; then + . /etc/mavenrc + fi + + if [ -f "$HOME/.mavenrc" ] ; then + . "$HOME/.mavenrc" + fi + +fi + +# OS specific support. $var _must_ be set to either true or false. +cygwin=false; +darwin=false; +mingw=false +case "`uname`" in + CYGWIN*) cygwin=true ;; + MINGW*) mingw=true;; + Darwin*) darwin=true + # Use /usr/libexec/java_home if available, otherwise fall back to /Library/Java/Home + # See https://developer.apple.com/library/mac/qa/qa1170/_index.html + if [ -z "$JAVA_HOME" ]; then + if [ -x "/usr/libexec/java_home" ]; then + export JAVA_HOME="`/usr/libexec/java_home`" + else + export JAVA_HOME="/Library/Java/Home" + fi + fi + ;; +esac + +if [ -z "$JAVA_HOME" ] ; then + if [ -r /etc/gentoo-release ] ; then + JAVA_HOME=`java-config --jre-home` + fi +fi + +if [ -z "$M2_HOME" ] ; then + ## resolve links - $0 may be a link to maven's home + PRG="$0" + + # need this for relative symlinks + while [ -h "$PRG" ] ; do + ls=`ls -ld "$PRG"` + link=`expr "$ls" : '.*-> \(.*\)$'` + if expr "$link" : '/.*' > /dev/null; then + PRG="$link" + else + PRG="`dirname "$PRG"`/$link" + fi + done + + saveddir=`pwd` + + M2_HOME=`dirname "$PRG"`/.. + + # make it fully qualified + M2_HOME=`cd "$M2_HOME" && pwd` + + cd "$saveddir" + # echo Using m2 at $M2_HOME +fi + +# For Cygwin, ensure paths are in UNIX format before anything is touched +if $cygwin ; then + [ -n "$M2_HOME" ] && + M2_HOME=`cygpath --unix "$M2_HOME"` + [ -n "$JAVA_HOME" ] && + JAVA_HOME=`cygpath --unix "$JAVA_HOME"` + [ -n "$CLASSPATH" ] && + CLASSPATH=`cygpath --path --unix "$CLASSPATH"` +fi + +# For Mingw, ensure paths are in UNIX format before anything is touched +if $mingw ; then + [ -n "$M2_HOME" ] && + M2_HOME="`(cd "$M2_HOME"; pwd)`" + [ -n "$JAVA_HOME" ] && + JAVA_HOME="`(cd "$JAVA_HOME"; pwd)`" +fi + +if [ -z "$JAVA_HOME" ]; then + javaExecutable="`which javac`" + if [ -n "$javaExecutable" ] && ! [ "`expr \"$javaExecutable\" : '\([^ ]*\)'`" = "no" ]; then + # readlink(1) is not available as standard on Solaris 10. + readLink=`which readlink` + if [ ! `expr "$readLink" : '\([^ ]*\)'` = "no" ]; then + if $darwin ; then + javaHome="`dirname \"$javaExecutable\"`" + javaExecutable="`cd \"$javaHome\" && pwd -P`/javac" + else + javaExecutable="`readlink -f \"$javaExecutable\"`" + fi + javaHome="`dirname \"$javaExecutable\"`" + javaHome=`expr "$javaHome" : '\(.*\)/bin'` + JAVA_HOME="$javaHome" + export JAVA_HOME + fi + fi +fi + +if [ -z "$JAVACMD" ] ; then + if [ -n "$JAVA_HOME" ] ; then + if [ -x "$JAVA_HOME/jre/sh/java" ] ; then + # IBM's JDK on AIX uses strange locations for the executables + JAVACMD="$JAVA_HOME/jre/sh/java" + else + JAVACMD="$JAVA_HOME/bin/java" + fi + else + JAVACMD="`\\unset -f command; \\command -v java`" + fi +fi + +if [ ! -x "$JAVACMD" ] ; then + echo "Error: JAVA_HOME is not defined correctly." >&2 + echo " We cannot execute $JAVACMD" >&2 + exit 1 +fi + +if [ -z "$JAVA_HOME" ] ; then + echo "Warning: JAVA_HOME environment variable is not set." +fi + +CLASSWORLDS_LAUNCHER=org.codehaus.plexus.classworlds.launcher.Launcher + +# traverses directory structure from process work directory to filesystem root +# first directory with .mvn subdirectory is considered project base directory +find_maven_basedir() { + + if [ -z "$1" ] + then + echo "Path not specified to find_maven_basedir" + return 1 + fi + + basedir="$1" + wdir="$1" + while [ "$wdir" != '/' ] ; do + if [ -d "$wdir"/.mvn ] ; then + basedir=$wdir + break + fi + # workaround for JBEAP-8937 (on Solaris 10/Sparc) + if [ -d "${wdir}" ]; then + wdir=`cd "$wdir/.."; pwd` + fi + # end of workaround + done + echo "${basedir}" +} + +# concatenates all lines of a file +concat_lines() { + if [ -f "$1" ]; then + echo "$(tr -s '\n' ' ' < "$1")" + fi +} + +BASE_DIR=`find_maven_basedir "$(pwd)"` +if [ -z "$BASE_DIR" ]; then + exit 1; +fi + +########################################################################################## +# Extension to allow automatically downloading the maven-wrapper.jar from Maven-central +# This allows using the maven wrapper in projects that prohibit checking in binary data. +########################################################################################## +if [ -r "$BASE_DIR/.mvn/wrapper/maven-wrapper.jar" ]; then + if [ "$MVNW_VERBOSE" = true ]; then + echo "Found .mvn/wrapper/maven-wrapper.jar" + fi +else + if [ "$MVNW_VERBOSE" = true ]; then + echo "Couldn't find .mvn/wrapper/maven-wrapper.jar, downloading it ..." + fi + if [ -n "$MVNW_REPOURL" ]; then + jarUrl="$MVNW_REPOURL/org/apache/maven/wrapper/maven-wrapper/3.1.0/maven-wrapper-3.1.0.jar" + else + jarUrl="https://repo.maven.apache.org/maven2/org/apache/maven/wrapper/maven-wrapper/3.1.0/maven-wrapper-3.1.0.jar" + fi + while IFS="=" read key value; do + case "$key" in (wrapperUrl) jarUrl="$value"; break ;; + esac + done < "$BASE_DIR/.mvn/wrapper/maven-wrapper.properties" + if [ "$MVNW_VERBOSE" = true ]; then + echo "Downloading from: $jarUrl" + fi + wrapperJarPath="$BASE_DIR/.mvn/wrapper/maven-wrapper.jar" + if $cygwin; then + wrapperJarPath=`cygpath --path --windows "$wrapperJarPath"` + fi + + if command -v wget > /dev/null; then + if [ "$MVNW_VERBOSE" = true ]; then + echo "Found wget ... using wget" + fi + if [ -z "$MVNW_USERNAME" ] || [ -z "$MVNW_PASSWORD" ]; then + wget "$jarUrl" -O "$wrapperJarPath" || rm -f "$wrapperJarPath" + else + wget --http-user=$MVNW_USERNAME --http-password=$MVNW_PASSWORD "$jarUrl" -O "$wrapperJarPath" || rm -f "$wrapperJarPath" + fi + elif command -v curl > /dev/null; then + if [ "$MVNW_VERBOSE" = true ]; then + echo "Found curl ... using curl" + fi + if [ -z "$MVNW_USERNAME" ] || [ -z "$MVNW_PASSWORD" ]; then + curl -o "$wrapperJarPath" "$jarUrl" -f + else + curl --user $MVNW_USERNAME:$MVNW_PASSWORD -o "$wrapperJarPath" "$jarUrl" -f + fi + + else + if [ "$MVNW_VERBOSE" = true ]; then + echo "Falling back to using Java to download" + fi + javaClass="$BASE_DIR/.mvn/wrapper/MavenWrapperDownloader.java" + # For Cygwin, switch paths to Windows format before running javac + if $cygwin; then + javaClass=`cygpath --path --windows "$javaClass"` + fi + if [ -e "$javaClass" ]; then + if [ ! -e "$BASE_DIR/.mvn/wrapper/MavenWrapperDownloader.class" ]; then + if [ "$MVNW_VERBOSE" = true ]; then + echo " - Compiling MavenWrapperDownloader.java ..." + fi + # Compiling the Java class + ("$JAVA_HOME/bin/javac" "$javaClass") + fi + if [ -e "$BASE_DIR/.mvn/wrapper/MavenWrapperDownloader.class" ]; then + # Running the downloader + if [ "$MVNW_VERBOSE" = true ]; then + echo " - Running MavenWrapperDownloader.java ..." + fi + ("$JAVA_HOME/bin/java" -cp .mvn/wrapper MavenWrapperDownloader "$MAVEN_PROJECTBASEDIR") + fi + fi + fi +fi +########################################################################################## +# End of extension +########################################################################################## + +export MAVEN_PROJECTBASEDIR=${MAVEN_BASEDIR:-"$BASE_DIR"} +if [ "$MVNW_VERBOSE" = true ]; then + echo $MAVEN_PROJECTBASEDIR +fi +MAVEN_OPTS="$(concat_lines "$MAVEN_PROJECTBASEDIR/.mvn/jvm.config") $MAVEN_OPTS" + +# For Cygwin, switch paths to Windows format before running java +if $cygwin; then + [ -n "$M2_HOME" ] && + M2_HOME=`cygpath --path --windows "$M2_HOME"` + [ -n "$JAVA_HOME" ] && + JAVA_HOME=`cygpath --path --windows "$JAVA_HOME"` + [ -n "$CLASSPATH" ] && + CLASSPATH=`cygpath --path --windows "$CLASSPATH"` + [ -n "$MAVEN_PROJECTBASEDIR" ] && + MAVEN_PROJECTBASEDIR=`cygpath --path --windows "$MAVEN_PROJECTBASEDIR"` +fi + +# Provide a "standardized" way to retrieve the CLI args that will +# work with both Windows and non-Windows executions. +MAVEN_CMD_LINE_ARGS="$MAVEN_CONFIG $@" +export MAVEN_CMD_LINE_ARGS + +WRAPPER_LAUNCHER=org.apache.maven.wrapper.MavenWrapperMain + +exec "$JAVACMD" \ + $MAVEN_OPTS \ + $MAVEN_DEBUG_OPTS \ + -classpath "$MAVEN_PROJECTBASEDIR/.mvn/wrapper/maven-wrapper.jar" \ + "-Dmaven.home=${M2_HOME}" \ + "-Dmaven.multiModuleProjectDirectory=${MAVEN_PROJECTBASEDIR}" \ + ${WRAPPER_LAUNCHER} $MAVEN_CONFIG "$@" diff --git a/spring-cloud-function-samples/function-sample-azure-kafka-trigger/mvnw.cmd b/spring-cloud-function-samples/function-sample-azure-kafka-trigger/mvnw.cmd new file mode 100644 index 000000000..1d8ab018e --- /dev/null +++ b/spring-cloud-function-samples/function-sample-azure-kafka-trigger/mvnw.cmd @@ -0,0 +1,188 @@ +@REM ---------------------------------------------------------------------------- +@REM Licensed to the Apache Software Foundation (ASF) under one +@REM or more contributor license agreements. See the NOTICE file +@REM distributed with this work for additional information +@REM regarding copyright ownership. The ASF licenses this file +@REM to you under the Apache License, Version 2.0 (the +@REM "License"); you may not use this file except in compliance +@REM with the License. You may obtain a copy of the License at +@REM +@REM https://www.apache.org/licenses/LICENSE-2.0 +@REM +@REM Unless required by applicable law or agreed to in writing, +@REM software distributed under the License is distributed on an +@REM "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +@REM KIND, either express or implied. See the License for the +@REM specific language governing permissions and limitations +@REM under the License. +@REM ---------------------------------------------------------------------------- + +@REM ---------------------------------------------------------------------------- +@REM Maven Start Up Batch script +@REM +@REM Required ENV vars: +@REM JAVA_HOME - location of a JDK home dir +@REM +@REM Optional ENV vars +@REM M2_HOME - location of maven2's installed home dir +@REM MAVEN_BATCH_ECHO - set to 'on' to enable the echoing of the batch commands +@REM MAVEN_BATCH_PAUSE - set to 'on' to wait for a keystroke before ending +@REM MAVEN_OPTS - parameters passed to the Java VM when running Maven +@REM e.g. to debug Maven itself, use +@REM set MAVEN_OPTS=-Xdebug -Xrunjdwp:transport=dt_socket,server=y,suspend=y,address=8000 +@REM MAVEN_SKIP_RC - flag to disable loading of mavenrc files +@REM ---------------------------------------------------------------------------- + +@REM Begin all REM lines with '@' in case MAVEN_BATCH_ECHO is 'on' +@echo off +@REM set title of command window +title %0 +@REM enable echoing by setting MAVEN_BATCH_ECHO to 'on' +@if "%MAVEN_BATCH_ECHO%" == "on" echo %MAVEN_BATCH_ECHO% + +@REM set %HOME% to equivalent of $HOME +if "%HOME%" == "" (set "HOME=%HOMEDRIVE%%HOMEPATH%") + +@REM Execute a user defined script before this one +if not "%MAVEN_SKIP_RC%" == "" goto skipRcPre +@REM check for pre script, once with legacy .bat ending and once with .cmd ending +if exist "%USERPROFILE%\mavenrc_pre.bat" call "%USERPROFILE%\mavenrc_pre.bat" %* +if exist "%USERPROFILE%\mavenrc_pre.cmd" call "%USERPROFILE%\mavenrc_pre.cmd" %* +:skipRcPre + +@setlocal + +set ERROR_CODE=0 + +@REM To isolate internal variables from possible post scripts, we use another setlocal +@setlocal + +@REM ==== START VALIDATION ==== +if not "%JAVA_HOME%" == "" goto OkJHome + +echo. +echo Error: JAVA_HOME not found in your environment. >&2 +echo Please set the JAVA_HOME variable in your environment to match the >&2 +echo location of your Java installation. >&2 +echo. +goto error + +:OkJHome +if exist "%JAVA_HOME%\bin\java.exe" goto init + +echo. +echo Error: JAVA_HOME is set to an invalid directory. >&2 +echo JAVA_HOME = "%JAVA_HOME%" >&2 +echo Please set the JAVA_HOME variable in your environment to match the >&2 +echo location of your Java installation. >&2 +echo. +goto error + +@REM ==== END VALIDATION ==== + +:init + +@REM Find the project base dir, i.e. the directory that contains the folder ".mvn". +@REM Fallback to current working directory if not found. + +set MAVEN_PROJECTBASEDIR=%MAVEN_BASEDIR% +IF NOT "%MAVEN_PROJECTBASEDIR%"=="" goto endDetectBaseDir + +set EXEC_DIR=%CD% +set WDIR=%EXEC_DIR% +:findBaseDir +IF EXIST "%WDIR%"\.mvn goto baseDirFound +cd .. +IF "%WDIR%"=="%CD%" goto baseDirNotFound +set WDIR=%CD% +goto findBaseDir + +:baseDirFound +set MAVEN_PROJECTBASEDIR=%WDIR% +cd "%EXEC_DIR%" +goto endDetectBaseDir + +:baseDirNotFound +set MAVEN_PROJECTBASEDIR=%EXEC_DIR% +cd "%EXEC_DIR%" + +:endDetectBaseDir + +IF NOT EXIST "%MAVEN_PROJECTBASEDIR%\.mvn\jvm.config" goto endReadAdditionalConfig + +@setlocal EnableExtensions EnableDelayedExpansion +for /F "usebackq delims=" %%a in ("%MAVEN_PROJECTBASEDIR%\.mvn\jvm.config") do set JVM_CONFIG_MAVEN_PROPS=!JVM_CONFIG_MAVEN_PROPS! %%a +@endlocal & set JVM_CONFIG_MAVEN_PROPS=%JVM_CONFIG_MAVEN_PROPS% + +:endReadAdditionalConfig + +SET MAVEN_JAVA_EXE="%JAVA_HOME%\bin\java.exe" +set WRAPPER_JAR="%MAVEN_PROJECTBASEDIR%\.mvn\wrapper\maven-wrapper.jar" +set WRAPPER_LAUNCHER=org.apache.maven.wrapper.MavenWrapperMain + +set DOWNLOAD_URL="https://repo.maven.apache.org/maven2/org/apache/maven/wrapper/maven-wrapper/3.1.0/maven-wrapper-3.1.0.jar" + +FOR /F "usebackq tokens=1,2 delims==" %%A IN ("%MAVEN_PROJECTBASEDIR%\.mvn\wrapper\maven-wrapper.properties") DO ( + IF "%%A"=="wrapperUrl" SET DOWNLOAD_URL=%%B +) + +@REM Extension to allow automatically downloading the maven-wrapper.jar from Maven-central +@REM This allows using the maven wrapper in projects that prohibit checking in binary data. +if exist %WRAPPER_JAR% ( + if "%MVNW_VERBOSE%" == "true" ( + echo Found %WRAPPER_JAR% + ) +) else ( + if not "%MVNW_REPOURL%" == "" ( + SET DOWNLOAD_URL="%MVNW_REPOURL%/org/apache/maven/wrapper/maven-wrapper/3.1.0/maven-wrapper-3.1.0.jar" + ) + if "%MVNW_VERBOSE%" == "true" ( + echo Couldn't find %WRAPPER_JAR%, downloading it ... + echo Downloading from: %DOWNLOAD_URL% + ) + + powershell -Command "&{"^ + "$webclient = new-object System.Net.WebClient;"^ + "if (-not ([string]::IsNullOrEmpty('%MVNW_USERNAME%') -and [string]::IsNullOrEmpty('%MVNW_PASSWORD%'))) {"^ + "$webclient.Credentials = new-object System.Net.NetworkCredential('%MVNW_USERNAME%', '%MVNW_PASSWORD%');"^ + "}"^ + "[Net.ServicePointManager]::SecurityProtocol = [Net.SecurityProtocolType]::Tls12; $webclient.DownloadFile('%DOWNLOAD_URL%', '%WRAPPER_JAR%')"^ + "}" + if "%MVNW_VERBOSE%" == "true" ( + echo Finished downloading %WRAPPER_JAR% + ) +) +@REM End of extension + +@REM Provide a "standardized" way to retrieve the CLI args that will +@REM work with both Windows and non-Windows executions. +set MAVEN_CMD_LINE_ARGS=%* + +%MAVEN_JAVA_EXE% ^ + %JVM_CONFIG_MAVEN_PROPS% ^ + %MAVEN_OPTS% ^ + %MAVEN_DEBUG_OPTS% ^ + -classpath %WRAPPER_JAR% ^ + "-Dmaven.multiModuleProjectDirectory=%MAVEN_PROJECTBASEDIR%" ^ + %WRAPPER_LAUNCHER% %MAVEN_CONFIG% %* +if ERRORLEVEL 1 goto error +goto end + +:error +set ERROR_CODE=1 + +:end +@endlocal & set ERROR_CODE=%ERROR_CODE% + +if not "%MAVEN_SKIP_RC%"=="" goto skipRcPost +@REM check for post script, once with legacy .bat ending and once with .cmd ending +if exist "%USERPROFILE%\mavenrc_post.bat" call "%USERPROFILE%\mavenrc_post.bat" +if exist "%USERPROFILE%\mavenrc_post.cmd" call "%USERPROFILE%\mavenrc_post.cmd" +:skipRcPost + +@REM pause the script if MAVEN_BATCH_PAUSE is set to 'on' +if "%MAVEN_BATCH_PAUSE%"=="on" pause + +if "%MAVEN_TERMINATE_CMD%"=="on" exit %ERROR_CODE% + +cmd /C exit /B %ERROR_CODE% diff --git a/spring-cloud-function-samples/function-sample-azure-kafka-trigger/pom.xml b/spring-cloud-function-samples/function-sample-azure-kafka-trigger/pom.xml new file mode 100644 index 000000000..ab5dfe128 --- /dev/null +++ b/spring-cloud-function-samples/function-sample-azure-kafka-trigger/pom.xml @@ -0,0 +1,227 @@ + + + 4.0.0 + + org.springframework.boot + spring-boot-starter-parent + 3.0.0-SNAPSHOT + + + + example.scf.azure + kafka-trigger-azure-spring-function + 0.0.1-SNAPSHOT + kafka-trigger-demo + Demo project for Spring Boot + + 17 + 4.0.0-SNAPSHOT + 2.1.0 + + example.KafkaTriggerDemoApplication + + example-spring-function-resource-group + kafka-trigger-azure-spring-function + westeurope + ${project.build.directory}/azure-functions/${functionAppName} + java-functions-app-service-plan + + + + + org.springframework.cloud + spring-cloud-function-context + + + spring-cloud-function-adapter-azure + org.springframework.cloud + + + org.springframework.boot + spring-boot-starter-logging + + + + + + org.springframework.boot + spring-boot-starter-test + test + + + + + + org.springframework.cloud + spring-cloud-function-dependencies + ${spring-cloud-dependencies.version} + pom + import + + + com.microsoft.azure.functions + azure-functions-java-library + ${azure.functions.java.core.version} + + + + + + + + org.apache.maven.plugins + maven-compiler-plugin + 3.8.1 + + ${java.version} + ${java.version} + ${project.build.sourceEncoding} + + + + org.apache.maven.plugins + maven-deploy-plugin + + true + + + + org.apache.maven.plugins + maven-dependency-plugin + + + copy-dependencies + prepare-package + + copy-dependencies + + + ${stagingDirectory}/lib + false + false + true + runtime + azure-functions-java-library + + + + + + com.microsoft.azure + azure-functions-maven-plugin + + + ${functionResourceGroup} + ${functionAppName} + ${functionAppRegion} + ${functionAppServicePlanName} + + linux + 17 + + + + + FUNCTIONS_EXTENSION_VERSION + ~4 + + + FUNCTIONS_WORKER_RUNTIME + java + + + + + + package-functions + + package + + + + + + maven-resources-plugin + + + copy-resources + package + + copy-resources + + + true + + ${stagingDirectory} + + + + ${project.basedir}/src/main/azure + + + ** + + + + + + + + + + maven-clean-plugin + 3.1.0 + + + + obj + + + + + + + + + + + spring-milestones + Spring Milestones + https://repo.spring.io/milestone + + false + + + + spring-snapshots + Spring Snapshots + https://repo.spring.io/snapshot + + false + + + + + + spring-milestones + Spring Milestones + https://repo.spring.io/milestone + + false + + + + spring-snapshots + Spring Snapshots + https://repo.spring.io/snapshot + + false + + + + + diff --git a/spring-cloud-function-samples/function-sample-azure-kafka-trigger/src/main/azure/host.json b/spring-cloud-function-samples/function-sample-azure-kafka-trigger/src/main/azure/host.json new file mode 100644 index 000000000..8bb952206 --- /dev/null +++ b/spring-cloud-function-samples/function-sample-azure-kafka-trigger/src/main/azure/host.json @@ -0,0 +1,16 @@ +{ + "functionTimeout": "00:05:00", + "version": "2.0", + "extensions": { + "kafka": { + "maxBatchSize": 64, + "SubscriberIntervalInSeconds": 1, + "ExecutorChannelCapacity": 1, + "ChannelFullRetryIntervalInMs": 50 + } + }, + "extensionBundle": { + "id": "Microsoft.Azure.Functions.ExtensionBundle", + "version": "[3.3.0, 4.0.0)" + } +} \ No newline at end of file diff --git a/spring-cloud-function-samples/function-sample-azure-kafka-trigger/src/main/azure/local.settings.json b/spring-cloud-function-samples/function-sample-azure-kafka-trigger/src/main/azure/local.settings.json new file mode 100644 index 000000000..7172b27d1 --- /dev/null +++ b/spring-cloud-function-samples/function-sample-azure-kafka-trigger/src/main/azure/local.settings.json @@ -0,0 +1,12 @@ +{ + "IsEncrypted": false, + "Values": { + "AzureWebJobsStorage": "UseDevelopmentStorage=true", + "AzureWebJobsDashboard": "", + "FUNCTIONS_WORKER_RUNTIME": "java", + + "BrokerList": "localhost:9092", + "ConfluentCloudUsername": "test", + "TriggerKafkaTopic": "trigger" + } +} \ No newline at end of file diff --git a/spring-cloud-function-samples/function-sample-azure-kafka-trigger/src/main/doc/kafka-publish-message.png b/spring-cloud-function-samples/function-sample-azure-kafka-trigger/src/main/doc/kafka-publish-message.png new file mode 100644 index 000000000..ed8743c90 Binary files /dev/null and b/spring-cloud-function-samples/function-sample-azure-kafka-trigger/src/main/doc/kafka-publish-message.png differ diff --git a/spring-cloud-function-samples/function-sample-azure-kafka-trigger/src/main/doc/kafka-ui-topics.png b/spring-cloud-function-samples/function-sample-azure-kafka-trigger/src/main/doc/kafka-ui-topics.png new file mode 100644 index 000000000..9beedac69 Binary files /dev/null and b/spring-cloud-function-samples/function-sample-azure-kafka-trigger/src/main/doc/kafka-ui-topics.png differ diff --git a/spring-cloud-function-samples/function-sample-azure-kafka-trigger/src/main/doc/output-topic.png b/spring-cloud-function-samples/function-sample-azure-kafka-trigger/src/main/doc/output-topic.png new file mode 100644 index 000000000..7c46926c6 Binary files /dev/null and b/spring-cloud-function-samples/function-sample-azure-kafka-trigger/src/main/doc/output-topic.png differ diff --git a/spring-cloud-function-samples/function-sample-azure-kafka-trigger/src/main/java/example/KafkaTriggerDemoApplication.java b/spring-cloud-function-samples/function-sample-azure-kafka-trigger/src/main/java/example/KafkaTriggerDemoApplication.java new file mode 100644 index 000000000..09eed0f5c --- /dev/null +++ b/spring-cloud-function-samples/function-sample-azure-kafka-trigger/src/main/java/example/KafkaTriggerDemoApplication.java @@ -0,0 +1,61 @@ +/* + * Copyright 2022 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package example; + +import java.util.Map; +import java.util.function.Function; + +import com.microsoft.azure.functions.ExecutionContext; +import example.entity.KafkaEntity; + +import org.springframework.boot.SpringApplication; +import org.springframework.boot.autoconfigure.SpringBootApplication; +import org.springframework.cloud.function.json.JsonMapper; +import org.springframework.context.annotation.Bean; +import org.springframework.messaging.Message; + +@SpringBootApplication +public class KafkaTriggerDemoApplication { + + public static void main(String[] args) { + SpringApplication.run(KafkaTriggerDemoApplication.class, args); + } + + @Bean + public Function, String> uppercase(JsonMapper mapper) { + return message -> { + + // // (Optionally) access and use the Azure function context. + ExecutionContext context = (ExecutionContext) message.getHeaders().get("executionContext"); + context.getLogger().info("Kafka triggered with data: " + message.getPayload()); + + // Convert the message payload into Azure's KafkaEntity format. + KafkaEntity kafkaEntity = mapper.fromJson(message.getPayload(), KafkaEntity.class); + + // Business logic: convert the JSON string values into uppercase. + if (kafkaEntity.getValue() != null) { + Map valueMap = mapper.fromJson(kafkaEntity.getValue(), Map.class); + if (valueMap != null) { + valueMap.forEach((k, v) -> valueMap.put(k, + v != null && v instanceof String ? ((String) v).toUpperCase() : null)); + return mapper.toString(valueMap); + } + } + + return mapper.toString(null); + }; + } +} diff --git a/spring-cloud-function-samples/function-sample-azure-kafka-trigger/src/main/java/example/UppercaseHandler.java b/spring-cloud-function-samples/function-sample-azure-kafka-trigger/src/main/java/example/UppercaseHandler.java new file mode 100644 index 000000000..87220f001 --- /dev/null +++ b/spring-cloud-function-samples/function-sample-azure-kafka-trigger/src/main/java/example/UppercaseHandler.java @@ -0,0 +1,69 @@ +/* + * Copyright 2022 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package example; + +import com.microsoft.azure.functions.BrokerAuthenticationMode; +import com.microsoft.azure.functions.BrokerProtocol; +import com.microsoft.azure.functions.ExecutionContext; +import com.microsoft.azure.functions.OutputBinding; +import com.microsoft.azure.functions.annotation.FunctionName; +import com.microsoft.azure.functions.annotation.KafkaOutput; +import com.microsoft.azure.functions.annotation.KafkaTrigger; + +import org.springframework.cloud.function.adapter.azure.FunctionInvoker; +import org.springframework.messaging.Message; +import org.springframework.messaging.support.MessageBuilder; + +public class UppercaseHandler extends FunctionInvoker, String> { + + + @FunctionName("KafkaTrigger") + public void execute( + @KafkaTrigger( + name = "KafkaTrigger", + topic = "%TriggerKafkaTopic%", + brokerList = "%BrokerList%", + consumerGroup = "$Default", + username = "%ConfluentCloudUsername%", + password = "ConfluentCloudPassword", + authenticationMode = BrokerAuthenticationMode.PLAIN, + protocol = BrokerProtocol.PLAINTEXT, + // protocol = BrokerProtocol.SASLSSL, + // sslCaLocation = "confluent_cloud_cacert.pem", // Enable this line for windows. + dataType = "string") String kafkaEventData, + @KafkaOutput( + name = "kafkaOutput", + topic = "output", + brokerList="%BrokerList%", + username = "%ConfluentCloudUsername%", + password = "ConfluentCloudPassword", + authenticationMode = BrokerAuthenticationMode.PLAIN, + // sslCaLocation = "confluent_cloud_cacert.pem", // Enable this line for windows. + protocol = BrokerProtocol.PLAINTEXT + //protocol = BrokerProtocol.SASLSSL + ) OutputBinding output, + final ExecutionContext context) { + + context.getLogger().info(kafkaEventData); + + Message message = MessageBuilder.withPayload(kafkaEventData).build(); + + String response = handleRequest(message, context); + + output.setValue(response); + } +} diff --git a/spring-cloud-function-samples/function-sample-azure-kafka-trigger/src/main/java/example/entity/KafkaEntity.java b/spring-cloud-function-samples/function-sample-azure-kafka-trigger/src/main/java/example/entity/KafkaEntity.java new file mode 100644 index 000000000..af9be822b --- /dev/null +++ b/spring-cloud-function-samples/function-sample-azure-kafka-trigger/src/main/java/example/entity/KafkaEntity.java @@ -0,0 +1,93 @@ +/* + * Copyright 2022 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package example.entity; + +import com.fasterxml.jackson.annotation.JsonProperty; + +public class KafkaEntity { + @JsonProperty("Offset") + private int offset; + @JsonProperty("Partition") + private int partition; + @JsonProperty("Timestamp") + private String timestamp; + @JsonProperty("Topic") + private String topic; + @JsonProperty("Key") + private String key; + @JsonProperty("Value") + private String value; + @JsonProperty("Headers") + private KafkaHeaders[] headers; + + public int getOffset() { + return offset; + } + + public void setOffset(int offset) { + this.offset = offset; + } + + public int getPartition() { + return partition; + } + + public void setPartition(int partition) { + this.partition = partition; + } + + public String getTimestamp() { + return timestamp; + } + + public void setTimestamp(String timestamp) { + this.timestamp = timestamp; + } + + public String getTopic() { + return topic; + } + + public void setTopic(String topic) { + this.topic = topic; + } + + public String getValue() { + return value; + } + + + public void setValue(String value) { + this.value = value; + } + + public KafkaHeaders[] getHeaders() { + return headers; + } + + public void setHeaders(KafkaHeaders[] headers) { + this.headers = headers; + } + + public String getKey() { + return key; + } + + public void setKey(String key) { + this.key = key; + } + +} diff --git a/spring-cloud-function-samples/function-sample-azure-kafka-trigger/src/main/java/example/entity/KafkaHeaders.java b/spring-cloud-function-samples/function-sample-azure-kafka-trigger/src/main/java/example/entity/KafkaHeaders.java new file mode 100644 index 000000000..af26e25f7 --- /dev/null +++ b/spring-cloud-function-samples/function-sample-azure-kafka-trigger/src/main/java/example/entity/KafkaHeaders.java @@ -0,0 +1,42 @@ +/* + * Copyright 2022 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package example.entity; + +import com.fasterxml.jackson.annotation.JsonProperty; + +public class KafkaHeaders { + @JsonProperty("Key") + private String key; + @JsonProperty("Value") + private String value; + + public String getKey() { + return key; + } + + public void setKey(String key) { + this.key = key; + } + + public String getValue() { + return value; + } + + public void setValue(String value) { + this.value = value; + } + +} diff --git a/spring-cloud-function-samples/function-sample-azure-kafka-trigger/src/main/resources/docker-compose-demo.yaml b/spring-cloud-function-samples/function-sample-azure-kafka-trigger/src/main/resources/docker-compose-demo.yaml new file mode 100644 index 000000000..c184f5a1d --- /dev/null +++ b/spring-cloud-function-samples/function-sample-azure-kafka-trigger/src/main/resources/docker-compose-demo.yaml @@ -0,0 +1,63 @@ +version: '2.1' + +services: + zookeeper: + image: zookeeper:3.4.9 + hostname: zookeeper + ports: + - "2181:2181" + environment: + ZOO_MY_ID: 1 + ZOO_PORT: 2181 + ZOO_SERVERS: server.1=zookeeper:2888:3888 + # volumes: + # - ./zk-single-kafka-single/zookeeper/data:/data + # - ./zk-single-kafka-single/zookeeper/datalog:/datalog + + kafka1: + image: confluentinc/cp-kafka:7.2.2 + hostname: kafka1 + ports: + - "9092:9092" + environment: + KAFKA_ADVERTISED_LISTENERS: LISTENER_DOCKER_INTERNAL://kafka1:19092,LISTENER_DOCKER_EXTERNAL://${DOCKER_HOST_IP:-127.0.0.1}:9092 + KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: LISTENER_DOCKER_INTERNAL:PLAINTEXT,LISTENER_DOCKER_EXTERNAL:PLAINTEXT + KAFKA_INTER_BROKER_LISTENER_NAME: LISTENER_DOCKER_INTERNAL + KAFKA_ZOOKEEPER_CONNECT: "zookeeper:2181" + KAFKA_BROKER_ID: 1 + KAFKA_LOG4J_LOGGERS: "kafka.controller=INFO,kafka.producer.async.DefaultEventHandler=INFO,state.change.logger=INFO" + KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1 + # volumes: + # - ./zk-single-kafka-single/kafka1/data:/var/lib/kafka/data + depends_on: + - zookeeper + + + init-kafka: + image: confluentinc/cp-kafka:7.2.2 + depends_on: + - kafka1 + entrypoint: [ '/bin/sh', '-c' ] + command: | + " + # blocks until kafka is reachable + kafka-topics --bootstrap-server kafka1:19092 --list + + echo -e 'Creating kafka topics' + kafka-topics --bootstrap-server kafka1:19092 --create --if-not-exists --topic trigger --replication-factor 1 --partitions 1 + kafka-topics --bootstrap-server kafka1:19092 --create --if-not-exists --topic output --replication-factor 1 --partitions 1 + + echo -e 'Successfully created the following topics:' + kafka-topics --bootstrap-server kafka1:19092 --list + " + + kconsole: + image: docker.redpanda.com/vectorized/console:latest + restart: on-failure + hostname: kconsole + ports: + - "8080:8080" + environment: + KAFKA_BROKERS: "kafka1:19092" + depends_on: + - kafka1 \ No newline at end of file diff --git a/spring-cloud-function-samples/function-sample-azure-kafka-trigger/wrapper/maven-wrapper.jar b/spring-cloud-function-samples/function-sample-azure-kafka-trigger/wrapper/maven-wrapper.jar new file mode 100644 index 000000000..c1dd12f17 Binary files /dev/null and b/spring-cloud-function-samples/function-sample-azure-kafka-trigger/wrapper/maven-wrapper.jar differ diff --git a/spring-cloud-function-samples/function-sample-azure-kafka-trigger/wrapper/maven-wrapper.properties b/spring-cloud-function-samples/function-sample-azure-kafka-trigger/wrapper/maven-wrapper.properties new file mode 100644 index 000000000..b74bf7fcd --- /dev/null +++ b/spring-cloud-function-samples/function-sample-azure-kafka-trigger/wrapper/maven-wrapper.properties @@ -0,0 +1,2 @@ +distributionUrl=https://repo.maven.apache.org/maven2/org/apache/maven/apache-maven/3.8.6/apache-maven-3.8.6-bin.zip +wrapperUrl=https://repo.maven.apache.org/maven2/org/apache/maven/wrapper/maven-wrapper/3.1.0/maven-wrapper-3.1.0.jar diff --git a/spring-cloud-function-samples/pom.xml b/spring-cloud-function-samples/pom.xml index ed55a04ef..4e051eef1 100644 --- a/spring-cloud-function-samples/pom.xml +++ b/spring-cloud-function-samples/pom.xml @@ -25,6 +25,8 @@ function-sample-aws-custom-bean function-sample-supplier-exporter function-sample-azure + function-sample-azure-timer-trigger + function-sample-azure-kafka-trigger function-sample-spring-integration function-sample-gcp-http function-sample-gcp-background