Initial implementation for AWS Kinesis Binder

Fixes spring-cloud/spring-cloud-stream-starters#9

Basic code to implement Kinesis binder, based on code from Kafka binder.

Update to PR 1 based on review by artembilan

Added gitignore for IDEA files
Added Maven wrapper

pom.xml:
Updated project version to match three digit style
Updated spring-cloud-build to 1.3.3.BUILD-SNAPSHOT
Updated spring-cloud-stream to 1.3.0.BUILD-SNAPSHOT
Uncommented checkstyle

KinesisStreamProvisioner:
Renamed KinesisTopicProvisioner to KinesisStreamProvisioner
Replaced Logging with commons logging
Added check for logging enabled to log output
Renamed arguments to streamName and shards
Qualified properties with this.

spring-cloud-stream-binder-kinesis-core/pom.xml:
Added aws-java-sdk-kinesis dependency

spring-cloud-stream-binder-kinesis/pom.xml:
Removed dependencies provided transitively from core
Replaced spaces with tab indents in XML files
Removed unused plugin

KinesisBinderEnvironmentPostProcessor:
Removed copied properties configuration

KinesisBinderConfiguration:
Removed redundant annotations
Removed redundant logger

KinesisMessageChannelBinder:
Removed redundant setOutputChannel

* Some polishing and reformatting
This commit is contained in:
Peter.Oates
2017-05-25 10:28:08 +01:00
committed by Artem Bilan
parent a783a1ff8c
commit 22f95aa1e6
21 changed files with 1174 additions and 24 deletions

49
.gitignore vendored
View File

@@ -1,22 +1,27 @@
# Compiled class file apps/
*.class /application.yml
/application.properties
# Log file asciidoctor.css
*.log *~
.#*
# BlueJ files *#
*.ctxt target/
build/
# Mobile Tools for Java (J2ME) bin/
.mtj.tmp/ _site/
.classpath
# Package Files # .project
*.jar .settings
*.war .springBeans
*.ear .DS_Store
*.zip *.sw*
*.tar.gz *.iml
*.rar *.ipr
*.iws
# virtual machine crash logs, see http://www.java.com/en/download/help/error_hotspot.xml .idea/
hs_err_pid* .factorypath
spring-xd-samples/*/xd
dump.rdb
coverage-error.log
.apt_generated
aws.credentials.properties

1
.mvn/jvm.config Normal file
View File

@@ -0,0 +1 @@
-Xmx1024m -XX:CICompilerCount=1 -XX:TieredStopAtLevel=1 -Djava.security.egd=file:/dev/./urandom

1
.mvn/maven.config Normal file
View File

@@ -0,0 +1 @@
-DaltSnapshotDeploymentRepository=repo.spring.io::default::https://repo.spring.io/libs-snapshot-local -P spring

BIN
.mvn/wrapper/maven-wrapper.jar vendored Executable file

Binary file not shown.

1
.mvn/wrapper/maven-wrapper.properties vendored Executable file
View File

@@ -0,0 +1 @@
distributionUrl=https://repo1.maven.org/maven2/org/apache/maven/apache-maven/3.5.0/apache-maven-3.5.0-bin.zip

View File

@@ -1,3 +1,3 @@
# Spring Cloud Stream binder for AWS Kinesis # Spring Cloud Stream Binder for AWS Kinesis
This repository will contain the code for the Spring Cloud Stream binder implementation for AWS Kinesis. This repository contains the code for the Spring Cloud Stream Binder implementation for AWS Kinesis.

225
mvnw vendored Executable file
View File

@@ -0,0 +1,225 @@
#!/bin/sh
# ----------------------------------------------------------------------------
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
# ----------------------------------------------------------------------------
# ----------------------------------------------------------------------------
# Maven2 Start Up Batch script
#
# Required ENV vars:
# ------------------
# JAVA_HOME - location of a JDK home dir
#
# Optional ENV vars
# -----------------
# M2_HOME - location of maven2's installed home dir
# MAVEN_OPTS - parameters passed to the Java VM when running Maven
# e.g. to debug Maven itself, use
# set MAVEN_OPTS=-Xdebug -Xrunjdwp:transport=dt_socket,server=y,suspend=y,address=8000
# MAVEN_SKIP_RC - flag to disable loading of mavenrc files
# ----------------------------------------------------------------------------
if [ -z "$MAVEN_SKIP_RC" ] ; then
if [ -f /etc/mavenrc ] ; then
. /etc/mavenrc
fi
if [ -f "$HOME/.mavenrc" ] ; then
. "$HOME/.mavenrc"
fi
fi
# OS specific support. $var _must_ be set to either true or false.
cygwin=false;
darwin=false;
mingw=false
case "`uname`" in
CYGWIN*) cygwin=true ;;
MINGW*) mingw=true;;
Darwin*) darwin=true
# Use /usr/libexec/java_home if available, otherwise fall back to /Library/Java/Home
# See https://developer.apple.com/library/mac/qa/qa1170/_index.html
if [ -z "$JAVA_HOME" ]; then
if [ -x "/usr/libexec/java_home" ]; then
export JAVA_HOME="`/usr/libexec/java_home`"
else
export JAVA_HOME="/Library/Java/Home"
fi
fi
;;
esac
if [ -z "$JAVA_HOME" ] ; then
if [ -r /etc/gentoo-release ] ; then
JAVA_HOME=`java-config --jre-home`
fi
fi
if [ -z "$M2_HOME" ] ; then
## resolve links - $0 may be a link to maven's home
PRG="$0"
# need this for relative symlinks
while [ -h "$PRG" ] ; do
ls=`ls -ld "$PRG"`
link=`expr "$ls" : '.*-> \(.*\)$'`
if expr "$link" : '/.*' > /dev/null; then
PRG="$link"
else
PRG="`dirname "$PRG"`/$link"
fi
done
saveddir=`pwd`
M2_HOME=`dirname "$PRG"`/..
# make it fully qualified
M2_HOME=`cd "$M2_HOME" && pwd`
cd "$saveddir"
# echo Using m2 at $M2_HOME
fi
# For Cygwin, ensure paths are in UNIX format before anything is touched
if $cygwin ; then
[ -n "$M2_HOME" ] &&
M2_HOME=`cygpath --unix "$M2_HOME"`
[ -n "$JAVA_HOME" ] &&
JAVA_HOME=`cygpath --unix "$JAVA_HOME"`
[ -n "$CLASSPATH" ] &&
CLASSPATH=`cygpath --path --unix "$CLASSPATH"`
fi
# For Migwn, ensure paths are in UNIX format before anything is touched
if $mingw ; then
[ -n "$M2_HOME" ] &&
M2_HOME="`(cd "$M2_HOME"; pwd)`"
[ -n "$JAVA_HOME" ] &&
JAVA_HOME="`(cd "$JAVA_HOME"; pwd)`"
# TODO classpath?
fi
if [ -z "$JAVA_HOME" ]; then
javaExecutable="`which javac`"
if [ -n "$javaExecutable" ] && ! [ "`expr \"$javaExecutable\" : '\([^ ]*\)'`" = "no" ]; then
# readlink(1) is not available as standard on Solaris 10.
readLink=`which readlink`
if [ ! `expr "$readLink" : '\([^ ]*\)'` = "no" ]; then
if $darwin ; then
javaHome="`dirname \"$javaExecutable\"`"
javaExecutable="`cd \"$javaHome\" && pwd -P`/javac"
else
javaExecutable="`readlink -f \"$javaExecutable\"`"
fi
javaHome="`dirname \"$javaExecutable\"`"
javaHome=`expr "$javaHome" : '\(.*\)/bin'`
JAVA_HOME="$javaHome"
export JAVA_HOME
fi
fi
fi
if [ -z "$JAVACMD" ] ; then
if [ -n "$JAVA_HOME" ] ; then
if [ -x "$JAVA_HOME/jre/sh/java" ] ; then
# IBM's JDK on AIX uses strange locations for the executables
JAVACMD="$JAVA_HOME/jre/sh/java"
else
JAVACMD="$JAVA_HOME/bin/java"
fi
else
JAVACMD="`which java`"
fi
fi
if [ ! -x "$JAVACMD" ] ; then
echo "Error: JAVA_HOME is not defined correctly." >&2
echo " We cannot execute $JAVACMD" >&2
exit 1
fi
if [ -z "$JAVA_HOME" ] ; then
echo "Warning: JAVA_HOME environment variable is not set."
fi
CLASSWORLDS_LAUNCHER=org.codehaus.plexus.classworlds.launcher.Launcher
# traverses directory structure from process work directory to filesystem root
# first directory with .mvn subdirectory is considered project base directory
find_maven_basedir() {
if [ -z "$1" ]
then
echo "Path not specified to find_maven_basedir"
return 1
fi
basedir="$1"
wdir="$1"
while [ "$wdir" != '/' ] ; do
if [ -d "$wdir"/.mvn ] ; then
basedir=$wdir
break
fi
# workaround for JBEAP-8937 (on Solaris 10/Sparc)
if [ -d "${wdir}" ]; then
wdir=`cd "$wdir/.."; pwd`
fi
# end of workaround
done
echo "${basedir}"
}
# concatenates all lines of a file
concat_lines() {
if [ -f "$1" ]; then
echo "$(tr -s '\n' ' ' < "$1")"
fi
}
BASE_DIR=`find_maven_basedir "$(pwd)"`
if [ -z "$BASE_DIR" ]; then
exit 1;
fi
export MAVEN_PROJECTBASEDIR=${MAVEN_BASEDIR:-"$BASE_DIR"}
echo $MAVEN_PROJECTBASEDIR
MAVEN_OPTS="$(concat_lines "$MAVEN_PROJECTBASEDIR/.mvn/jvm.config") $MAVEN_OPTS"
# For Cygwin, switch paths to Windows format before running java
if $cygwin; then
[ -n "$M2_HOME" ] &&
M2_HOME=`cygpath --path --windows "$M2_HOME"`
[ -n "$JAVA_HOME" ] &&
JAVA_HOME=`cygpath --path --windows "$JAVA_HOME"`
[ -n "$CLASSPATH" ] &&
CLASSPATH=`cygpath --path --windows "$CLASSPATH"`
[ -n "$MAVEN_PROJECTBASEDIR" ] &&
MAVEN_PROJECTBASEDIR=`cygpath --path --windows "$MAVEN_PROJECTBASEDIR"`
fi
WRAPPER_LAUNCHER=org.apache.maven.wrapper.MavenWrapperMain
exec "$JAVACMD" \
$MAVEN_OPTS \
-classpath "$MAVEN_PROJECTBASEDIR/.mvn/wrapper/maven-wrapper.jar" \
"-Dmaven.home=${M2_HOME}" "-Dmaven.multiModuleProjectDirectory=${MAVEN_PROJECTBASEDIR}" \
${WRAPPER_LAUNCHER} $MAVEN_CONFIG "$@"

143
mvnw.cmd vendored Executable file
View File

@@ -0,0 +1,143 @@
@REM ----------------------------------------------------------------------------
@REM Licensed to the Apache Software Foundation (ASF) under one
@REM or more contributor license agreements. See the NOTICE file
@REM distributed with this work for additional information
@REM regarding copyright ownership. The ASF licenses this file
@REM to you under the Apache License, Version 2.0 (the
@REM "License"); you may not use this file except in compliance
@REM with the License. You may obtain a copy of the License at
@REM
@REM http://www.apache.org/licenses/LICENSE-2.0
@REM
@REM Unless required by applicable law or agreed to in writing,
@REM software distributed under the License is distributed on an
@REM "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@REM KIND, either express or implied. See the License for the
@REM specific language governing permissions and limitations
@REM under the License.
@REM ----------------------------------------------------------------------------
@REM ----------------------------------------------------------------------------
@REM Maven2 Start Up Batch script
@REM
@REM Required ENV vars:
@REM JAVA_HOME - location of a JDK home dir
@REM
@REM Optional ENV vars
@REM M2_HOME - location of maven2's installed home dir
@REM MAVEN_BATCH_ECHO - set to 'on' to enable the echoing of the batch commands
@REM MAVEN_BATCH_PAUSE - set to 'on' to wait for a key stroke before ending
@REM MAVEN_OPTS - parameters passed to the Java VM when running Maven
@REM e.g. to debug Maven itself, use
@REM set MAVEN_OPTS=-Xdebug -Xrunjdwp:transport=dt_socket,server=y,suspend=y,address=8000
@REM MAVEN_SKIP_RC - flag to disable loading of mavenrc files
@REM ----------------------------------------------------------------------------
@REM Begin all REM lines with '@' in case MAVEN_BATCH_ECHO is 'on'
@echo off
@REM enable echoing my setting MAVEN_BATCH_ECHO to 'on'
@if "%MAVEN_BATCH_ECHO%" == "on" echo %MAVEN_BATCH_ECHO%
@REM set %HOME% to equivalent of $HOME
if "%HOME%" == "" (set "HOME=%HOMEDRIVE%%HOMEPATH%")
@REM Execute a user defined script before this one
if not "%MAVEN_SKIP_RC%" == "" goto skipRcPre
@REM check for pre script, once with legacy .bat ending and once with .cmd ending
if exist "%HOME%\mavenrc_pre.bat" call "%HOME%\mavenrc_pre.bat"
if exist "%HOME%\mavenrc_pre.cmd" call "%HOME%\mavenrc_pre.cmd"
:skipRcPre
@setlocal
set ERROR_CODE=0
@REM To isolate internal variables from possible post scripts, we use another setlocal
@setlocal
@REM ==== START VALIDATION ====
if not "%JAVA_HOME%" == "" goto OkJHome
echo.
echo Error: JAVA_HOME not found in your environment. >&2
echo Please set the JAVA_HOME variable in your environment to match the >&2
echo location of your Java installation. >&2
echo.
goto error
:OkJHome
if exist "%JAVA_HOME%\bin\java.exe" goto init
echo.
echo Error: JAVA_HOME is set to an invalid directory. >&2
echo JAVA_HOME = "%JAVA_HOME%" >&2
echo Please set the JAVA_HOME variable in your environment to match the >&2
echo location of your Java installation. >&2
echo.
goto error
@REM ==== END VALIDATION ====
:init
@REM Find the project base dir, i.e. the directory that contains the folder ".mvn".
@REM Fallback to current working directory if not found.
set MAVEN_PROJECTBASEDIR=%MAVEN_BASEDIR%
IF NOT "%MAVEN_PROJECTBASEDIR%"=="" goto endDetectBaseDir
set EXEC_DIR=%CD%
set WDIR=%EXEC_DIR%
:findBaseDir
IF EXIST "%WDIR%"\.mvn goto baseDirFound
cd ..
IF "%WDIR%"=="%CD%" goto baseDirNotFound
set WDIR=%CD%
goto findBaseDir
:baseDirFound
set MAVEN_PROJECTBASEDIR=%WDIR%
cd "%EXEC_DIR%"
goto endDetectBaseDir
:baseDirNotFound
set MAVEN_PROJECTBASEDIR=%EXEC_DIR%
cd "%EXEC_DIR%"
:endDetectBaseDir
IF NOT EXIST "%MAVEN_PROJECTBASEDIR%\.mvn\jvm.config" goto endReadAdditionalConfig
@setlocal EnableExtensions EnableDelayedExpansion
for /F "usebackq delims=" %%a in ("%MAVEN_PROJECTBASEDIR%\.mvn\jvm.config") do set JVM_CONFIG_MAVEN_PROPS=!JVM_CONFIG_MAVEN_PROPS! %%a
@endlocal & set JVM_CONFIG_MAVEN_PROPS=%JVM_CONFIG_MAVEN_PROPS%
:endReadAdditionalConfig
SET MAVEN_JAVA_EXE="%JAVA_HOME%\bin\java.exe"
set WRAPPER_JAR="%MAVEN_PROJECTBASEDIR%\.mvn\wrapper\maven-wrapper.jar"
set WRAPPER_LAUNCHER=org.apache.maven.wrapper.MavenWrapperMain
%MAVEN_JAVA_EXE% %JVM_CONFIG_MAVEN_PROPS% %MAVEN_OPTS% %MAVEN_DEBUG_OPTS% -classpath %WRAPPER_JAR% "-Dmaven.multiModuleProjectDirectory=%MAVEN_PROJECTBASEDIR%" %WRAPPER_LAUNCHER% %MAVEN_CONFIG% %*
if ERRORLEVEL 1 goto error
goto end
:error
set ERROR_CODE=1
:end
@endlocal & set ERROR_CODE=%ERROR_CODE%
if not "%MAVEN_SKIP_RC%" == "" goto skipRcPost
@REM check for post script, once with legacy .bat ending and once with .cmd ending
if exist "%HOME%\mavenrc_post.bat" call "%HOME%\mavenrc_post.bat"
if exist "%HOME%\mavenrc_post.cmd" call "%HOME%\mavenrc_post.cmd"
:skipRcPost
@REM pause the script if MAVEN_BATCH_PAUSE is set to 'on'
if "%MAVEN_BATCH_PAUSE%" == "on" pause
if "%MAVEN_TERMINATE_CMD%" == "on" exit %ERROR_CODE%
exit /B %ERROR_CODE%

139
pom.xml Normal file
View File

@@ -0,0 +1,139 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-build</artifactId>
<version>1.3.3.BUILD-SNAPSHOT</version>
<relativePath/>
</parent>
<artifactId>spring-cloud-stream-binder-kinesis-parent</artifactId>
<version>1.0.0.BUILD-SNAPSHOT</version>
<packaging>pom</packaging>
<properties>
<spring-cloud-stream.version>1.3.0.BUILD-SNAPSHOT</spring-cloud-stream.version>
<spring-cloud-aws.version>1.2.2.BUILD-SNAPSHOT</spring-cloud-aws.version>
<spring-integration-aws-version>1.1.0.BUILD-SNAPSHOT</spring-integration-aws-version>
</properties>
<modules>
<module>spring-cloud-stream-binder-kinesis-core</module>
<module>spring-cloud-stream-binder-kinesis</module>
<module>spring-cloud-starter-stream-kinesis</module>
</modules>
<dependencyManagement>
<dependencies>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream</artifactId>
<version>${spring-cloud-stream.version}</version>
</dependency>
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-aws</artifactId>
<version>${spring-integration-aws-version}</version>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-codec</artifactId>
<version>${spring-cloud-stream.version}</version>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-binder-kinesis</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-binder-kinesis-core</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-binder-test</artifactId>
<version>${spring-cloud-stream.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-aws-dependencies</artifactId>
<version>${spring-cloud-aws.version}</version>
<type>pom</type>
<scope>import</scope>
</dependency>
</dependencies>
</dependencyManagement>
<build>
<pluginManagement>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-antrun-plugin</artifactId>
<version>1.7</version>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-checkstyle-plugin</artifactId>
<version>2.17</version>
<dependencies>
<dependency>
<groupId>com.puppycrawl.tools</groupId>
<artifactId>checkstyle</artifactId>
<version>7.1</version>
</dependency>
</dependencies>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-javadoc-plugin</artifactId>
<configuration>
<quiet>true</quiet>
</configuration>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-surefire-plugin</artifactId>
<configuration>
<redirectTestOutputToFile>true</redirectTestOutputToFile>
</configuration>
</plugin>
</plugins>
</pluginManagement>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-checkstyle-plugin</artifactId>
<dependencies>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-build-tools</artifactId>
<version>1.3.1.RELEASE</version>
</dependency>
</dependencies>
<executions>
<execution>
<id>checkstyle-validation</id>
<phase>validate</phase>
<configuration>
<configLocation>checkstyle.xml</configLocation>
<encoding>UTF-8</encoding>
<consoleOutput>true</consoleOutput>
<failsOnError>false</failsOnError>
<includeTestSourceDirectory>true</includeTestSourceDirectory>
</configuration>
<goals>
<goal>check</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>

View File

@@ -0,0 +1,26 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-binder-kinesis-parent</artifactId>
<version>1.0.0.BUILD-SNAPSHOT</version>
</parent>
<artifactId>spring-cloud-starter-stream-kinesis</artifactId>
<description>Spring Cloud Starter Stream Kinesis</description>
<properties>
<main.basedir>${basedir}/../..</main.basedir>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-binder-kinesis</artifactId>
</dependency>
</dependencies>
</project>

View File

@@ -0,0 +1,31 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-binder-kinesis-parent</artifactId>
<version>1.0.0.BUILD-SNAPSHOT</version>
</parent>
<artifactId>spring-cloud-stream-binder-kinesis-core</artifactId>
<description>Spring Cloud Stream Kinesis Binder Core</description>
<dependencies>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream</artifactId>
</dependency>
<dependency>
<groupId>com.amazonaws</groupId>
<artifactId>aws-java-sdk-kinesis</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-configuration-processor</artifactId>
<optional>true</optional>
</dependency>
</dependencies>
</project>

View File

@@ -0,0 +1,29 @@
/*
* Copyright 2017 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.cloud.stream.binder.kinesis.properties;
import org.springframework.boot.context.properties.ConfigurationProperties;
/**
*
* @author Peter Oates
*
*/
@ConfigurationProperties(prefix = "spring.cloud.stream.kinesis.binder")
public class KinesisBinderConfigurationProperties {
}

View File

@@ -0,0 +1,43 @@
/*
* Copyright 2017 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.cloud.stream.binder.kinesis.properties;
/**
* @author Peter Oates
*/
public class KinesisBindingProperties {
private KinesisConsumerProperties consumer = new KinesisConsumerProperties();
private KinesisProducerProperties producer = new KinesisProducerProperties();
public KinesisConsumerProperties getConsumer() {
return consumer;
}
public void setConsumer(KinesisConsumerProperties consumer) {
this.consumer = consumer;
}
public KinesisProducerProperties getProducer() {
return producer;
}
public void setProducer(KinesisProducerProperties producer) {
this.producer = producer;
}
}

View File

@@ -0,0 +1,26 @@
/*
* Copyright 2017 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.cloud.stream.binder.kinesis.properties;
/**
*
* @author Peter Oates
*
*/
public class KinesisConsumerProperties {
}

View File

@@ -0,0 +1,64 @@
/*
* Copyright 2017 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.cloud.stream.binder.kinesis.properties;
import java.util.HashMap;
import java.util.Map;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.cloud.stream.binder.ExtendedBindingProperties;
/**
*
* @author Peter Oates
*
*/
@ConfigurationProperties("spring.cloud.stream.kinesis")
public class KinesisExtendedBindingProperties
implements ExtendedBindingProperties<KinesisConsumerProperties, KinesisProducerProperties> {
private Map<String, KinesisBindingProperties> bindings = new HashMap<>();
public Map<String, KinesisBindingProperties> getBindings() {
return this.bindings;
}
public void setBindings(Map<String, KinesisBindingProperties> bindings) {
this.bindings = bindings;
}
@Override
public KinesisConsumerProperties getExtendedConsumerProperties(String channelName) {
if (this.bindings.containsKey(channelName) && this.bindings.get(channelName).getConsumer() != null) {
return this.bindings.get(channelName).getConsumer();
}
else {
return new KinesisConsumerProperties();
}
}
@Override
public KinesisProducerProperties getExtendedProducerProperties(String channelName) {
if (this.bindings.containsKey(channelName) && this.bindings.get(channelName).getProducer() != null) {
return this.bindings.get(channelName).getProducer();
}
else {
return new KinesisProducerProperties();
}
}
}

View File

@@ -0,0 +1,26 @@
/*
* Copyright 2017 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.cloud.stream.binder.kinesis.properties;
/**
*
* @author Peter Oates
*
*/
public class KinesisProducerProperties {
}

View File

@@ -0,0 +1,154 @@
/*
* Copyright 2017 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.cloud.stream.binder.kinesis.provisioning;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.cloud.stream.binder.ExtendedConsumerProperties;
import org.springframework.cloud.stream.binder.ExtendedProducerProperties;
import org.springframework.cloud.stream.binder.kinesis.properties.KinesisBinderConfigurationProperties;
import org.springframework.cloud.stream.binder.kinesis.properties.KinesisConsumerProperties;
import org.springframework.cloud.stream.binder.kinesis.properties.KinesisProducerProperties;
import org.springframework.cloud.stream.provisioning.ConsumerDestination;
import org.springframework.cloud.stream.provisioning.ProducerDestination;
import org.springframework.cloud.stream.provisioning.ProvisioningException;
import org.springframework.cloud.stream.provisioning.ProvisioningProvider;
import org.springframework.util.Assert;
import com.amazonaws.services.kinesis.AmazonKinesis;
/**
*
* @author Peter Oates
* @author Artem Bilan
*
*/
public class KinesisStreamProvisioner
implements ProvisioningProvider<ExtendedConsumerProperties<KinesisConsumerProperties>,
ExtendedProducerProperties<KinesisProducerProperties>>, InitializingBean {
private final AmazonKinesis amazonKinesis;
private final KinesisBinderConfigurationProperties configurationProperties;
public KinesisStreamProvisioner(AmazonKinesis amazonKinesis,
KinesisBinderConfigurationProperties kinesisBinderConfigurationProperties) {
Assert.notNull(amazonKinesis, "'amazonKinesis' must not be null");
Assert.notNull(kinesisBinderConfigurationProperties, "'kinesisBinderConfigurationProperties' must not be null");
this.amazonKinesis = amazonKinesis;
this.configurationProperties = kinesisBinderConfigurationProperties;
}
@Override
public void afterPropertiesSet() throws Exception {
// TODO Auto-generated method stub
}
@Override
public ProducerDestination provisionProducerDestination(String name,
ExtendedProducerProperties<KinesisProducerProperties> properties) throws ProvisioningException {
KinesisProducerDestination producer = new KinesisProducerDestination(name);
return producer;
}
@Override
public ConsumerDestination provisionConsumerDestination(String name, String group,
ExtendedConsumerProperties<KinesisConsumerProperties> properties) throws ProvisioningException {
KinesisConsumerDestination consumer = new KinesisConsumerDestination(name);
return consumer;
}
private static final class KinesisProducerDestination implements ProducerDestination {
private final String streamName;
private final int shards;
KinesisProducerDestination(String streamName) {
this(streamName, 0);
}
KinesisProducerDestination(String streamName, Integer shards) {
this.streamName = streamName;
this.shards = shards;
}
@Override
public String getName() {
return this.streamName;
}
@Override
public String getNameForPartition(int shard) {
return this.streamName;
}
@Override
public String toString() {
return "KinesisProducerDestination{" +
"streamName='" + this.streamName + '\'' +
", shards=" + this.shards +
'}';
}
}
private static final class KinesisConsumerDestination implements ConsumerDestination {
private final String streamName;
private final int shards;
private final String dlqName;
KinesisConsumerDestination(String streamName) {
this(streamName, 0, null);
}
KinesisConsumerDestination(String streamName, int shards) {
this(streamName, shards, null);
}
KinesisConsumerDestination(String streamName, Integer shards, String dlqName) {
this.streamName = streamName;
this.shards = shards;
this.dlqName = dlqName;
}
@Override
public String getName() {
return this.streamName;
}
@Override
public String toString() {
return "KinesisConsumerDestination{" +
"streamName='" + streamName + '\'' +
", shards=" + shards +
", dlqName='" + dlqName + '\'' +
'}';
}
}
}

View File

@@ -0,0 +1,37 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-binder-kinesis-parent</artifactId>
<version>1.0.0.BUILD-SNAPSHOT</version>
</parent>
<artifactId>spring-cloud-stream-binder-kinesis</artifactId>
<packaging>jar</packaging>
<name>spring-cloud-stream-binder-kinesis</name>
<description>AWS Kinesis Binder Implementation</description>
<dependencies>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-binder-kinesis-core</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-codec</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-aws</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-aws</artifactId>
</dependency>
</dependencies>
</project>

View File

@@ -0,0 +1,118 @@
/*
* Copyright 2017 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.cloud.stream.binder.kinesis;
import org.springframework.cloud.stream.binder.AbstractMessageChannelBinder;
import org.springframework.cloud.stream.binder.ExtendedConsumerProperties;
import org.springframework.cloud.stream.binder.ExtendedProducerProperties;
import org.springframework.cloud.stream.binder.ExtendedPropertiesBinder;
import org.springframework.cloud.stream.binder.kinesis.properties.KinesisBinderConfigurationProperties;
import org.springframework.cloud.stream.binder.kinesis.properties.KinesisConsumerProperties;
import org.springframework.cloud.stream.binder.kinesis.properties.KinesisExtendedBindingProperties;
import org.springframework.cloud.stream.binder.kinesis.properties.KinesisProducerProperties;
import org.springframework.cloud.stream.binder.kinesis.provisioning.KinesisStreamProvisioner;
import org.springframework.cloud.stream.provisioning.ConsumerDestination;
import org.springframework.cloud.stream.provisioning.ProducerDestination;
import org.springframework.integration.aws.inbound.kinesis.CheckpointMode;
import org.springframework.integration.aws.inbound.kinesis.KinesisMessageDrivenChannelAdapter;
import org.springframework.integration.aws.inbound.kinesis.KinesisShardOffset;
import org.springframework.integration.aws.inbound.kinesis.ListenerMode;
import org.springframework.integration.aws.outbound.KinesisMessageHandler;
import org.springframework.integration.core.MessageProducer;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageHandler;
import org.springframework.util.Assert;
import com.amazonaws.services.kinesis.AmazonKinesisAsync;
/**
*
* @author Peter Oates
* @author Artem Bilan
*/
public class KinesisMessageChannelBinder extends
AbstractMessageChannelBinder<ExtendedConsumerProperties<KinesisConsumerProperties>, ExtendedProducerProperties<KinesisProducerProperties>, KinesisStreamProvisioner>
implements ExtendedPropertiesBinder<MessageChannel, KinesisConsumerProperties, KinesisProducerProperties> {
// not using currently but expect to include global properties here if
// required
private final KinesisBinderConfigurationProperties configurationProperties;
// not currently utilising any extended properties - not sure if I will need
// these
private KinesisExtendedBindingProperties extendedBindingProperties = new KinesisExtendedBindingProperties();
private final AmazonKinesisAsync amazonKinesis;
public KinesisMessageChannelBinder(AmazonKinesisAsync amazonKinesis,
KinesisBinderConfigurationProperties configurationProperties,
KinesisStreamProvisioner provisioningProvider) {
super(false, null, provisioningProvider);
Assert.notNull(amazonKinesis, "'amazonKinesis' must not be null");
Assert.notNull(configurationProperties, "'configurationProperties' must not be null");
this.configurationProperties = configurationProperties;
this.amazonKinesis = amazonKinesis;
}
@Override
public KinesisConsumerProperties getExtendedConsumerProperties(String channelName) {
return this.extendedBindingProperties.getExtendedConsumerProperties(channelName);
}
@Override
public KinesisProducerProperties getExtendedProducerProperties(String channelName) {
return this.extendedBindingProperties.getExtendedProducerProperties(channelName);
}
// below are the main methods to implement - these will create the message
// handlers used by the application
// to put and consume messages
@Override
protected MessageHandler createProducerMessageHandler(ProducerDestination destination,
ExtendedProducerProperties<KinesisProducerProperties> producerProperties) throws Exception {
KinesisMessageHandler kinesisMessageHandler = new KinesisMessageHandler(this.amazonKinesis);
kinesisMessageHandler.setStream(destination.getName());
// tidy up partition key
kinesisMessageHandler.setPartitionKey("name");
kinesisMessageHandler.setBeanFactory(this.getBeanFactory());
return kinesisMessageHandler;
}
@Override
protected MessageProducer createConsumerEndpoint(ConsumerDestination destination, String group,
ExtendedConsumerProperties<KinesisConsumerProperties> properties) throws Exception {
KinesisMessageDrivenChannelAdapter adapter =
new KinesisMessageDrivenChannelAdapter(this.amazonKinesis, destination.getName());
// explicitly setting the offset - LATEST is the default but added here
// so can be configured later
adapter.setStreamInitialSequence(KinesisShardOffset.latest());
// need to move these properties to the appropriate properties class
adapter.setCheckpointMode(CheckpointMode.record);
adapter.setListenerMode(ListenerMode.record);
adapter.setStartTimeout(10000);
adapter.setDescribeStreamRetries(1);
adapter.setConcurrency(10);
return adapter;
}
}

View File

@@ -0,0 +1,80 @@
/*
* Copyright 2017 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.cloud.stream.binder.kinesis.config;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.cloud.aws.core.region.RegionProvider;
import org.springframework.cloud.stream.binder.Binder;
import org.springframework.cloud.stream.binder.kinesis.KinesisMessageChannelBinder;
import org.springframework.cloud.stream.binder.kinesis.properties.KinesisBinderConfigurationProperties;
import org.springframework.cloud.stream.binder.kinesis.properties.KinesisExtendedBindingProperties;
import org.springframework.cloud.stream.binder.kinesis.provisioning.KinesisStreamProvisioner;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.codec.Codec;
import com.amazonaws.auth.AWSCredentialsProvider;
import com.amazonaws.services.kinesis.AmazonKinesisAsync;
import com.amazonaws.services.kinesis.AmazonKinesisAsyncClientBuilder;
/**
*
* @author Peter Oates
* @author Artem Bilan
*
*/
@Configuration
@ConditionalOnMissingBean(Binder.class)
@EnableConfigurationProperties({ KinesisBinderConfigurationProperties.class, KinesisExtendedBindingProperties.class })
public class KinesisBinderConfiguration {
@Autowired
private KinesisBinderConfigurationProperties configurationProperties;
@Bean
public AmazonKinesisAsync amazonKinesis(
AWSCredentialsProvider awsCredentialsProvider,
RegionProvider regionProvider) {
return AmazonKinesisAsyncClientBuilder.standard()
.withCredentials(awsCredentialsProvider)
.withRegion(regionProvider.getRegion().getName())
.build();
}
@Bean
public KinesisStreamProvisioner provisioningProvider(AmazonKinesisAsync amazonKinesis) {
return new KinesisStreamProvisioner(amazonKinesis, this.configurationProperties);
}
@Bean
KinesisMessageChannelBinder kinesisMessageChannelBinder(
AmazonKinesisAsync amazonKinesis,
KinesisStreamProvisioner provisioningProvider,
Codec codec) {
KinesisMessageChannelBinder kinesisMessageChannelBinder =
new KinesisMessageChannelBinder(amazonKinesis, this.configurationProperties, provisioningProvider);
kinesisMessageChannelBinder.setCodec(codec);
return kinesisMessageChannelBinder;
}
}

View File

@@ -0,0 +1 @@
kinesis: org.springframework.cloud.stream.binder.kinesis.config.KinesisBinderConfiguration