INTEXT-64 MQTT Adapters
See README.md for more information. http://jira.springsource.org/browse/INTEXT-64
This commit is contained in:
committed by
Gunnar Hillert
parent
b139943b46
commit
745dd018ca
76
spring-integration-mqtt/README.md
Normal file
76
spring-integration-mqtt/README.md
Normal file
@@ -0,0 +1,76 @@
|
||||
Spring Integration Mqtt Adapters
|
||||
=================================================
|
||||
|
||||
`inbound` and `outbound` channel adapters are provided for Mqtt. The current implementation uses the [Eclipse Paho][] client.
|
||||
|
||||
Example configurations...
|
||||
|
||||
<int-mqtt:message-driven-channel-adapter id="twoTopicsAdapter"
|
||||
client-id="foo"
|
||||
url="tcp://localhost:1883"
|
||||
topics="bar, baz"
|
||||
channel="out" />
|
||||
|
||||
<int-mqtt:outbound-channel-adapter id="withDefaultConverter"
|
||||
client-id="foo"
|
||||
url="tcp://localhost:1883"
|
||||
default-qos="1"
|
||||
default-retained="true"
|
||||
default-topic="bar"
|
||||
channel="target" />
|
||||
|
||||
|
||||
Spring integration messages sent to the outbound adapter can have headers `mqtt_topic, mqtt_qos, mqtt_retained` which will override the defaults configured on the adapter.
|
||||
|
||||
Inbound messages will have headers
|
||||
|
||||
mqtt_topic - the topic from which the message was received
|
||||
mqtt_duplicate - true if the message is a duplicate
|
||||
mqtt_qos - the quality of service
|
||||
|
||||
|
||||
|
||||
Both adapters use a `MqttPahoClientFactory` to get a client instance; the same factory also provides connection options from configured properties (such as user/password). The client factory bean (`DefaultMqttPahoClientFactory`) is provided to the adapter using the `client-factory` attribute. When not provided, a default factory instance is used.
|
||||
|
||||
|
||||
Currently tested with the RabbitMQ MQTT plugin.
|
||||
|
||||
|
||||
##Note:
|
||||
|
||||
Currently, the Paho java client is not mavenized; there is an [open paho bug][] to resolve this. In the meantime, you can manually add the jar to your maven repo:
|
||||
|
||||
mvn install:install-file -DgroupId=org.eclipse.paho -DartifactId=MQTT-Java -Dversion=3.0 -Dpackaging=jar -Dfile=/path/to/org.eclipse.paho.client.mqttv3.jar
|
||||
|
||||
|
||||
|
||||
Check out the [Spring Integration forums][] and the [spring-integration][spring-integration tag] tag
|
||||
on [Stack Overflow][]. [Commercial support][] is available, too.
|
||||
|
||||
## Related GitHub projects
|
||||
|
||||
* [Spring Integration][]
|
||||
* [Spring Integration Samples][]
|
||||
* [Spring Integration Templates][]
|
||||
* [Spring Integration Dsl Groovy][]
|
||||
* [Spring Integration Dsl Scala][]
|
||||
* [Spring Integration Pattern Catalog][]
|
||||
|
||||
For more information, please also don't forget to visit the [Spring Integration][] website.
|
||||
|
||||
## Eclipse Paho
|
||||
|
||||
* [Eclipse Paho][]
|
||||
|
||||
[Spring Integration]: https://github.com/SpringSource/spring-integration
|
||||
[Commercial support]: http://springsource.com/support/springsupport
|
||||
[Spring Integration forums]: http://forum.springsource.org/forumdisplay.php?42-Integration
|
||||
[spring-integration tag]: http://stackoverflow.com/questions/tagged/spring-integration
|
||||
[Spring Integration Samples]: https://github.com/SpringSource/spring-integration-samples
|
||||
[Spring Integration Templates]: https://github.com/SpringSource/spring-integration-templates/tree/master/si-sts-templates
|
||||
[Spring Integration Dsl Groovy]: https://github.com/SpringSource/spring-integration-dsl-groovy
|
||||
[Spring Integration Dsl Scala]: https://github.com/SpringSource/spring-integration-dsl-scala
|
||||
[Spring Integration Pattern Catalog]: https://github.com/SpringSource/spring-integration-pattern-catalog
|
||||
[Stack Overflow]: http://stackoverflow.com/faq
|
||||
[Eclipse Paho]: http://www.eclipse.org/paho/
|
||||
[open paho bug]: https://bugs.eclipse.org/bugs/show_bug.cgi?id=382471
|
||||
266
spring-integration-mqtt/build.gradle
Normal file
266
spring-integration-mqtt/build.gradle
Normal file
@@ -0,0 +1,266 @@
|
||||
description = 'Spring Integration Mqtt Adapter'
|
||||
|
||||
buildscript {
|
||||
repositories {
|
||||
maven { url 'https://repo.springsource.org/plugins-snapshot' }
|
||||
}
|
||||
dependencies {
|
||||
classpath 'org.springframework.build.gradle:docbook-reference-plugin:0.1.5'
|
||||
}
|
||||
}
|
||||
|
||||
apply plugin: 'java'
|
||||
apply from: "${rootProject.projectDir}/publish-maven.gradle"
|
||||
apply plugin: 'eclipse'
|
||||
apply plugin: 'idea'
|
||||
|
||||
group = 'org.springframework.integration.mqtt'
|
||||
|
||||
repositories {
|
||||
maven { url 'http://repo.springsource.org/libs-snapshot' }
|
||||
maven { url 'http://repo.springsource.org/plugins-release' }
|
||||
mavenLocal()
|
||||
}
|
||||
|
||||
sourceCompatibility=1.6
|
||||
targetCompatibility=1.6
|
||||
|
||||
ext {
|
||||
junitVersion = '4.10'
|
||||
log4jVersion = '1.2.12'
|
||||
mockitoVersion = '1.9.0'
|
||||
springVersion = '3.1.3.RELEASE'
|
||||
springIntegrationVersion = '3.0.0.BUILD-SNAPSHOT'
|
||||
|
||||
idPrefix = 'mqttadapter'
|
||||
}
|
||||
|
||||
eclipse {
|
||||
project {
|
||||
natures += 'org.springframework.ide.eclipse.core.springnature'
|
||||
}
|
||||
}
|
||||
|
||||
sourceSets {
|
||||
test {
|
||||
resources {
|
||||
srcDirs = ['src/test/resources', 'src/test/java']
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// See http://www.gradle.org/docs/current/userguide/dependency_management.html#sub:configurations
|
||||
// and http://www.gradle.org/docs/current/dsl/org.gradle.api.artifacts.ConfigurationContainer.html
|
||||
configurations {
|
||||
jacoco //Configuration Group used by Sonar to provide Code Coverage using JaCoCo
|
||||
}
|
||||
|
||||
dependencies {
|
||||
compile "org.springframework.integration:spring-integration-core:$springIntegrationVersion"
|
||||
compile "org.eclipse.paho:MQTT-Java:3.0"
|
||||
testCompile "org.springframework.integration:spring-integration-test:$springIntegrationVersion"
|
||||
testCompile "junit:junit-dep:$junitVersion"
|
||||
testCompile "log4j:log4j:$log4jVersion"
|
||||
testCompile "org.mockito:mockito-all:$mockitoVersion"
|
||||
testCompile "org.springframework:spring-test:$springVersion"
|
||||
jacoco group: "org.jacoco", name: "org.jacoco.agent", version: "0.5.6.201201232323", classifier: "runtime"
|
||||
}
|
||||
|
||||
|
||||
// enable all compiler warnings; individual projects may customize further
|
||||
ext.xLintArg = '-Xlint:all'
|
||||
[compileJava, compileTestJava]*.options*.compilerArgs = [xLintArg]
|
||||
|
||||
test {
|
||||
// suppress all console output during testing unless running `gradle -i`
|
||||
logging.captureStandardOutput(LogLevel.INFO)
|
||||
jvmArgs "-javaagent:${configurations.jacoco.asPath}=destfile=${buildDir}/jacoco.exec,includes=*"
|
||||
}
|
||||
|
||||
task sourcesJar(type: Jar) {
|
||||
classifier = 'sources'
|
||||
from sourceSets.main.allJava
|
||||
}
|
||||
|
||||
task javadocJar(type: Jar) {
|
||||
classifier = 'javadoc'
|
||||
from javadoc
|
||||
}
|
||||
|
||||
artifacts {
|
||||
archives sourcesJar
|
||||
archives javadocJar
|
||||
}
|
||||
|
||||
apply plugin: 'docbook-reference'
|
||||
|
||||
reference {
|
||||
sourceDir = file('src/reference/docbook')
|
||||
}
|
||||
|
||||
apply plugin: 'sonar'
|
||||
|
||||
sonar {
|
||||
|
||||
if (rootProject.hasProperty('sonarHostUrl')) {
|
||||
server.url = rootProject.sonarHostUrl
|
||||
}
|
||||
|
||||
database {
|
||||
if (rootProject.hasProperty('sonarJdbcUrl')) {
|
||||
url = rootProject.sonarJdbcUrl
|
||||
}
|
||||
if (rootProject.hasProperty('sonarJdbcDriver')) {
|
||||
driverClassName = rootProject.sonarJdbcDriver
|
||||
}
|
||||
if (rootProject.hasProperty('sonarJdbcUsername')) {
|
||||
username = rootProject.sonarJdbcUsername
|
||||
}
|
||||
if (rootProject.hasProperty('sonarJdbcPassword')) {
|
||||
password = rootProject.sonarJdbcPassword
|
||||
}
|
||||
}
|
||||
|
||||
project {
|
||||
dynamicAnalysis = "reuseReports"
|
||||
withProjectProperties { props ->
|
||||
props["sonar.core.codeCoveragePlugin"] = "jacoco"
|
||||
props["sonar.jacoco.reportPath"] = "${buildDir.name}/jacoco.exec"
|
||||
}
|
||||
}
|
||||
|
||||
logger.info("Sonar parameters used: server.url='${server.url}'; database.url='${database.url}'; database.driverClassName='${database.driverClassName}'; database.username='${database.username}'")
|
||||
}
|
||||
|
||||
task api(type: Javadoc) {
|
||||
group = 'Documentation'
|
||||
description = 'Generates the Javadoc API documentation.'
|
||||
title = "${rootProject.description} ${version} API"
|
||||
options.memberLevel = org.gradle.external.javadoc.JavadocMemberLevel.PROTECTED
|
||||
options.author = true
|
||||
options.header = rootProject.description
|
||||
options.overview = 'src/api/overview.html'
|
||||
|
||||
source = sourceSets.main.allJava
|
||||
classpath = project.sourceSets.main.compileClasspath
|
||||
destinationDir = new File(buildDir, "api")
|
||||
}
|
||||
|
||||
task schemaZip(type: Zip) {
|
||||
group = 'Distribution'
|
||||
classifier = 'schema'
|
||||
description = "Builds -${classifier} archive containing all " +
|
||||
"XSDs for deployment at static.springframework.org/schema."
|
||||
|
||||
def Properties schemas = new Properties();
|
||||
def shortName = idPrefix.replaceFirst("${idPrefix}-", '')
|
||||
|
||||
project.sourceSets.main.resources.find {
|
||||
it.path.endsWith('META-INF/spring.schemas')
|
||||
}?.withInputStream { schemas.load(it) }
|
||||
|
||||
for (def key : schemas.keySet()) {
|
||||
File xsdFile = project.sourceSets.main.resources.find {
|
||||
it.path.endsWith(schemas.get(key))
|
||||
}
|
||||
assert xsdFile != null
|
||||
into ("integration/${shortName}") {
|
||||
from xsdFile.path
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
task docsZip(type: Zip) {
|
||||
group = 'Distribution'
|
||||
classifier = 'docs'
|
||||
description = "Builds -${classifier} archive containing api and reference " +
|
||||
"for deployment at static.springframework.org/spring-integration/docs."
|
||||
|
||||
from('src/dist') {
|
||||
include 'changelog.txt'
|
||||
}
|
||||
|
||||
from (api) {
|
||||
into 'api'
|
||||
}
|
||||
|
||||
from (reference) {
|
||||
into 'reference'
|
||||
}
|
||||
}
|
||||
|
||||
task distZip(type: Zip, dependsOn: [docsZip, schemaZip]) {
|
||||
group = 'Distribution'
|
||||
classifier = 'dist'
|
||||
description = "Builds -${classifier} archive, containing all jars and docs, " +
|
||||
"suitable for community download page."
|
||||
|
||||
ext.baseDir = "${project.name}-${project.version}";
|
||||
|
||||
from('src/dist') {
|
||||
include 'readme.txt'
|
||||
include 'license.txt'
|
||||
include 'notice.txt'
|
||||
into "${baseDir}"
|
||||
}
|
||||
|
||||
from(zipTree(docsZip.archivePath)) {
|
||||
into "${baseDir}/docs"
|
||||
}
|
||||
|
||||
from(zipTree(schemaZip.archivePath)) {
|
||||
into "${baseDir}/schema"
|
||||
}
|
||||
|
||||
into ("${baseDir}/libs") {
|
||||
from project.jar
|
||||
from project.sourcesJar
|
||||
from project.javadocJar
|
||||
}
|
||||
}
|
||||
|
||||
// Create an optional "with dependencies" distribution.
|
||||
// Not published by default; only for use when building from source.
|
||||
task depsZip(type: Zip, dependsOn: distZip) { zipTask ->
|
||||
group = 'Distribution'
|
||||
classifier = 'dist-with-deps'
|
||||
description = "Builds -${classifier} archive, containing everything " +
|
||||
"in the -${distZip.classifier} archive plus all dependencies."
|
||||
|
||||
from zipTree(distZip.archivePath)
|
||||
|
||||
gradle.taskGraph.whenReady { taskGraph ->
|
||||
if (taskGraph.hasTask(":${zipTask.name}")) {
|
||||
def projectName = rootProject.name
|
||||
def artifacts = new HashSet()
|
||||
|
||||
rootProject.configurations.runtime.resolvedConfiguration.resolvedArtifacts.each { artifact ->
|
||||
def dependency = artifact.moduleVersion.id
|
||||
if (!projectName.equals(dependency.name)) {
|
||||
artifacts << artifact.file
|
||||
}
|
||||
}
|
||||
|
||||
zipTask.from(artifacts) {
|
||||
into "${distZip.baseDir}/deps"
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
artifacts {
|
||||
archives distZip
|
||||
archives docsZip
|
||||
archives schemaZip
|
||||
}
|
||||
|
||||
task dist(dependsOn: assemble) {
|
||||
group = 'Distribution'
|
||||
description = 'Builds -dist, -docs and -schema distribution archives.'
|
||||
}
|
||||
|
||||
task wrapper(type: Wrapper) {
|
||||
description = 'Generates gradlew[.bat] scripts'
|
||||
gradleVersion = '1.3'
|
||||
}
|
||||
1
spring-integration-mqtt/gradle.properties
Normal file
1
spring-integration-mqtt/gradle.properties
Normal file
@@ -0,0 +1 @@
|
||||
version=1.0.0.BUILD-SNAPSHOT
|
||||
BIN
spring-integration-mqtt/gradle/wrapper/gradle-wrapper.jar
vendored
Normal file
BIN
spring-integration-mqtt/gradle/wrapper/gradle-wrapper.jar
vendored
Normal file
Binary file not shown.
6
spring-integration-mqtt/gradle/wrapper/gradle-wrapper.properties
vendored
Normal file
6
spring-integration-mqtt/gradle/wrapper/gradle-wrapper.properties
vendored
Normal file
@@ -0,0 +1,6 @@
|
||||
#Wed Sep 05 13:34:36 EDT 2012
|
||||
distributionBase=GRADLE_USER_HOME
|
||||
distributionPath=wrapper/dists
|
||||
zipStoreBase=GRADLE_USER_HOME
|
||||
zipStorePath=wrapper/dists
|
||||
distributionUrl=http\://services.gradle.org/distributions/gradle-1.1-bin.zip
|
||||
164
spring-integration-mqtt/gradlew
vendored
Executable file
164
spring-integration-mqtt/gradlew
vendored
Executable file
@@ -0,0 +1,164 @@
|
||||
#!/bin/bash
|
||||
|
||||
##############################################################################
|
||||
##
|
||||
## Gradle start up script for UN*X
|
||||
##
|
||||
##############################################################################
|
||||
|
||||
# Add default JVM options here. You can also use JAVA_OPTS and GRADLE_OPTS to pass JVM options to this script.
|
||||
DEFAULT_JVM_OPTS=""
|
||||
|
||||
APP_NAME="Gradle"
|
||||
APP_BASE_NAME=`basename "$0"`
|
||||
|
||||
# Use the maximum available, or set MAX_FD != -1 to use that value.
|
||||
MAX_FD="maximum"
|
||||
|
||||
warn ( ) {
|
||||
echo "$*"
|
||||
}
|
||||
|
||||
die ( ) {
|
||||
echo
|
||||
echo "$*"
|
||||
echo
|
||||
exit 1
|
||||
}
|
||||
|
||||
# OS specific support (must be 'true' or 'false').
|
||||
cygwin=false
|
||||
msys=false
|
||||
darwin=false
|
||||
case "`uname`" in
|
||||
CYGWIN* )
|
||||
cygwin=true
|
||||
;;
|
||||
Darwin* )
|
||||
darwin=true
|
||||
;;
|
||||
MINGW* )
|
||||
msys=true
|
||||
;;
|
||||
esac
|
||||
|
||||
# For Cygwin, ensure paths are in UNIX format before anything is touched.
|
||||
if $cygwin ; then
|
||||
[ -n "$JAVA_HOME" ] && JAVA_HOME=`cygpath --unix "$JAVA_HOME"`
|
||||
fi
|
||||
|
||||
# Attempt to set APP_HOME
|
||||
# Resolve links: $0 may be a link
|
||||
PRG="$0"
|
||||
# Need this for relative symlinks.
|
||||
while [ -h "$PRG" ] ; do
|
||||
ls=`ls -ld "$PRG"`
|
||||
link=`expr "$ls" : '.*-> \(.*\)$'`
|
||||
if expr "$link" : '/.*' > /dev/null; then
|
||||
PRG="$link"
|
||||
else
|
||||
PRG=`dirname "$PRG"`"/$link"
|
||||
fi
|
||||
done
|
||||
SAVED="`pwd`"
|
||||
cd "`dirname \"$PRG\"`/"
|
||||
APP_HOME="`pwd -P`"
|
||||
cd "$SAVED"
|
||||
|
||||
CLASSPATH=$APP_HOME/gradle/wrapper/gradle-wrapper.jar
|
||||
|
||||
# Determine the Java command to use to start the JVM.
|
||||
if [ -n "$JAVA_HOME" ] ; then
|
||||
if [ -x "$JAVA_HOME/jre/sh/java" ] ; then
|
||||
# IBM's JDK on AIX uses strange locations for the executables
|
||||
JAVACMD="$JAVA_HOME/jre/sh/java"
|
||||
else
|
||||
JAVACMD="$JAVA_HOME/bin/java"
|
||||
fi
|
||||
if [ ! -x "$JAVACMD" ] ; then
|
||||
die "ERROR: JAVA_HOME is set to an invalid directory: $JAVA_HOME
|
||||
|
||||
Please set the JAVA_HOME variable in your environment to match the
|
||||
location of your Java installation."
|
||||
fi
|
||||
else
|
||||
JAVACMD="java"
|
||||
which java >/dev/null 2>&1 || die "ERROR: JAVA_HOME is not set and no 'java' command could be found in your PATH.
|
||||
|
||||
Please set the JAVA_HOME variable in your environment to match the
|
||||
location of your Java installation."
|
||||
fi
|
||||
|
||||
# Increase the maximum file descriptors if we can.
|
||||
if [ "$cygwin" = "false" -a "$darwin" = "false" ] ; then
|
||||
MAX_FD_LIMIT=`ulimit -H -n`
|
||||
if [ $? -eq 0 ] ; then
|
||||
if [ "$MAX_FD" = "maximum" -o "$MAX_FD" = "max" ] ; then
|
||||
MAX_FD="$MAX_FD_LIMIT"
|
||||
fi
|
||||
ulimit -n $MAX_FD
|
||||
if [ $? -ne 0 ] ; then
|
||||
warn "Could not set maximum file descriptor limit: $MAX_FD"
|
||||
fi
|
||||
else
|
||||
warn "Could not query businessSystem maximum file descriptor limit: $MAX_FD_LIMIT"
|
||||
fi
|
||||
fi
|
||||
|
||||
# For Darwin, add options to specify how the application appears in the dock
|
||||
if $darwin; then
|
||||
GRADLE_OPTS="$GRADLE_OPTS \"-Xdock:name=$APP_NAME\" \"-Xdock:icon=$APP_HOME/media/gradle.icns\""
|
||||
fi
|
||||
|
||||
# For Cygwin, switch paths to Windows format before running java
|
||||
if $cygwin ; then
|
||||
APP_HOME=`cygpath --path --mixed "$APP_HOME"`
|
||||
CLASSPATH=`cygpath --path --mixed "$CLASSPATH"`
|
||||
|
||||
# We build the pattern for arguments to be converted via cygpath
|
||||
ROOTDIRSRAW=`find -L / -maxdepth 1 -mindepth 1 -type d 2>/dev/null`
|
||||
SEP=""
|
||||
for dir in $ROOTDIRSRAW ; do
|
||||
ROOTDIRS="$ROOTDIRS$SEP$dir"
|
||||
SEP="|"
|
||||
done
|
||||
OURCYGPATTERN="(^($ROOTDIRS))"
|
||||
# Add a user-defined pattern to the cygpath arguments
|
||||
if [ "$GRADLE_CYGPATTERN" != "" ] ; then
|
||||
OURCYGPATTERN="$OURCYGPATTERN|($GRADLE_CYGPATTERN)"
|
||||
fi
|
||||
# Now convert the arguments - kludge to limit ourselves to /bin/sh
|
||||
i=0
|
||||
for arg in "$@" ; do
|
||||
CHECK=`echo "$arg"|egrep -c "$OURCYGPATTERN" -`
|
||||
CHECK2=`echo "$arg"|egrep -c "^-"` ### Determine if an option
|
||||
|
||||
if [ $CHECK -ne 0 ] && [ $CHECK2 -eq 0 ] ; then ### Added a condition
|
||||
eval `echo args$i`=`cygpath --path --ignore --mixed "$arg"`
|
||||
else
|
||||
eval `echo args$i`="\"$arg\""
|
||||
fi
|
||||
i=$((i+1))
|
||||
done
|
||||
case $i in
|
||||
(0) set -- ;;
|
||||
(1) set -- "$args0" ;;
|
||||
(2) set -- "$args0" "$args1" ;;
|
||||
(3) set -- "$args0" "$args1" "$args2" ;;
|
||||
(4) set -- "$args0" "$args1" "$args2" "$args3" ;;
|
||||
(5) set -- "$args0" "$args1" "$args2" "$args3" "$args4" ;;
|
||||
(6) set -- "$args0" "$args1" "$args2" "$args3" "$args4" "$args5" ;;
|
||||
(7) set -- "$args0" "$args1" "$args2" "$args3" "$args4" "$args5" "$args6" ;;
|
||||
(8) set -- "$args0" "$args1" "$args2" "$args3" "$args4" "$args5" "$args6" "$args7" ;;
|
||||
(9) set -- "$args0" "$args1" "$args2" "$args3" "$args4" "$args5" "$args6" "$args7" "$args8" ;;
|
||||
esac
|
||||
fi
|
||||
|
||||
# Split up the JVM_OPTS And GRADLE_OPTS values into an array, following the shell quoting and substitution rules
|
||||
function splitJvmOpts() {
|
||||
JVM_OPTS=("$@")
|
||||
}
|
||||
eval splitJvmOpts $DEFAULT_JVM_OPTS $JAVA_OPTS $GRADLE_OPTS
|
||||
JVM_OPTS[${#JVM_OPTS[*]}]="-Dorg.gradle.appname=$APP_BASE_NAME"
|
||||
|
||||
exec "$JAVACMD" "${JVM_OPTS[@]}" -classpath "$CLASSPATH" org.gradle.wrapper.GradleWrapperMain "$@"
|
||||
90
spring-integration-mqtt/gradlew.bat
vendored
Normal file
90
spring-integration-mqtt/gradlew.bat
vendored
Normal file
@@ -0,0 +1,90 @@
|
||||
@if "%DEBUG%" == "" @echo off
|
||||
@rem ##########################################################################
|
||||
@rem
|
||||
@rem Gradle startup script for Windows
|
||||
@rem
|
||||
@rem ##########################################################################
|
||||
|
||||
@rem Set local scope for the variables with windows NT shell
|
||||
if "%OS%"=="Windows_NT" setlocal
|
||||
|
||||
@rem Add default JVM options here. You can also use JAVA_OPTS and GRADLE_OPTS to pass JVM options to this script.
|
||||
set DEFAULT_JVM_OPTS=
|
||||
|
||||
set DIRNAME=%~dp0
|
||||
if "%DIRNAME%" == "" set DIRNAME=.
|
||||
set APP_BASE_NAME=%~n0
|
||||
set APP_HOME=%DIRNAME%
|
||||
|
||||
@rem Find java.exe
|
||||
if defined JAVA_HOME goto findJavaFromJavaHome
|
||||
|
||||
set JAVA_EXE=java.exe
|
||||
%JAVA_EXE% -version >NUL 2>&1
|
||||
if "%ERRORLEVEL%" == "0" goto init
|
||||
|
||||
echo.
|
||||
echo ERROR: JAVA_HOME is not set and no 'java' command could be found in your PATH.
|
||||
echo.
|
||||
echo Please set the JAVA_HOME variable in your environment to match the
|
||||
echo location of your Java installation.
|
||||
|
||||
goto fail
|
||||
|
||||
:findJavaFromJavaHome
|
||||
set JAVA_HOME=%JAVA_HOME:"=%
|
||||
set JAVA_EXE=%JAVA_HOME%/bin/java.exe
|
||||
|
||||
if exist "%JAVA_EXE%" goto init
|
||||
|
||||
echo.
|
||||
echo ERROR: JAVA_HOME is set to an invalid directory: %JAVA_HOME%
|
||||
echo.
|
||||
echo Please set the JAVA_HOME variable in your environment to match the
|
||||
echo location of your Java installation.
|
||||
|
||||
goto fail
|
||||
|
||||
:init
|
||||
@rem Get command-line arguments, handling Windowz variants
|
||||
|
||||
if not "%OS%" == "Windows_NT" goto win9xME_args
|
||||
if "%@eval[2+2]" == "4" goto 4NT_args
|
||||
|
||||
:win9xME_args
|
||||
@rem Slurp the command line arguments.
|
||||
set CMD_LINE_ARGS=
|
||||
set _SKIP=2
|
||||
|
||||
:win9xME_args_slurp
|
||||
if "x%~1" == "x" goto execute
|
||||
|
||||
set CMD_LINE_ARGS=%*
|
||||
goto execute
|
||||
|
||||
:4NT_args
|
||||
@rem Get arguments from the 4NT Shell from JP Software
|
||||
set CMD_LINE_ARGS=%$
|
||||
|
||||
:execute
|
||||
@rem Setup the command line
|
||||
|
||||
set CLASSPATH=%APP_HOME%\gradle\wrapper\gradle-wrapper.jar
|
||||
|
||||
@rem Execute Gradle
|
||||
"%JAVA_EXE%" %DEFAULT_JVM_OPTS% %JAVA_OPTS% %GRADLE_OPTS% "-Dorg.gradle.appname=%APP_BASE_NAME%" -classpath "%CLASSPATH%" org.gradle.wrapper.GradleWrapperMain %CMD_LINE_ARGS%
|
||||
|
||||
:end
|
||||
@rem End local scope for the variables with windows NT shell
|
||||
if "%ERRORLEVEL%"=="0" goto mainEnd
|
||||
|
||||
:fail
|
||||
rem Set variable GRADLE_EXIT_CONSOLE if you need the _script_ return code instead of
|
||||
rem the _cmd.exe /c_ return code!
|
||||
if not "" == "%GRADLE_EXIT_CONSOLE%" exit 1
|
||||
exit /b 1
|
||||
|
||||
:mainEnd
|
||||
if "%OS%"=="Windows_NT" endlocal
|
||||
|
||||
:omega
|
||||
61
spring-integration-mqtt/publish-maven.gradle
Normal file
61
spring-integration-mqtt/publish-maven.gradle
Normal file
@@ -0,0 +1,61 @@
|
||||
apply plugin: 'maven'
|
||||
|
||||
ext.optionalDeps = []
|
||||
ext.providedDeps = []
|
||||
|
||||
ext.optional = { optionalDeps << it }
|
||||
ext.provided = { providedDeps << it }
|
||||
|
||||
install {
|
||||
repositories.mavenInstaller {
|
||||
customizePom(pom, project)
|
||||
}
|
||||
}
|
||||
|
||||
def customizePom(pom, gradleProject) {
|
||||
pom.whenConfigured { generatedPom ->
|
||||
// respect 'optional' and 'provided' dependencies
|
||||
gradleProject.optionalDeps.each { dep ->
|
||||
generatedPom.dependencies.find { it.artifactId == dep.name }?.optional = true
|
||||
}
|
||||
gradleProject.providedDeps.each { dep ->
|
||||
generatedPom.dependencies.find { it.artifactId == dep.name }?.scope = 'provided'
|
||||
}
|
||||
|
||||
// eliminate test-scoped dependencies (no need in maven central poms)
|
||||
generatedPom.dependencies.removeAll { dep ->
|
||||
dep.scope == 'test'
|
||||
}
|
||||
|
||||
// add all items necessary for maven central publication
|
||||
generatedPom.project {
|
||||
name = gradleProject.description
|
||||
description = gradleProject.description
|
||||
url = 'https://github.com/SpringSource/spring-integration-extensions'
|
||||
organization {
|
||||
name = 'SpringSource'
|
||||
url = 'http://springsource.org'
|
||||
}
|
||||
licenses {
|
||||
license {
|
||||
name 'The Apache Software License, Version 2.0'
|
||||
url 'http://www.apache.org/licenses/LICENSE-2.0.txt'
|
||||
distribution 'repo'
|
||||
}
|
||||
}
|
||||
scm {
|
||||
url = 'https://github.com/SpringSource/spring-integration-extensions'
|
||||
connection = 'scm:git:git://github.com/SpringSource/spring-integration-extensions'
|
||||
developerConnection = 'scm:git:git://github.com/SpringSource/spring-integration-extensions'
|
||||
}
|
||||
|
||||
developers {
|
||||
developer {
|
||||
id = 'not specified'
|
||||
name = 'Gary Russell'
|
||||
email = 'not specified'
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
22
spring-integration-mqtt/src/api/overview.html
Normal file
22
spring-integration-mqtt/src/api/overview.html
Normal file
@@ -0,0 +1,22 @@
|
||||
<html>
|
||||
<body>
|
||||
This document is the API specification for Spring Integration
|
||||
<hr/>
|
||||
<div id="overviewBody">
|
||||
<p>
|
||||
For further API reference and developer documentation, see the
|
||||
<a href="http://static.springsource.org/spring-integration/reference" target="_top">Spring
|
||||
Integration reference documentation</a>.
|
||||
That documentation contains more detailed, developer-targeted
|
||||
descriptions, with conceptual overviews, definitions of terms,
|
||||
workarounds, and working code examples.
|
||||
</p>
|
||||
|
||||
<p>
|
||||
If you are interested in commercial training, consultancy, and
|
||||
support for Spring Integration, please visit <a href="http://www.springsource.com" target="_top">
|
||||
http://www.springsource.com</a>
|
||||
</p>
|
||||
</div>
|
||||
</body>
|
||||
</html>
|
||||
15
spring-integration-mqtt/src/dist/changelog.txt
vendored
Normal file
15
spring-integration-mqtt/src/dist/changelog.txt
vendored
Normal file
@@ -0,0 +1,15 @@
|
||||
Spring Integration MqttAdapter Adapter CHANGELOG
|
||||
=========================================
|
||||
|
||||
For the full detailed changelog, see:
|
||||
https://....
|
||||
|
||||
|
||||
Changes in version 1.0 GA (insert date here)
|
||||
https://....
|
||||
|
||||
|
||||
*** GENERAL ***
|
||||
|
||||
Upgraded Spring Framework dependency to ...
|
||||
...
|
||||
201
spring-integration-mqtt/src/dist/license.txt
vendored
Normal file
201
spring-integration-mqtt/src/dist/license.txt
vendored
Normal file
@@ -0,0 +1,201 @@
|
||||
Apache License
|
||||
Version 2.0, January 2004
|
||||
http://www.apache.org/licenses/
|
||||
|
||||
TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION
|
||||
|
||||
1. Definitions.
|
||||
|
||||
"License" shall mean the terms and conditions for use, reproduction,
|
||||
and distribution as defined by Sections 1 through 9 of this document.
|
||||
|
||||
"Licensor" shall mean the copyright owner or entity authorized by
|
||||
the copyright owner that is granting the License.
|
||||
|
||||
"Legal Entity" shall mean the union of the acting entity and all
|
||||
other entities that control, are controlled by, or are under common
|
||||
control with that entity. For the purposes of this definition,
|
||||
"control" means (i) the power, direct or indirect, to cause the
|
||||
direction or management of such entity, whether by contract or
|
||||
otherwise, or (ii) ownership of fifty percent (50%) or more of the
|
||||
outstanding shares, or (iii) beneficial ownership of such entity.
|
||||
|
||||
"You" (or "Your") shall mean an individual or Legal Entity
|
||||
exercising permissions granted by this License.
|
||||
|
||||
"Source" form shall mean the preferred form for making modifications,
|
||||
including but not limited to software source code, documentation
|
||||
source, and configuration files.
|
||||
|
||||
"Object" form shall mean any form resulting from mechanical
|
||||
transformation or translation of a Source form, including but
|
||||
not limited to compiled object code, generated documentation,
|
||||
and conversions to other media types.
|
||||
|
||||
"Work" shall mean the work of authorship, whether in Source or
|
||||
Object form, made available under the License, as indicated by a
|
||||
copyright notice that is included in or attached to the work
|
||||
(an example is provided in the Appendix below).
|
||||
|
||||
"Derivative Works" shall mean any work, whether in Source or Object
|
||||
form, that is based on (or derived from) the Work and for which the
|
||||
editorial revisions, annotations, elaborations, or other modifications
|
||||
represent, as a whole, an original work of authorship. For the purposes
|
||||
of this License, Derivative Works shall not include works that remain
|
||||
separable from, or merely link (or bind by name) to the interfaces of,
|
||||
the Work and Derivative Works thereof.
|
||||
|
||||
"Contribution" shall mean any work of authorship, including
|
||||
the original version of the Work and any modifications or additions
|
||||
to that Work or Derivative Works thereof, that is intentionally
|
||||
submitted to Licensor for inclusion in the Work by the copyright owner
|
||||
or by an individual or Legal Entity authorized to submit on behalf of
|
||||
the copyright owner. For the purposes of this definition, "submitted"
|
||||
means any form of electronic, verbal, or written communication sent
|
||||
to the Licensor or its representatives, including but not limited to
|
||||
communication on electronic mailing lists, source code control systems,
|
||||
and issue tracking systems that are managed by, or on behalf of, the
|
||||
Licensor for the purpose of discussing and improving the Work, but
|
||||
excluding communication that is conspicuously marked or otherwise
|
||||
designated in writing by the copyright owner as "Not a Contribution."
|
||||
|
||||
"Contributor" shall mean Licensor and any individual or Legal Entity
|
||||
on behalf of whom a Contribution has been received by Licensor and
|
||||
subsequently incorporated within the Work.
|
||||
|
||||
2. Grant of Copyright License. Subject to the terms and conditions of
|
||||
this License, each Contributor hereby grants to You a perpetual,
|
||||
worldwide, non-exclusive, no-charge, royalty-free, irrevocable
|
||||
copyright license to reproduce, prepare Derivative Works of,
|
||||
publicly display, publicly perform, sublicense, and distribute the
|
||||
Work and such Derivative Works in Source or Object form.
|
||||
|
||||
3. Grant of Patent License. Subject to the terms and conditions of
|
||||
this License, each Contributor hereby grants to You a perpetual,
|
||||
worldwide, non-exclusive, no-charge, royalty-free, irrevocable
|
||||
(except as stated in this section) patent license to make, have made,
|
||||
use, offer to sell, sell, import, and otherwise transfer the Work,
|
||||
where such license applies only to those patent claims licensable
|
||||
by such Contributor that are necessarily infringed by their
|
||||
Contribution(s) alone or by combination of their Contribution(s)
|
||||
with the Work to which such Contribution(s) was submitted. If You
|
||||
institute patent litigation against any entity (including a
|
||||
cross-claim or counterclaim in a lawsuit) alleging that the Work
|
||||
or a Contribution incorporated within the Work constitutes direct
|
||||
or contributory patent infringement, then any patent licenses
|
||||
granted to You under this License for that Work shall terminate
|
||||
as of the date such litigation is filed.
|
||||
|
||||
4. Redistribution. You may reproduce and distribute copies of the
|
||||
Work or Derivative Works thereof in any medium, with or without
|
||||
modifications, and in Source or Object form, provided that You
|
||||
meet the following conditions:
|
||||
|
||||
(a) You must give any other recipients of the Work or
|
||||
Derivative Works a copy of this License; and
|
||||
|
||||
(b) You must cause any modified files to carry prominent notices
|
||||
stating that You changed the files; and
|
||||
|
||||
(c) You must retain, in the Source form of any Derivative Works
|
||||
that You distribute, all copyright, patent, trademark, and
|
||||
attribution notices from the Source form of the Work,
|
||||
excluding those notices that do not pertain to any part of
|
||||
the Derivative Works; and
|
||||
|
||||
(d) If the Work includes a "NOTICE" text file as part of its
|
||||
distribution, then any Derivative Works that You distribute must
|
||||
include a readable copy of the attribution notices contained
|
||||
within such NOTICE file, excluding those notices that do not
|
||||
pertain to any part of the Derivative Works, in at least one
|
||||
of the following places: within a NOTICE text file distributed
|
||||
as part of the Derivative Works; within the Source form or
|
||||
documentation, if provided along with the Derivative Works; or,
|
||||
within a display generated by the Derivative Works, if and
|
||||
wherever such third-party notices normally appear. The contents
|
||||
of the NOTICE file are for informational purposes only and
|
||||
do not modify the License. You may add Your own attribution
|
||||
notices within Derivative Works that You distribute, alongside
|
||||
or as an addendum to the NOTICE text from the Work, provided
|
||||
that such additional attribution notices cannot be construed
|
||||
as modifying the License.
|
||||
|
||||
You may add Your own copyright statement to Your modifications and
|
||||
may provide additional or different license terms and conditions
|
||||
for use, reproduction, or distribution of Your modifications, or
|
||||
for any such Derivative Works as a whole, provided Your use,
|
||||
reproduction, and distribution of the Work otherwise complies with
|
||||
the conditions stated in this License.
|
||||
|
||||
5. Submission of Contributions. Unless You explicitly state otherwise,
|
||||
any Contribution intentionally submitted for inclusion in the Work
|
||||
by You to the Licensor shall be under the terms and conditions of
|
||||
this License, without any additional terms or conditions.
|
||||
Notwithstanding the above, nothing herein shall supersede or modify
|
||||
the terms of any separate license agreement you may have executed
|
||||
with Licensor regarding such Contributions.
|
||||
|
||||
6. Trademarks. This License does not grant permission to use the trade
|
||||
names, trademarks, service marks, or product names of the Licensor,
|
||||
except as required for reasonable and customary use in describing the
|
||||
origin of the Work and reproducing the content of the NOTICE file.
|
||||
|
||||
7. Disclaimer of Warranty. Unless required by applicable law or
|
||||
agreed to in writing, Licensor provides the Work (and each
|
||||
Contributor provides its Contributions) on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
|
||||
implied, including, without limitation, any warranties or conditions
|
||||
of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A
|
||||
PARTICULAR PURPOSE. You are solely responsible for determining the
|
||||
appropriateness of using or redistributing the Work and assume any
|
||||
risks associated with Your exercise of permissions under this License.
|
||||
|
||||
8. Limitation of Liability. In no event and under no legal theory,
|
||||
whether in tort (including negligence), contract, or otherwise,
|
||||
unless required by applicable law (such as deliberate and grossly
|
||||
negligent acts) or agreed to in writing, shall any Contributor be
|
||||
liable to You for damages, including any direct, indirect, special,
|
||||
incidental, or consequential damages of any character arising as a
|
||||
result of this License or out of the use or inability to use the
|
||||
Work (including but not limited to damages for loss of goodwill,
|
||||
work stoppage, computer failure or malfunction, or any and all
|
||||
other commercial damages or losses), even if such Contributor
|
||||
has been advised of the possibility of such damages.
|
||||
|
||||
9. Accepting Warranty or Additional Liability. While redistributing
|
||||
the Work or Derivative Works thereof, You may choose to offer,
|
||||
and charge a fee for, acceptance of support, warranty, indemnity,
|
||||
or other liability obligations and/or rights consistent with this
|
||||
License. However, in accepting such obligations, You may act only
|
||||
on Your own behalf and on Your sole responsibility, not on behalf
|
||||
of any other Contributor, and only if You agree to indemnify,
|
||||
defend, and hold each Contributor harmless for any liability
|
||||
incurred by, or claims asserted against, such Contributor by reason
|
||||
of your accepting any such warranty or additional liability.
|
||||
|
||||
END OF TERMS AND CONDITIONS
|
||||
|
||||
APPENDIX: How to apply the Apache License to your work.
|
||||
|
||||
To apply the Apache License to your work, attach the following
|
||||
boilerplate notice, with the fields enclosed by brackets "[]"
|
||||
replaced with your own identifying information. (Don't include
|
||||
the brackets!) The text should be enclosed in the appropriate
|
||||
comment syntax for the file format. We also recommend that a
|
||||
file or class name and description of purpose be included on the
|
||||
same "printed page" as the copyright notice for easier
|
||||
identification within third-party archives.
|
||||
|
||||
Copyright [yyyy] [name of copyright owner]
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
21
spring-integration-mqtt/src/dist/notice.txt
vendored
Normal file
21
spring-integration-mqtt/src/dist/notice.txt
vendored
Normal file
@@ -0,0 +1,21 @@
|
||||
========================================================================
|
||||
== NOTICE file corresponding to section 4 d of the Apache License, ==
|
||||
== Version 2.0, in this case for the Spring Integration distribution. ==
|
||||
========================================================================
|
||||
|
||||
This product includes software developed by
|
||||
the Apache Software Foundation (http://www.apache.org).
|
||||
|
||||
The end-user documentation included with a redistribution, if any,
|
||||
must include the following acknowledgement:
|
||||
|
||||
"This product includes software developed by the Spring Framework
|
||||
Project (http://www.springframework.org)."
|
||||
|
||||
Alternatively, this acknowledgement may appear in the software itself,
|
||||
if and wherever such third-party acknowledgements normally appear.
|
||||
|
||||
The names "Spring", "Spring Framework", and "Spring Integration" must
|
||||
not be used to endorse or promote products derived from this software
|
||||
without prior written permission. For written permission, please contact
|
||||
enquiries@springsource.com.
|
||||
13
spring-integration-mqtt/src/dist/readme.txt
vendored
Normal file
13
spring-integration-mqtt/src/dist/readme.txt
vendored
Normal file
@@ -0,0 +1,13 @@
|
||||
Spring Integration Mqtt Adapters
|
||||
-----------------------------------
|
||||
|
||||
To find out what has changed since any earlier releases, see 'changelog.txt'.
|
||||
|
||||
Please consult the documentation located within the 'docs/reference' directory
|
||||
of this release and also visit the official Spring Integration home at
|
||||
http://www.springsource.org/spring-integration
|
||||
|
||||
There you will find links to the forum, issue tracker, and several other resources.
|
||||
|
||||
See https://github.com/SpringSource/spring-integration#readme for additional
|
||||
information including instructions on building from source.
|
||||
@@ -0,0 +1,48 @@
|
||||
/*
|
||||
* Copyright 2002-2013 the original author or authors.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.springframework.integration.mqtt.config.xml;
|
||||
|
||||
import org.springframework.beans.factory.support.AbstractBeanDefinition;
|
||||
import org.springframework.beans.factory.support.BeanDefinitionBuilder;
|
||||
import org.springframework.beans.factory.xml.ParserContext;
|
||||
import org.springframework.integration.config.xml.AbstractChannelAdapterParser;
|
||||
import org.springframework.integration.mqtt.inbound.MqttPahoMessageDrivenChannelAdapter;
|
||||
import org.w3c.dom.Element;
|
||||
|
||||
/**
|
||||
* The MqttAdapter Message Driven Channel adapter parser
|
||||
*
|
||||
* @author Gary Russell
|
||||
* @since 1.0
|
||||
*
|
||||
*/
|
||||
public class MqttMessageDrivenChannelAdapterParser extends AbstractChannelAdapterParser {
|
||||
|
||||
|
||||
@Override
|
||||
protected AbstractBeanDefinition doParse(Element element, ParserContext parserContext, String channelName) {
|
||||
|
||||
BeanDefinitionBuilder builder = BeanDefinitionBuilder
|
||||
.genericBeanDefinition(MqttPahoMessageDrivenChannelAdapter.class);
|
||||
|
||||
MqttParserUtils.parseCommon(element, builder);
|
||||
builder.addConstructorArgValue(element.getAttribute("topics"));
|
||||
builder.addPropertyReference("outputChannel", channelName);
|
||||
|
||||
return builder.getBeanDefinition();
|
||||
}
|
||||
|
||||
}
|
||||
@@ -0,0 +1,36 @@
|
||||
/*
|
||||
* Copyright 2002-2013 the original author or authors.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.springframework.integration.mqtt.config.xml;
|
||||
|
||||
import org.springframework.integration.config.xml.AbstractIntegrationNamespaceHandler;
|
||||
|
||||
/**
|
||||
* The namespace handler for the MqttAdapter namespace
|
||||
*
|
||||
* @author Gary Russell
|
||||
* @since 1.0
|
||||
*
|
||||
*/
|
||||
public class MqttNamespaceHandler extends AbstractIntegrationNamespaceHandler {
|
||||
|
||||
/* (non-Javadoc)
|
||||
* @see org.springframework.beans.factory.xml.NamespaceHandler#init()
|
||||
*/
|
||||
public void init() {
|
||||
this.registerBeanDefinitionParser("message-driven-channel-adapter", new MqttMessageDrivenChannelAdapterParser());
|
||||
this.registerBeanDefinitionParser("outbound-channel-adapter", new MqttOutboundChannelAdapterParser());
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,66 @@
|
||||
/*
|
||||
* Copyright 2002-2013 the original author or authors.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.springframework.integration.mqtt.config.xml;
|
||||
|
||||
import org.springframework.beans.factory.support.AbstractBeanDefinition;
|
||||
import org.springframework.beans.factory.support.BeanDefinitionBuilder;
|
||||
import org.springframework.beans.factory.xml.ParserContext;
|
||||
import org.springframework.integration.config.xml.AbstractOutboundChannelAdapterParser;
|
||||
import org.springframework.integration.config.xml.IntegrationNamespaceUtils;
|
||||
import org.springframework.integration.mqtt.outbound.MqttPahoMessageHandler;
|
||||
import org.springframework.util.StringUtils;
|
||||
import org.w3c.dom.Element;
|
||||
|
||||
/**
|
||||
* The parser for the MqttAdapter Outbound Channel Adapter.
|
||||
*
|
||||
* @author Gary Russell
|
||||
* @since 1.0
|
||||
*
|
||||
*/
|
||||
public class MqttOutboundChannelAdapterParser extends AbstractOutboundChannelAdapterParser {
|
||||
|
||||
@Override
|
||||
protected boolean shouldGenerateId() {
|
||||
return false;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected boolean shouldGenerateIdAsFallback() {
|
||||
return true;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected AbstractBeanDefinition parseConsumer(Element element, ParserContext parserContext) {
|
||||
|
||||
final BeanDefinitionBuilder builder = BeanDefinitionBuilder.genericBeanDefinition(MqttPahoMessageHandler.class);
|
||||
|
||||
MqttParserUtils.parseCommon(element, builder);
|
||||
IntegrationNamespaceUtils.setValueIfAttributeDefined(builder, element, "default-topic");
|
||||
if (StringUtils.hasText(element.getAttribute("converter")) &&
|
||||
(StringUtils.hasText(element.getAttribute("default-qos")) ||
|
||||
StringUtils.hasText(element.getAttribute("default-retained")))) {
|
||||
parserContext.getReaderContext().error("If a 'converter' is provided, you cannot provide " +
|
||||
"'default-qos' or 'default-retained'", element);
|
||||
}
|
||||
IntegrationNamespaceUtils.setValueIfAttributeDefined(builder, element, "default-qos");
|
||||
IntegrationNamespaceUtils.setValueIfAttributeDefined(builder, element, "default-retained");
|
||||
|
||||
return builder.getBeanDefinition();
|
||||
|
||||
}
|
||||
|
||||
}
|
||||
@@ -0,0 +1,53 @@
|
||||
/*
|
||||
* Copyright 2002-2013 the original author or authors.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.springframework.integration.mqtt.config.xml;
|
||||
|
||||
import org.springframework.beans.factory.config.BeanDefinition;
|
||||
import org.springframework.beans.factory.support.BeanDefinitionBuilder;
|
||||
import org.springframework.integration.config.xml.IntegrationNamespaceUtils;
|
||||
import org.springframework.util.StringUtils;
|
||||
import org.w3c.dom.Element;
|
||||
|
||||
/**
|
||||
* Contains various utility methods for parsing Mqtt Adapter
|
||||
* specific namesspace elements as well as for the generation of the the
|
||||
* respective {@link BeanDefinition}s.
|
||||
*
|
||||
* @author Gary Russell
|
||||
* @since 1.0
|
||||
*
|
||||
*/
|
||||
public final class MqttParserUtils {
|
||||
|
||||
/** Prevent instantiation. */
|
||||
private MqttParserUtils() {
|
||||
throw new AssertionError();
|
||||
}
|
||||
|
||||
public static void parseCommon(Element element, BeanDefinitionBuilder builder) {
|
||||
builder.addConstructorArgValue(element.getAttribute("url"));
|
||||
builder.addConstructorArgValue(element.getAttribute("client-id"));
|
||||
String clientFactory = element.getAttribute("client-factory");
|
||||
if (StringUtils.hasText(clientFactory)) {
|
||||
builder.addConstructorArgReference(clientFactory);
|
||||
}
|
||||
IntegrationNamespaceUtils.setReferenceIfAttributeDefined(builder, element, "converter");
|
||||
IntegrationNamespaceUtils.setValueIfAttributeDefined(builder, element, "auto-startup");
|
||||
IntegrationNamespaceUtils.setValueIfAttributeDefined(builder, element, "phase");
|
||||
IntegrationNamespaceUtils.setValueIfAttributeDefined(builder, element, "send-timeout");
|
||||
}
|
||||
|
||||
}
|
||||
@@ -0,0 +1,4 @@
|
||||
/**
|
||||
* Provides parser classes to provide Xml namespace support for the MqttAdapter components.
|
||||
*/
|
||||
package org.springframework.integration.mqtt.config.xml;
|
||||
@@ -0,0 +1,159 @@
|
||||
/*
|
||||
* Copyright 2002-2013 the original author or authors.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.springframework.integration.mqtt.core;
|
||||
|
||||
import java.util.Properties;
|
||||
|
||||
import javax.net.SocketFactory;
|
||||
|
||||
import org.eclipse.paho.client.mqttv3.MqttClient;
|
||||
import org.eclipse.paho.client.mqttv3.MqttClientPersistence;
|
||||
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
|
||||
import org.eclipse.paho.client.mqttv3.MqttException;
|
||||
|
||||
/**
|
||||
* Creates a default {@link MqttClient} and a set of options as configured.
|
||||
* @author Gary Russell
|
||||
* @since 1.0
|
||||
*
|
||||
*/
|
||||
public class DefaultMqttPahoClientFactory implements MqttPahoClientFactory {
|
||||
|
||||
private volatile Boolean cleanSession;
|
||||
|
||||
private volatile Integer connectionTimeout;
|
||||
|
||||
private volatile Integer keepAliveInterval;
|
||||
|
||||
private volatile String password;
|
||||
|
||||
private volatile SocketFactory socketFactory;
|
||||
|
||||
private volatile Properties sslProperties;
|
||||
|
||||
private volatile String userName;
|
||||
|
||||
private volatile MqttClientPersistence persistence;
|
||||
|
||||
private volatile Will will;
|
||||
|
||||
public void setCleanSession(Boolean cleanSession) {
|
||||
this.cleanSession = cleanSession;
|
||||
}
|
||||
|
||||
public void setConnectionTimeout(Integer connectionTimeout) {
|
||||
this.connectionTimeout = connectionTimeout;
|
||||
}
|
||||
|
||||
public void setKeepAliveInterval(Integer keepAliveInterval) {
|
||||
this.keepAliveInterval = keepAliveInterval;
|
||||
}
|
||||
|
||||
public void setPassword(String password) {
|
||||
this.password = password;
|
||||
}
|
||||
|
||||
public void setSocketFactory(SocketFactory socketFactory) {
|
||||
this.socketFactory = socketFactory;
|
||||
}
|
||||
|
||||
public void setSslProperties(Properties sslProperties) {
|
||||
this.sslProperties = sslProperties;
|
||||
}
|
||||
|
||||
public void setUserName(String userName) {
|
||||
this.userName = userName;
|
||||
}
|
||||
|
||||
public void setWill(Will will) {
|
||||
this.will = will;
|
||||
}
|
||||
|
||||
public void setPersistence(MqttClientPersistence persistence) {
|
||||
this.persistence = persistence;
|
||||
}
|
||||
|
||||
@Override
|
||||
public MqttClient getClientInstance(String url, String clientId) throws MqttException {
|
||||
return new MqttClient(url, clientId, this.persistence);
|
||||
}
|
||||
|
||||
@Override
|
||||
public MqttConnectOptions getConnectionOptions() {
|
||||
MqttConnectOptions options = new MqttConnectOptions();
|
||||
if (this.cleanSession != null) {
|
||||
options.setCleanSession(this.cleanSession);
|
||||
}
|
||||
if (this.connectionTimeout != null) {
|
||||
options.setConnectionTimeout(this.connectionTimeout);
|
||||
}
|
||||
if (this.keepAliveInterval != null) {
|
||||
options.setKeepAliveInterval(this.keepAliveInterval);
|
||||
}
|
||||
if (this.password != null) {
|
||||
options.setPassword(this.password.toCharArray());
|
||||
}
|
||||
if (this.socketFactory != null) {
|
||||
options.setSocketFactory(this.socketFactory);
|
||||
}
|
||||
if (this.sslProperties != null) {
|
||||
options.setSSLProperties(this.sslProperties);
|
||||
}
|
||||
if (this.userName != null) {
|
||||
options.setUserName(this.userName);
|
||||
}
|
||||
if (this.will != null) {
|
||||
options.setWill(this.will.getTopic(), this.will.getPayload(), this.will.getQos(), this.will.isRetained());
|
||||
}
|
||||
return options;
|
||||
}
|
||||
|
||||
public static class Will {
|
||||
|
||||
private final String topic;
|
||||
|
||||
private final byte[] payload;
|
||||
|
||||
private final int qos;
|
||||
|
||||
private final boolean retained;
|
||||
|
||||
public Will(String topic, byte[] payload, int qos, boolean retained) {
|
||||
this.topic = topic;
|
||||
this.payload = payload;
|
||||
this.qos = qos;
|
||||
this.retained = retained;
|
||||
}
|
||||
|
||||
protected String getTopic() {
|
||||
return topic;
|
||||
}
|
||||
|
||||
protected byte[] getPayload() {
|
||||
return payload;
|
||||
}
|
||||
|
||||
protected int getQos() {
|
||||
return qos;
|
||||
}
|
||||
|
||||
protected boolean isRetained() {
|
||||
return retained;
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
}
|
||||
@@ -0,0 +1,32 @@
|
||||
/*
|
||||
* Copyright 2002-2013 the original author or authors.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.springframework.integration.mqtt.core;
|
||||
|
||||
import org.eclipse.paho.client.mqttv3.MqttClient;
|
||||
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
|
||||
import org.eclipse.paho.client.mqttv3.MqttException;
|
||||
|
||||
/**
|
||||
* @author Gary Russell
|
||||
* @since 1.0
|
||||
*
|
||||
*/
|
||||
public interface MqttPahoClientFactory {
|
||||
|
||||
MqttClient getClientInstance(String url, String clientId) throws MqttException;
|
||||
|
||||
MqttConnectOptions getConnectionOptions();
|
||||
}
|
||||
@@ -0,0 +1,4 @@
|
||||
/**
|
||||
* Provides core classes of the MqttAdapter module.
|
||||
*/
|
||||
package org.springframework.integration.mqtt.core;
|
||||
@@ -0,0 +1,84 @@
|
||||
/*
|
||||
* Copyright 2002-2012 the original author or authors.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.springframework.integration.mqtt.inbound;
|
||||
|
||||
import org.springframework.integration.endpoint.MessageProducerSupport;
|
||||
import org.springframework.integration.mqtt.support.DefaultPahoMessageConverter;
|
||||
import org.springframework.integration.mqtt.support.MqttMessageConverter;
|
||||
import org.springframework.util.Assert;
|
||||
|
||||
/**
|
||||
* Abstract class for MQTT Message-Driven Channel Adapters.
|
||||
* @author Gary Russell
|
||||
* @since 1.0
|
||||
*
|
||||
*/
|
||||
public abstract class AbstractMqttMessageDrivenChannelAdapter extends MessageProducerSupport {
|
||||
|
||||
private final String url;
|
||||
|
||||
private final String clientId;
|
||||
|
||||
private final String[] topic;
|
||||
|
||||
private volatile MqttMessageConverter converter;
|
||||
|
||||
public AbstractMqttMessageDrivenChannelAdapter(String url, String clientId, String... topic) {
|
||||
Assert.hasText(url, "'url' cannot be null or empty");
|
||||
Assert.hasText(clientId, "'clientId' cannot be null or empty");
|
||||
Assert.notNull(topic, "'topics' cannot be null");
|
||||
Assert.isTrue(topic.length > 0, "'topics' cannot be empty");
|
||||
Assert.noNullElements(topic, "'topics' cannot have null elements");
|
||||
this.url = url;
|
||||
this.clientId = clientId;
|
||||
this.topic = topic;
|
||||
}
|
||||
|
||||
public void setConverter(MqttMessageConverter converter) {
|
||||
Assert.notNull(converter, "'converter' cannot be null");
|
||||
this.converter = converter;
|
||||
}
|
||||
|
||||
protected String getUrl() {
|
||||
return url;
|
||||
}
|
||||
|
||||
protected String getClientId() {
|
||||
return clientId;
|
||||
}
|
||||
|
||||
protected MqttMessageConverter getConverter() {
|
||||
return converter;
|
||||
}
|
||||
|
||||
protected String[] getTopic() {
|
||||
return topic;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void onInit() {
|
||||
super.onInit();
|
||||
if (this.converter == null) {
|
||||
this.converter = new DefaultPahoMessageConverter();
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public String getComponentType(){
|
||||
return "mqtt:inbound-channel-adapter";
|
||||
}
|
||||
|
||||
}
|
||||
@@ -0,0 +1,156 @@
|
||||
/*
|
||||
* Copyright 2002-2013 the original author or authors.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.springframework.integration.mqtt.inbound;
|
||||
|
||||
import java.util.concurrent.ScheduledFuture;
|
||||
|
||||
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
|
||||
import org.eclipse.paho.client.mqttv3.MqttCallback;
|
||||
import org.eclipse.paho.client.mqttv3.MqttClient;
|
||||
import org.eclipse.paho.client.mqttv3.MqttException;
|
||||
import org.eclipse.paho.client.mqttv3.MqttMessage;
|
||||
import org.springframework.integration.Message;
|
||||
import org.springframework.integration.mqtt.core.DefaultMqttPahoClientFactory;
|
||||
import org.springframework.integration.mqtt.core.MqttPahoClientFactory;
|
||||
|
||||
/**
|
||||
* Eclipse Paho Implementation.
|
||||
*
|
||||
* @author Gary Russell
|
||||
* @since 1.0
|
||||
*
|
||||
*/
|
||||
public class MqttPahoMessageDrivenChannelAdapter extends AbstractMqttMessageDrivenChannelAdapter
|
||||
implements MqttCallback {
|
||||
|
||||
private final MqttPahoClientFactory clientFactory;
|
||||
|
||||
private volatile MqttClient client;
|
||||
|
||||
private volatile ScheduledFuture<?> reconnectFuture;
|
||||
|
||||
private volatile boolean connected;
|
||||
|
||||
|
||||
public MqttPahoMessageDrivenChannelAdapter(String url, String clientId, MqttPahoClientFactory clientFactory,
|
||||
String... topic) {
|
||||
super(url, clientId, topic);
|
||||
this.clientFactory = clientFactory;
|
||||
}
|
||||
|
||||
public MqttPahoMessageDrivenChannelAdapter(String url, String clientId, String... topic) {
|
||||
this(url, clientId, new DefaultMqttPahoClientFactory(), topic);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void doStart() {
|
||||
super.doStart();
|
||||
try {
|
||||
this.connectAndSubscribe();
|
||||
}
|
||||
catch (Exception e) {
|
||||
logger.error("Exception while connecting and subscribing, retrying", e);
|
||||
this.scheduleReconnect();
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void doStop() {
|
||||
super.doStop();
|
||||
try {
|
||||
this.client.unsubscribe(this.getTopic());
|
||||
this.client.disconnect();
|
||||
this.client.close();
|
||||
this.connected = false;
|
||||
this.client = null;
|
||||
}
|
||||
catch (MqttException e) {
|
||||
logger.error("Exception while unsubscribing and disconnecting", e);
|
||||
}
|
||||
}
|
||||
|
||||
private void connectAndSubscribe() throws MqttException {
|
||||
this.client = this.clientFactory.getClientInstance(this.getUrl(), this.getClientId());
|
||||
this.client.connect(this.clientFactory.getConnectionOptions());
|
||||
try {
|
||||
this.client.subscribe(this.getTopic());
|
||||
}
|
||||
catch (MqttException e) {
|
||||
this.client.disconnect();
|
||||
throw e;
|
||||
}
|
||||
if (this.client.isConnected()) {
|
||||
this.client.setCallback(this);
|
||||
this.connected = true;
|
||||
if (this.reconnectFuture != null) {
|
||||
this.cancelReconnect();
|
||||
}
|
||||
if (logger.isDebugEnabled()) {
|
||||
logger.debug("Connected and subscribed to " + this.getTopic());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private synchronized void cancelReconnect() {
|
||||
if (this.reconnectFuture != null) {
|
||||
this.reconnectFuture.cancel(false);
|
||||
this.reconnectFuture = null;
|
||||
}
|
||||
}
|
||||
|
||||
private void scheduleReconnect() {
|
||||
try {
|
||||
this.reconnectFuture = this.getTaskScheduler().scheduleWithFixedDelay(new Runnable() {
|
||||
|
||||
@Override
|
||||
public void run() {
|
||||
try {
|
||||
if (logger.isDebugEnabled()) {
|
||||
logger.debug("Attempting reconnect");
|
||||
}
|
||||
if (!connected) {
|
||||
connectAndSubscribe();
|
||||
}
|
||||
}
|
||||
catch (MqttException e) {
|
||||
logger.error("Exception while connecting and subscribing", e);
|
||||
}
|
||||
}
|
||||
}, 10000);
|
||||
}
|
||||
catch (Exception e) {
|
||||
logger.error("Failed to schedule reconnect", e);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void connectionLost(Throwable cause) {
|
||||
this.logger.error("Lost connection:" + cause.getMessage() + "; retrying...");
|
||||
this.connected = false;
|
||||
this.scheduleReconnect();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void messageArrived(String topic, MqttMessage mqttMessage) throws Exception {
|
||||
Message<?> message = this.getConverter().toMessage(topic, mqttMessage);
|
||||
this.sendMessage(message);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void deliveryComplete(IMqttDeliveryToken token) {
|
||||
}
|
||||
|
||||
}
|
||||
@@ -0,0 +1,4 @@
|
||||
/**
|
||||
* Provides inbound Spring Integration MqttAdapter components.
|
||||
*/
|
||||
package org.springframework.integration.mqtt.inbound;
|
||||
@@ -0,0 +1,158 @@
|
||||
/*
|
||||
* Copyright 2002-2013 the original author or authors.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.springframework.integration.mqtt.outbound;
|
||||
|
||||
import org.springframework.context.SmartLifecycle;
|
||||
import org.springframework.integration.Message;
|
||||
import org.springframework.integration.MessageHandlingException;
|
||||
import org.springframework.integration.handler.AbstractMessageHandler;
|
||||
import org.springframework.integration.mqtt.support.DefaultPahoMessageConverter;
|
||||
import org.springframework.integration.mqtt.support.MqttHeaders;
|
||||
import org.springframework.integration.support.converter.MessageConverter;
|
||||
import org.springframework.util.Assert;
|
||||
|
||||
/**
|
||||
* Abstract class for MQTT outbound channel adapters.
|
||||
* @author Gary Russell
|
||||
* @since 1.0
|
||||
*
|
||||
*/
|
||||
public abstract class AbstractMqttMessageHandler extends AbstractMessageHandler implements SmartLifecycle {
|
||||
|
||||
private final String url;
|
||||
|
||||
private final String clientId;
|
||||
|
||||
private volatile String defaultTopic;
|
||||
|
||||
private volatile int defaultQos = 0;
|
||||
|
||||
private volatile boolean defaultRetained = false;
|
||||
|
||||
private volatile MessageConverter converter;
|
||||
|
||||
private boolean running;
|
||||
|
||||
private volatile int phase;
|
||||
|
||||
private volatile boolean autoStartup;
|
||||
|
||||
public AbstractMqttMessageHandler(String url, String clientId) {
|
||||
Assert.hasText(url, "'url' cannot be null or empty");
|
||||
Assert.hasText(clientId, "'clientId' cannot be null or empty");
|
||||
this.url = url;
|
||||
this.clientId = clientId;
|
||||
}
|
||||
|
||||
public void setDefaultTopic(String defaultTopic) {
|
||||
this.defaultTopic = defaultTopic;
|
||||
}
|
||||
|
||||
public void setDefaultQos(int defaultQos) {
|
||||
this.defaultQos = defaultQos;
|
||||
}
|
||||
|
||||
public void setDefaultRetained(boolean defaultRetain) {
|
||||
this.defaultRetained = defaultRetain;
|
||||
}
|
||||
|
||||
public void setConverter(MessageConverter converter) {
|
||||
Assert.notNull(converter, "'converter' cannot be null");
|
||||
this.converter = converter;
|
||||
}
|
||||
|
||||
protected String getUrl() {
|
||||
return url;
|
||||
}
|
||||
|
||||
protected String getClientId() {
|
||||
return clientId;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void onInit() throws Exception {
|
||||
super.onInit();
|
||||
if (this.converter == null) {
|
||||
this.converter = new DefaultPahoMessageConverter(this.defaultQos, this.defaultRetained);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public final void start() {
|
||||
this.doStart();
|
||||
}
|
||||
|
||||
protected abstract void doStart();
|
||||
|
||||
@Override
|
||||
public final void stop() {
|
||||
this.doStop();
|
||||
}
|
||||
|
||||
protected abstract void doStop();
|
||||
|
||||
@Override
|
||||
public boolean isRunning() {
|
||||
return this.running;
|
||||
}
|
||||
|
||||
@Override
|
||||
public int getPhase() {
|
||||
return this.phase;
|
||||
}
|
||||
|
||||
public void setPhase(int phase) {
|
||||
this.phase = phase;
|
||||
}
|
||||
|
||||
public void setAutoStartup(boolean autoStartup) {
|
||||
this.autoStartup = autoStartup;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean isAutoStartup() {
|
||||
return this.autoStartup;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void stop(Runnable callback) {
|
||||
this.stop();
|
||||
callback.run();
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void handleMessageInternal(Message<?> message) throws Exception {
|
||||
this.connectIfNeeded();
|
||||
String topic = (String) message.getHeaders().get(MqttHeaders.TOPIC);
|
||||
Object mqttMessage = this.converter.fromMessage(message);
|
||||
if (topic == null && this.defaultTopic == null) {
|
||||
throw new MessageHandlingException(message,
|
||||
"No '" + MqttHeaders.TOPIC + "' header and no default topic defined");
|
||||
}
|
||||
this.publish(topic == null ? this.defaultTopic : topic, mqttMessage);
|
||||
}
|
||||
|
||||
protected abstract void connectIfNeeded();
|
||||
|
||||
protected abstract void publish(String topic, Object mqttMessage) throws Exception;
|
||||
|
||||
@Override
|
||||
public String getComponentType() {
|
||||
return "mqtt:outbound-channel-adapter";
|
||||
}
|
||||
|
||||
}
|
||||
@@ -0,0 +1,117 @@
|
||||
/*
|
||||
* Copyright 2002-2013 the original author or authors.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.springframework.integration.mqtt.outbound;
|
||||
|
||||
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
|
||||
import org.eclipse.paho.client.mqttv3.MqttCallback;
|
||||
import org.eclipse.paho.client.mqttv3.MqttClient;
|
||||
import org.eclipse.paho.client.mqttv3.MqttException;
|
||||
import org.eclipse.paho.client.mqttv3.MqttMessage;
|
||||
import org.springframework.integration.MessagingException;
|
||||
import org.springframework.integration.mqtt.core.DefaultMqttPahoClientFactory;
|
||||
import org.springframework.integration.mqtt.core.MqttPahoClientFactory;
|
||||
import org.springframework.util.Assert;
|
||||
|
||||
/**
|
||||
* Eclipse Paho implementation.
|
||||
* @author Gary Russell
|
||||
* @since 1.0
|
||||
*
|
||||
*/
|
||||
public class MqttPahoMessageHandler extends AbstractMqttMessageHandler
|
||||
implements MqttCallback {
|
||||
|
||||
private final MqttPahoClientFactory clientFactory;
|
||||
|
||||
private volatile MqttClient client;
|
||||
|
||||
public MqttPahoMessageHandler(String url, String clientId, MqttPahoClientFactory factory) {
|
||||
super(url, clientId);
|
||||
this.clientFactory = factory;
|
||||
}
|
||||
|
||||
public MqttPahoMessageHandler(String url, String clientId) {
|
||||
this(url, clientId, new DefaultMqttPahoClientFactory());
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void doStart() {
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void doStop() {
|
||||
try {
|
||||
if (this.client != null) {
|
||||
this.client.disconnect();
|
||||
this.client.close();
|
||||
this.client = null;
|
||||
}
|
||||
}
|
||||
catch (MqttException e) {
|
||||
logger.error("Failed to disconnect", e);
|
||||
}
|
||||
}
|
||||
|
||||
private synchronized void doConnect() throws MqttException {
|
||||
if (this.client != null && !this.client.isConnected()) {
|
||||
this.client.close();
|
||||
this.client = null;
|
||||
}
|
||||
if (this.client == null) {
|
||||
this.client = this.clientFactory.getClientInstance(this.getUrl(), this.getClientId());
|
||||
this.client.connect(this.clientFactory.getConnectionOptions());
|
||||
this.client.setCallback(this);
|
||||
if (logger.isDebugEnabled()) {
|
||||
logger.debug("Client connected");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void connectIfNeeded() {
|
||||
if (this.client == null || !this.client.isConnected()) {
|
||||
try {
|
||||
this.doConnect();
|
||||
}
|
||||
catch (MqttException e) {
|
||||
throw new MessagingException("Failed to connect", e);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void publish(String topic, Object mqttMessage) throws Exception {
|
||||
Assert.isInstanceOf(MqttMessage.class, mqttMessage);
|
||||
this.client.publish(topic, (MqttMessage) mqttMessage);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void connectionLost(Throwable cause) {
|
||||
logger.error("Lost connection; will attempt reconnect on next request");
|
||||
this.client = null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void messageArrived(String topic, MqttMessage message) throws Exception {
|
||||
|
||||
}
|
||||
|
||||
@Override
|
||||
public void deliveryComplete(IMqttDeliveryToken token) {
|
||||
|
||||
}
|
||||
|
||||
}
|
||||
@@ -0,0 +1,4 @@
|
||||
/**
|
||||
* Provides Spring Integration components for doing outbound operations.
|
||||
*/
|
||||
package org.springframework.integration.mqtt.outbound;
|
||||
@@ -0,0 +1,4 @@
|
||||
/**
|
||||
* Root package of the MqttAdapter Module.
|
||||
*/
|
||||
package org.springframework.integration.mqtt;
|
||||
@@ -0,0 +1,103 @@
|
||||
/*
|
||||
* Copyright 2002-2013 the original author or authors.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.springframework.integration.mqtt.support;
|
||||
|
||||
import org.eclipse.paho.client.mqttv3.MqttMessage;
|
||||
import org.springframework.integration.Message;
|
||||
import org.springframework.integration.support.MessageBuilder;
|
||||
import org.springframework.integration.support.converter.MessageConversionException;
|
||||
import org.springframework.util.Assert;
|
||||
|
||||
|
||||
/**
|
||||
* Default implementation allowing most connection options to be configured.
|
||||
* @author Gary Russell
|
||||
* @since 1.0
|
||||
*
|
||||
*/
|
||||
public class DefaultPahoMessageConverter implements MqttMessageConverter {
|
||||
|
||||
private final String charset;
|
||||
|
||||
private final Integer defaultQos;
|
||||
|
||||
private final Boolean defaultRetained;
|
||||
|
||||
public DefaultPahoMessageConverter() {
|
||||
this (0, false);
|
||||
}
|
||||
|
||||
public DefaultPahoMessageConverter(int defaultQos, boolean defaultRetain) {
|
||||
this(defaultQos, defaultRetain, "UTF-8");
|
||||
}
|
||||
|
||||
public DefaultPahoMessageConverter(int defaultQos, boolean defaultRetained, String charset) {
|
||||
this.defaultQos = defaultQos;
|
||||
this.defaultRetained = defaultRetained;
|
||||
this.charset = charset;
|
||||
}
|
||||
|
||||
@Override
|
||||
@SuppressWarnings("unchecked")
|
||||
public <P> Message<P> toMessage(Object object) {
|
||||
return (Message<P>) toMessage(null, object);
|
||||
}
|
||||
|
||||
public Message<String> toMessage(String topic, Object object) {
|
||||
Assert.isInstanceOf(MqttMessage.class, object);
|
||||
MqttMessage message = (MqttMessage) object;
|
||||
try {
|
||||
MessageBuilder<String> messageBuilder = MessageBuilder.withPayload(new String(message.getPayload(), this.charset))
|
||||
.setHeader(MqttHeaders.QOS, message.getQos())
|
||||
.setHeader(MqttHeaders.DUPLICATE, message.isDuplicate())
|
||||
.setHeader(MqttHeaders.RETAINED, message.isRetained());
|
||||
if (topic != null) {
|
||||
messageBuilder.setHeader(MqttHeaders.TOPIC, topic);
|
||||
}
|
||||
return messageBuilder.build();
|
||||
}
|
||||
catch (Exception e) {
|
||||
throw new MessageConversionException("failed to convert object to Message", e);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public <P> Object fromMessage(Message<P> message) {
|
||||
Object payload = message.getPayload();
|
||||
Assert.isTrue(payload instanceof byte[] || payload instanceof String);
|
||||
byte[] payloadBytes;
|
||||
if (payload instanceof String) {
|
||||
try {
|
||||
payloadBytes = ((String) payload).getBytes(this.charset);
|
||||
}
|
||||
catch (Exception e) {
|
||||
throw new MessageConversionException("failed to convert Message to object", e);
|
||||
}
|
||||
}
|
||||
else {
|
||||
payloadBytes = (byte[]) payload;
|
||||
}
|
||||
MqttMessage mqttMessage = new MqttMessage(payloadBytes);
|
||||
Object header = message.getHeaders().get(MqttHeaders.RETAINED);
|
||||
Assert.isTrue(header == null || header instanceof Boolean, MqttHeaders.RETAINED + " header must be Boolean");
|
||||
mqttMessage.setRetained(header == null ? this.defaultRetained : (Boolean) header);
|
||||
header = message.getHeaders().get(MqttHeaders.QOS);
|
||||
Assert.isTrue(header == null || header instanceof Integer, MqttHeaders.QOS + " header must be Integer");
|
||||
mqttMessage.setQos(header == null ? this.defaultQos : (Integer) header);
|
||||
return mqttMessage;
|
||||
}
|
||||
|
||||
}
|
||||
@@ -0,0 +1,38 @@
|
||||
/*
|
||||
* Copyright 2002-2013 the original author or authors.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.springframework.integration.mqtt.support;
|
||||
|
||||
/**
|
||||
* Spring Integration headers.
|
||||
* @author Gary Russell
|
||||
* @since 3.0
|
||||
*
|
||||
*/
|
||||
public class MqttHeaders {
|
||||
|
||||
private static final String prefix = "mqtt_";
|
||||
|
||||
public static final String QOS = prefix + "qos";
|
||||
|
||||
public static final String DUPLICATE = prefix + "duplicate";
|
||||
|
||||
public static final String RETAINED = prefix + "retained";
|
||||
|
||||
public static final String TOPIC = prefix + "topic";
|
||||
private MqttHeaders() {
|
||||
throw new AssertionError();
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,31 @@
|
||||
/*
|
||||
* Copyright 2002-2013 the original author or authors.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.springframework.integration.mqtt.support;
|
||||
|
||||
import org.springframework.integration.Message;
|
||||
import org.springframework.integration.support.converter.MessageConverter;
|
||||
|
||||
/**
|
||||
* Extension of {@link MessageConverter} allowing the topic to be added as
|
||||
* a header.
|
||||
* @author Gary Russell
|
||||
* @since 1.0
|
||||
*
|
||||
*/
|
||||
public interface MqttMessageConverter extends MessageConverter {
|
||||
|
||||
Message<String> toMessage(String topic, Object object);
|
||||
}
|
||||
@@ -0,0 +1,33 @@
|
||||
/*
|
||||
* Copyright 2002-2013 the original author or authors.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.springframework.integration.mqtt.support;
|
||||
|
||||
|
||||
/**
|
||||
* Contains utility methods used by the MqttAdapter components.
|
||||
*
|
||||
* @author Gary Russell
|
||||
* @since 1.0
|
||||
*
|
||||
*/
|
||||
public final class MqttUtils {
|
||||
|
||||
/** Prevent instantiation. */
|
||||
private MqttUtils() {
|
||||
throw new AssertionError();
|
||||
}
|
||||
|
||||
}
|
||||
@@ -0,0 +1,4 @@
|
||||
/**
|
||||
* Provides various support classes used across Spring Integration MqttAdapter Components.
|
||||
*/
|
||||
package org.springframework.integration.mqtt.support;
|
||||
@@ -0,0 +1 @@
|
||||
http\://www.springframework.org/schema/integration/mqtt=org.springframework.integration.mqtt.config.xml.MqttNamespaceHandler
|
||||
@@ -0,0 +1,2 @@
|
||||
http\://www.springframework.org/schema/integration/mqtt/spring-integration-mqtt-1.0.xsd=org/springframework/integration/mqtt/config/xml/spring-integration-mqtt-1.0.xsd
|
||||
http\://www.springframework.org/schema/integration/mqtt/spring-integration-mqtt.xsd=org/springframework/integration/mqtt/config/xml/spring-integration-mqtt-1.0.xsd
|
||||
@@ -0,0 +1,4 @@
|
||||
# Tooling related information for the integration MqttAdapter namespace
|
||||
http\://www.springframework.org/schema/integration/mqttadapter@name=integration MqttAdapter Namespace
|
||||
http\://www.springframework.org/schema/integration/mqttadapter@prefix=int-mqttadapter
|
||||
http\://www.springframework.org/schema/integration/mqttadapter@icon=org/springframework/integration/config/xml/spring-integration-mqttadapter.gif
|
||||
@@ -0,0 +1,200 @@
|
||||
<?xml version="1.0" encoding="UTF-8"?>
|
||||
<xsd:schema xmlns="http://www.springframework.org/schema/integration/mqtt"
|
||||
xmlns:xsd="http://www.w3.org/2001/XMLSchema" xmlns:beans="http://www.springframework.org/schema/beans"
|
||||
xmlns:tool="http://www.springframework.org/schema/tool"
|
||||
xmlns:integration="http://www.springframework.org/schema/integration"
|
||||
targetNamespace="http://www.springframework.org/schema/integration/mqtt"
|
||||
elementFormDefault="qualified" attributeFormDefault="unqualified">
|
||||
|
||||
<xsd:import namespace="http://www.springframework.org/schema/beans" />
|
||||
<xsd:import namespace="http://www.springframework.org/schema/tool" />
|
||||
<xsd:import namespace="http://www.springframework.org/schema/integration"
|
||||
schemaLocation="http://www.springframework.org/schema/integration/spring-integration.xsd" />
|
||||
|
||||
<xsd:annotation>
|
||||
<xsd:documentation><![CDATA[
|
||||
Defines the configuration elements for the Spring Integration
|
||||
Mqtt Adapters.
|
||||
]]></xsd:documentation>
|
||||
</xsd:annotation>
|
||||
|
||||
<xsd:element name="message-driven-channel-adapter">
|
||||
<xsd:annotation>
|
||||
<xsd:documentation>
|
||||
The definition for the Spring Integration MqttAdapter
|
||||
Inbound Channel Adapter.
|
||||
</xsd:documentation>
|
||||
</xsd:annotation>
|
||||
<xsd:complexType>
|
||||
<xsd:attributeGroup ref="coreMqttComponentAttributes"/>
|
||||
<xsd:attribute name="channel" type="xsd:string">
|
||||
<xsd:annotation>
|
||||
<xsd:appinfo>
|
||||
<tool:annotation kind="ref">
|
||||
<tool:expected-type type="org.springframework.integration.core.MessageChannel" />
|
||||
</tool:annotation>
|
||||
</xsd:appinfo>
|
||||
</xsd:annotation>
|
||||
</xsd:attribute>
|
||||
<xsd:attribute name="topics">
|
||||
<xsd:annotation>
|
||||
<xsd:documentation>
|
||||
Specifies one or more (comma-delimited) topics on which to listen for messages.
|
||||
</xsd:documentation>
|
||||
</xsd:annotation>
|
||||
</xsd:attribute>
|
||||
<xsd:attribute name="send-timeout" type="xsd:string">
|
||||
<xsd:annotation>
|
||||
<xsd:documentation><![CDATA[
|
||||
Allows you to specify how long this inbound-channel-adapter
|
||||
will wait for the message (containing the retrieved entities)
|
||||
to be sent successfully to the message channel, before throwing
|
||||
an exception.
|
||||
|
||||
Keep in mind that when sending to a DirectChannel, the
|
||||
invocation will occur in the sender's thread so the failing
|
||||
of the send operation may be caused by other components
|
||||
further downstream. By default the Inbound Channel Adapter
|
||||
will wait indefinitely. The value is specified in milliseconds.
|
||||
]]>
|
||||
</xsd:documentation>
|
||||
</xsd:annotation>
|
||||
</xsd:attribute>
|
||||
</xsd:complexType>
|
||||
</xsd:element>
|
||||
|
||||
<xsd:element name="outbound-channel-adapter">
|
||||
<xsd:annotation>
|
||||
<xsd:documentation>
|
||||
Defines an outbound Channel Adapter.
|
||||
</xsd:documentation>
|
||||
</xsd:annotation>
|
||||
<xsd:complexType>
|
||||
<xsd:sequence>
|
||||
<xsd:element ref="integration:poller" minOccurs="0" maxOccurs="1" />
|
||||
</xsd:sequence>
|
||||
<xsd:attributeGroup ref="coreMqttComponentAttributes"/>
|
||||
<xsd:attribute name="channel" type="xsd:string">
|
||||
<xsd:annotation>
|
||||
<xsd:appinfo>
|
||||
<xsd:documentation>
|
||||
Channel from which messages will be output.
|
||||
When a message is sent to this channel it will
|
||||
cause the query
|
||||
to be executed.
|
||||
</xsd:documentation>
|
||||
<tool:annotation kind="ref">
|
||||
<tool:expected-type type="org.springframework.integration.MessageChannel" />
|
||||
</tool:annotation>
|
||||
</xsd:appinfo>
|
||||
</xsd:annotation>
|
||||
</xsd:attribute>
|
||||
<xsd:attribute name="order">
|
||||
<xsd:annotation>
|
||||
<xsd:documentation>
|
||||
Specifies the order for invocation when this endpoint is connected as a
|
||||
subscriber to a SubscribableChannel.
|
||||
</xsd:documentation>
|
||||
</xsd:annotation>
|
||||
</xsd:attribute>
|
||||
<xsd:attribute name="default-topic">
|
||||
<xsd:annotation>
|
||||
<xsd:documentation>
|
||||
Specifies the default topic to which messages will be sent. Required if an
|
||||
outbound message does not have an 'mqtt_topic' header.
|
||||
</xsd:documentation>
|
||||
</xsd:annotation>
|
||||
</xsd:attribute>
|
||||
<xsd:attribute name="default-qos">
|
||||
<xsd:annotation>
|
||||
<xsd:documentation>
|
||||
Specifies the default quality of service. Default 0.
|
||||
</xsd:documentation>
|
||||
</xsd:annotation>
|
||||
</xsd:attribute>
|
||||
<xsd:attribute name="default-retained">
|
||||
<xsd:annotation>
|
||||
<xsd:documentation>
|
||||
Specifies the default value of the 'retained' flag. Default false.
|
||||
</xsd:documentation>
|
||||
</xsd:annotation>
|
||||
</xsd:attribute>
|
||||
</xsd:complexType>
|
||||
</xsd:element>
|
||||
|
||||
<xsd:attributeGroup name="coreMqttComponentAttributes">
|
||||
<xsd:attribute name="id" type="xsd:string" use="optional">
|
||||
<xsd:annotation>
|
||||
<xsd:documentation>
|
||||
Identifies the underlying Spring bean definition, which is an
|
||||
instance of either 'EventDrivenConsumer' or 'PollingConsumer',
|
||||
depending on whether the component's input channel is a
|
||||
'SubscribableChannel' or 'PollableChannel'.
|
||||
</xsd:documentation>
|
||||
</xsd:annotation>
|
||||
</xsd:attribute>
|
||||
<xsd:attribute name="auto-startup" default="true" use="optional">
|
||||
<xsd:annotation>
|
||||
<xsd:documentation>
|
||||
Flag to indicate that the component should start automatically
|
||||
on startup (default true).
|
||||
</xsd:documentation>
|
||||
</xsd:annotation>
|
||||
<xsd:simpleType>
|
||||
<xsd:union memberTypes="xsd:boolean xsd:string" />
|
||||
</xsd:simpleType>
|
||||
</xsd:attribute>
|
||||
<xsd:attribute name="phase" use="optional">
|
||||
<xsd:annotation>
|
||||
<xsd:documentation>
|
||||
Flag to indicate the phase in which the component should start automatically
|
||||
on startup. See SmartLifecycle.
|
||||
</xsd:documentation>
|
||||
</xsd:annotation>
|
||||
<xsd:simpleType>
|
||||
<xsd:union memberTypes="xsd:integer xsd:string" />
|
||||
</xsd:simpleType>
|
||||
</xsd:attribute>
|
||||
<xsd:attribute name="url" use="required" type="xsd:string">
|
||||
<xsd:annotation>
|
||||
<xsd:documentation>
|
||||
MQTT broker URL.
|
||||
</xsd:documentation>
|
||||
</xsd:annotation>
|
||||
</xsd:attribute>
|
||||
<xsd:attribute name="client-id" use="required" type="xsd:string">
|
||||
<xsd:annotation>
|
||||
<xsd:documentation>
|
||||
MQTT client ID.
|
||||
</xsd:documentation>
|
||||
</xsd:annotation>
|
||||
</xsd:attribute>
|
||||
<xsd:attribute name="converter" type="xsd:string">
|
||||
<xsd:annotation>
|
||||
<xsd:appinfo>
|
||||
<xsd:documentation><![CDATA[
|
||||
A message converter to convert Spring Integration Message<String> to/from
|
||||
a paho MqttMessage. Default is DefaultMqttMessageConverter.
|
||||
]]></xsd:documentation>
|
||||
<tool:annotation kind="ref">
|
||||
<tool:expected-type type="org.springframework.integration.mqtt.support.MqttMessageConverter" />
|
||||
</tool:annotation>
|
||||
</xsd:appinfo>
|
||||
</xsd:annotation>
|
||||
</xsd:attribute>
|
||||
<xsd:attribute name="client-factory" type="xsd:string">
|
||||
<xsd:annotation>
|
||||
<xsd:appinfo>
|
||||
<xsd:documentation><![CDATA[
|
||||
An MqttClientFactory used to create clients and connection options if you wish to
|
||||
override the defaults. Default is DefaultMqttClientFactory.
|
||||
]]></xsd:documentation>
|
||||
<tool:annotation kind="ref">
|
||||
<tool:expected-type type="org.springframework.integration.mqtt.support.MqttMessageConverter" />
|
||||
</tool:annotation>
|
||||
</xsd:appinfo>
|
||||
</xsd:annotation>
|
||||
</xsd:attribute>
|
||||
</xsd:attributeGroup>
|
||||
|
||||
</xsd:schema>
|
||||
Binary file not shown.
|
After Width: | Height: | Size: 572 B |
@@ -0,0 +1,73 @@
|
||||
<?xml version="1.0" encoding="UTF-8"?>
|
||||
<chapter xmlns="http://docbook.org/ns/docbook" version="5.0" xml:id="SIAdapterLowerPrefix"
|
||||
xmlns:xlink="http://www.w3.org/1999/xlink">
|
||||
<title>MqttAdapter Adapter</title>
|
||||
<para>
|
||||
The Spring Integration MqttAdapter Adapter provides...
|
||||
</para>
|
||||
<itemizedlist>
|
||||
<listitem>
|
||||
<para><emphasis><link linkend='SIAdapterLowerPrefix-outbound-channel-adapter'>Outbound Channel adapter</link></emphasis></para>
|
||||
</listitem>
|
||||
<listitem>
|
||||
<para><emphasis><link linkend='SIAdapterLowerPrefix-outbound-gateway'>Outbound Gateway</link></emphasis></para>
|
||||
</listitem>
|
||||
<listitem>
|
||||
<para><emphasis><link linkend='SIAdapterLowerPrefix-inbound-channel-adapter'>Inbound Channel Adapter</link></emphasis></para>
|
||||
</listitem>
|
||||
</itemizedlist>
|
||||
|
||||
<section id="jpa-java-implementation">
|
||||
<title>Java Implementation</title>
|
||||
<para>Each of the provided components will use the
|
||||
<classname>org.springframework.integration.mqtt.core.MqttAdapterExecutor</classname>
|
||||
class...
|
||||
</para>
|
||||
</section>
|
||||
<section id="jpa-common-configuration-attributes">
|
||||
<title>Common Configuration Attributes</title>
|
||||
<para>
|
||||
Certain configuration parameters are shared amongst all MqttAdapter
|
||||
components and are described below:
|
||||
</para>
|
||||
|
||||
<para><emphasis role="bold">auto-startup</emphasis></para>
|
||||
<para>
|
||||
Lifecycle attribute signaling if this component should
|
||||
be started during Application Context startup.
|
||||
Defaults to <code>true</code>.
|
||||
<emphasis>Optional</emphasis>.
|
||||
</para>
|
||||
|
||||
<para><emphasis role="bold">id</emphasis></para>
|
||||
<para>
|
||||
Identifies the underlying Spring bean definition, which
|
||||
is an instance of either <classname>EventDrivenConsumer</classname>
|
||||
or <classname>PollingConsumer</classname>.
|
||||
<emphasis>Optional</emphasis>.
|
||||
</para>
|
||||
|
||||
</section>
|
||||
|
||||
<section id="SIAdapterLowerPrefix-outbound-channel-adapter">
|
||||
<title>Outbound Channel Adapter</title>
|
||||
<para>
|
||||
The MqttAdapter Outbound channel adapter allows you to...
|
||||
</para>
|
||||
</section>
|
||||
<section id="SIAdapterLowerPrefix-outbound-gateway">
|
||||
<title>Outbound Gateway</title>
|
||||
<para>
|
||||
Outbound gateways are similar to outbound channel adapters except that it can also be used to
|
||||
get a result on the <emphasis>reply channel</emphasis> after performing
|
||||
the given...
|
||||
</para>
|
||||
</section>
|
||||
<section id="SIAdapterLowerPrefix-inbound-channel-adapter">
|
||||
<title>Inbound Channel Adapter</title>
|
||||
<para>
|
||||
An inbound channel adapter is used to execute...
|
||||
</para>
|
||||
</section>
|
||||
|
||||
</chapter>
|
||||
@@ -0,0 +1,8 @@
|
||||
<?xml version="1.0" encoding="UTF-8"?>
|
||||
<appendix xmlns="http://docbook.org/ns/docbook" version="5.0" xml:id="history"
|
||||
xmlns:xlink="http://www.w3.org/1999/xlink"
|
||||
xmlns:xi="http://www.w3.org/2001/XInclude">
|
||||
<title>Change History</title>
|
||||
|
||||
|
||||
</appendix>
|
||||
BIN
spring-integration-mqtt/src/reference/docbook/images/logo.png
Normal file
BIN
spring-integration-mqtt/src/reference/docbook/images/logo.png
Normal file
Binary file not shown.
68
spring-integration-mqtt/src/reference/docbook/index.xml
Normal file
68
spring-integration-mqtt/src/reference/docbook/index.xml
Normal file
@@ -0,0 +1,68 @@
|
||||
<?xml version="1.0" encoding="UTF-8"?>
|
||||
<book xmlns="http://docbook.org/ns/docbook" version="5.0"
|
||||
xml:id="spring-integration-reference" xmlns:xi="http://www.w3.org/2001/XInclude"
|
||||
xmlns:xlink="http://www.w3.org/1999/xlink">
|
||||
<bookinfo>
|
||||
<title>Spring Integration MqttAdapter Adapter</title>
|
||||
<titleabbrev>MqttAdapter Adapter ${version}</titleabbrev>
|
||||
<productname>Spring Integration</productname>
|
||||
<releaseinfo>${version}</releaseinfo>
|
||||
|
||||
<!-- TODO: this isn't showing up. -->
|
||||
<mediaobject>
|
||||
<imageobject role="fo">
|
||||
<imagedata fileref="images/logo.png" format="PNG" align="center" />
|
||||
</imageobject>
|
||||
<imageobject role="html">
|
||||
<imagedata fileref="images/logo.png" format="PNG" align="center" />
|
||||
</imageobject>
|
||||
</mediaobject>
|
||||
<!-- END TODO -->
|
||||
|
||||
<authorgroup>
|
||||
<author><firstname>Gary Russell</firstname></author>
|
||||
</authorgroup>
|
||||
<legalnotice>
|
||||
<para>© SpringSource Inc., 2012</para>
|
||||
</legalnotice>
|
||||
</bookinfo>
|
||||
|
||||
<toc></toc>
|
||||
|
||||
<part id="whats-new-part">
|
||||
<title>What's new?</title>
|
||||
<partintro id="spring-integration-intro">
|
||||
<para>
|
||||
For those who are already familiar with Spring Integration, this
|
||||
chapter
|
||||
provides a brief overview of the new features of version 2.2. If you are
|
||||
interested in the changes and features, that were introduced in
|
||||
earlier
|
||||
versions, please take a look at chapter:
|
||||
|
||||
<xref linkend="history" />
|
||||
|
||||
</para>
|
||||
</partintro>
|
||||
<xi:include href="./whats-new.xml" />
|
||||
</part>
|
||||
|
||||
<part id="spring-integration-adapters">
|
||||
<title>Integration Adapters</title>
|
||||
<partintro id="spring-integration-adapters">
|
||||
<para>This section covers the various Channel Adapters and Messaging
|
||||
Gateways provided
|
||||
by Spring Integration to support Message-based communication with
|
||||
external systems.
|
||||
</para>
|
||||
</partintro>
|
||||
<xi:include href="./SIAdapterLowerPrefix.xml" />
|
||||
</part>
|
||||
<part id="spring-integration-appendices">
|
||||
<title>Appendices</title>
|
||||
<partintro id="spring-integration-adapters">
|
||||
<para>Advanced Topics and Additional Resources</para>
|
||||
</partintro>
|
||||
<xi:include href="./history.xml" />
|
||||
</part>
|
||||
</book>
|
||||
17
spring-integration-mqtt/src/reference/docbook/resources.xml
Normal file
17
spring-integration-mqtt/src/reference/docbook/resources.xml
Normal file
@@ -0,0 +1,17 @@
|
||||
<?xml version="1.0" encoding="UTF-8"?>
|
||||
<appendix xmlns="http://docbook.org/ns/docbook" version="5.0" xml:id="resources"
|
||||
xmlns:xlink="http://www.w3.org/1999/xlink">
|
||||
<title>Additional Resources</title>
|
||||
|
||||
<section id="resources-home">
|
||||
<title>Spring Integration Home</title>
|
||||
<para>
|
||||
The definitive source of information about Spring Integration is the
|
||||
<ulink url="http://www.springsource.org/spring-integration">Spring Integration Home</ulink> at
|
||||
<ulink url="http://www.springsource.org">http://www.springsource.org</ulink>. That site serves as a hub of
|
||||
information and is the best place to find up-to-date announcements about the project as well as links to
|
||||
articles, blogs, and new sample applications.
|
||||
</para>
|
||||
</section>
|
||||
|
||||
</appendix>
|
||||
11
spring-integration-mqtt/src/reference/docbook/whats-new.xml
Normal file
11
spring-integration-mqtt/src/reference/docbook/whats-new.xml
Normal file
@@ -0,0 +1,11 @@
|
||||
<?xml version="1.0" encoding="UTF-8"?>
|
||||
<chapter xmlns="http://docbook.org/ns/docbook" version="5.0" xml:id="whats-new"
|
||||
xmlns:xi="http://www.w3.org/2001/XInclude"
|
||||
xmlns:xlink="http://www.w3.org/1999/xlink">
|
||||
<title>What's new?</title>
|
||||
<para>
|
||||
This chapter provides an overview of the new features and improvements
|
||||
that have been added to the MqttAdapter Adapter:
|
||||
</para>
|
||||
|
||||
</chapter>
|
||||
@@ -0,0 +1,94 @@
|
||||
/*
|
||||
* Copyright 2002-2013 the original author or authors.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.springframework.integration.mqtt;
|
||||
|
||||
import static org.junit.Assert.assertEquals;
|
||||
import static org.junit.Assert.assertNotNull;
|
||||
|
||||
import org.junit.Rule;
|
||||
import org.junit.Test;
|
||||
import org.springframework.integration.Message;
|
||||
import org.springframework.integration.channel.QueueChannel;
|
||||
import org.springframework.integration.message.GenericMessage;
|
||||
import org.springframework.integration.mqtt.inbound.MqttPahoMessageDrivenChannelAdapter;
|
||||
import org.springframework.integration.mqtt.outbound.MqttPahoMessageHandler;
|
||||
import org.springframework.integration.mqtt.support.MqttHeaders;
|
||||
import org.springframework.integration.support.MessageBuilder;
|
||||
import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler;
|
||||
|
||||
/**
|
||||
* @author Gary Russell
|
||||
* @since 3.0
|
||||
*
|
||||
*/
|
||||
public class BackTobackAdapterTests {
|
||||
|
||||
@Rule
|
||||
public final BrokerRunning brokerRunning = BrokerRunning.isRunning(1883);
|
||||
|
||||
@Test
|
||||
public void testSingleTopic() {
|
||||
MqttPahoMessageHandler adapter = new MqttPahoMessageHandler("tcp://localhost:1883", "si-test-out");
|
||||
adapter.setDefaultTopic("mqtt-foo");
|
||||
adapter.afterPropertiesSet();
|
||||
adapter.start();
|
||||
MqttPahoMessageDrivenChannelAdapter inbound = new MqttPahoMessageDrivenChannelAdapter("tcp://localhost:1883", "si-test-in", "mqtt-foo");
|
||||
QueueChannel outputChannel = new QueueChannel();
|
||||
inbound.setOutputChannel(outputChannel);
|
||||
ThreadPoolTaskScheduler taskScheduler = new ThreadPoolTaskScheduler();
|
||||
taskScheduler.initialize();
|
||||
inbound.setTaskScheduler(taskScheduler);
|
||||
inbound.afterPropertiesSet();
|
||||
inbound.start();
|
||||
adapter.handleMessage(new GenericMessage<String>("foo"));
|
||||
adapter.stop();
|
||||
Message<?> out = outputChannel.receive(1000);
|
||||
assertNotNull(out);
|
||||
inbound.stop();
|
||||
assertEquals("foo", out.getPayload());
|
||||
assertEquals("mqtt-foo", out.getHeaders().get(MqttHeaders.TOPIC));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testTwoTopics() {
|
||||
MqttPahoMessageHandler adapter = new MqttPahoMessageHandler("tcp://localhost:1883", "si-test-out");
|
||||
adapter.setDefaultTopic("mqtt-foo");
|
||||
adapter.afterPropertiesSet();
|
||||
adapter.start();
|
||||
MqttPahoMessageDrivenChannelAdapter inbound = new MqttPahoMessageDrivenChannelAdapter("tcp://localhost:1883", "si-test-in", "mqtt-foo", "mqtt-bar");
|
||||
QueueChannel outputChannel = new QueueChannel();
|
||||
inbound.setOutputChannel(outputChannel);
|
||||
ThreadPoolTaskScheduler taskScheduler = new ThreadPoolTaskScheduler();
|
||||
taskScheduler.initialize();
|
||||
inbound.setTaskScheduler(taskScheduler);
|
||||
inbound.afterPropertiesSet();
|
||||
inbound.start();
|
||||
adapter.handleMessage(new GenericMessage<String>("foo"));
|
||||
Message<?> message = MessageBuilder.withPayload("bar").setHeader(MqttHeaders.TOPIC, "mqtt-bar").build();
|
||||
adapter.handleMessage(message);
|
||||
adapter.stop();
|
||||
Message<?> out = outputChannel.receive(1000);
|
||||
assertNotNull(out);
|
||||
inbound.stop();
|
||||
assertEquals("foo", out.getPayload());
|
||||
assertEquals("mqtt-foo", out.getHeaders().get(MqttHeaders.TOPIC));
|
||||
out = outputChannel.receive(1000);
|
||||
assertNotNull(out);
|
||||
inbound.stop();
|
||||
assertEquals("bar", out.getPayload());
|
||||
assertEquals("mqtt-bar", out.getHeaders().get(MqttHeaders.TOPIC)); }
|
||||
|
||||
}
|
||||
@@ -0,0 +1,82 @@
|
||||
/*
|
||||
* Copyright 2002-2013 the original author or authors.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.springframework.integration.mqtt;
|
||||
|
||||
import static org.junit.Assume.assumeNoException;
|
||||
import static org.junit.Assume.assumeTrue;
|
||||
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import org.eclipse.paho.client.mqttv3.MqttClient;
|
||||
import org.eclipse.paho.client.mqttv3.MqttException;
|
||||
import org.junit.rules.TestWatcher;
|
||||
import org.junit.runner.Description;
|
||||
import org.junit.runners.model.Statement;
|
||||
import org.springframework.integration.mqtt.core.DefaultMqttPahoClientFactory;
|
||||
|
||||
/**
|
||||
* @author Gary Russell
|
||||
* @since 3.0
|
||||
*
|
||||
*/
|
||||
public class BrokerRunning extends TestWatcher {
|
||||
|
||||
private static Log logger = LogFactory.getLog(BrokerRunning.class);
|
||||
|
||||
// Static so that we only test once on failure: speeds up test suite
|
||||
private static Map<Integer,Boolean> brokerOnline = new HashMap<Integer, Boolean>();
|
||||
|
||||
private final int port;
|
||||
|
||||
private BrokerRunning(int port) {
|
||||
this.port = port;
|
||||
brokerOnline.put(port, true);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Statement apply(Statement base, Description description) {
|
||||
assumeTrue(brokerOnline.get(port));
|
||||
String url = "tcp://localhost:" + port;
|
||||
MqttClient client = null;
|
||||
try {
|
||||
client = new DefaultMqttPahoClientFactory().getClientInstance(url, "junit-" + System.currentTimeMillis());
|
||||
client.connect();
|
||||
}
|
||||
catch (MqttException e) {
|
||||
logger.warn("Tests not running because no broker on " + url + ":", e);
|
||||
assumeNoException(e);
|
||||
}
|
||||
finally {
|
||||
if (client != null) {
|
||||
try {
|
||||
client.close();
|
||||
}
|
||||
catch (MqttException e) {
|
||||
}
|
||||
}
|
||||
}
|
||||
return super.apply(base, description);
|
||||
}
|
||||
|
||||
|
||||
|
||||
public static BrokerRunning isRunning(int port) {
|
||||
return new BrokerRunning(port);
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,237 @@
|
||||
/*
|
||||
* Copyright 2002-2013 the original author or authors.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.springframework.integration.mqtt;
|
||||
|
||||
import static org.junit.Assert.assertEquals;
|
||||
import static org.junit.Assert.assertNotNull;
|
||||
import static org.junit.Assert.assertSame;
|
||||
import static org.junit.Assert.assertTrue;
|
||||
import static org.mockito.Matchers.any;
|
||||
import static org.mockito.Matchers.anyString;
|
||||
import static org.mockito.Mockito.doAnswer;
|
||||
import static org.mockito.Mockito.mock;
|
||||
import static org.mockito.Mockito.spy;
|
||||
import static org.mockito.Mockito.times;
|
||||
import static org.mockito.Mockito.verify;
|
||||
import static org.mockito.Mockito.when;
|
||||
|
||||
import java.util.Properties;
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
import java.util.concurrent.atomic.AtomicReference;
|
||||
|
||||
import javax.net.SocketFactory;
|
||||
|
||||
import org.eclipse.paho.client.mqttv3.MqttCallback;
|
||||
import org.eclipse.paho.client.mqttv3.MqttClient;
|
||||
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
|
||||
import org.eclipse.paho.client.mqttv3.MqttMessage;
|
||||
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
|
||||
import org.junit.Test;
|
||||
import org.mockito.invocation.InvocationOnMock;
|
||||
import org.mockito.stubbing.Answer;
|
||||
import org.springframework.integration.Message;
|
||||
import org.springframework.integration.channel.QueueChannel;
|
||||
import org.springframework.integration.message.GenericMessage;
|
||||
import org.springframework.integration.mqtt.core.DefaultMqttPahoClientFactory;
|
||||
import org.springframework.integration.mqtt.core.DefaultMqttPahoClientFactory.Will;
|
||||
import org.springframework.integration.mqtt.inbound.MqttPahoMessageDrivenChannelAdapter;
|
||||
import org.springframework.integration.mqtt.outbound.MqttPahoMessageHandler;
|
||||
import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler;
|
||||
|
||||
/**
|
||||
* @author Gary Russell
|
||||
* @since 3.0
|
||||
*
|
||||
*/
|
||||
public class MqttAdapterTests {
|
||||
|
||||
@Test
|
||||
public void testPahoConnectOptions() {
|
||||
DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();
|
||||
factory.setCleanSession(false);
|
||||
factory.setConnectionTimeout(23);
|
||||
factory.setKeepAliveInterval(45);
|
||||
factory.setPassword("pass");
|
||||
SocketFactory socketFactory = mock(SocketFactory.class);
|
||||
factory.setSocketFactory(socketFactory);
|
||||
Properties props = new Properties();
|
||||
factory.setSslProperties(props);
|
||||
factory.setUserName("user");
|
||||
Will will = new Will("foo", "bar".getBytes(), 2, true);
|
||||
factory.setWill(will);
|
||||
|
||||
MqttConnectOptions options = factory.getConnectionOptions();
|
||||
|
||||
assertEquals(23, options.getConnectionTimeout());
|
||||
assertEquals(45, options.getKeepAliveInterval());
|
||||
assertEquals("pass", new String(options.getPassword()));
|
||||
assertSame(socketFactory, options.getSocketFactory());
|
||||
assertSame(props, options.getSSLProperties());
|
||||
assertEquals("user", options.getUserName());
|
||||
assertEquals("foo", options.getWillDestination());
|
||||
assertEquals("bar", new String(options.getWillMessage().getPayload()));
|
||||
assertEquals(2, options.getWillMessage().getQos());
|
||||
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testOutboundOptionsApplied() throws Exception {
|
||||
DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();
|
||||
factory.setCleanSession(false);
|
||||
factory.setConnectionTimeout(23);
|
||||
factory.setKeepAliveInterval(45);
|
||||
factory.setPassword("pass");
|
||||
MemoryPersistence persistence = new MemoryPersistence();
|
||||
factory.setPersistence(persistence);
|
||||
final SocketFactory socketFactory = mock(SocketFactory.class);
|
||||
factory.setSocketFactory(socketFactory);
|
||||
final Properties props = new Properties();
|
||||
factory.setSslProperties(props);
|
||||
factory.setUserName("user");
|
||||
Will will = new Will("foo", "bar".getBytes(), 2, true);
|
||||
factory.setWill(will);
|
||||
|
||||
factory = spy(factory);
|
||||
final MqttClient client = mock(MqttClient.class);
|
||||
doAnswer(new Answer<MqttClient>() {
|
||||
|
||||
@Override
|
||||
public MqttClient answer(InvocationOnMock invocation) throws Throwable {
|
||||
return client;
|
||||
}
|
||||
}).when(factory).getClientInstance(anyString(), anyString());
|
||||
|
||||
MqttPahoMessageHandler handler = new MqttPahoMessageHandler("foo", "bar", factory);
|
||||
handler.setDefaultTopic("mqtt-foo");
|
||||
handler.afterPropertiesSet();
|
||||
handler.start();
|
||||
final AtomicBoolean connectCalled = new AtomicBoolean();
|
||||
doAnswer(new Answer<Object>(){
|
||||
|
||||
@Override
|
||||
public Object answer(InvocationOnMock invocation) throws Throwable {
|
||||
MqttConnectOptions options = (MqttConnectOptions) invocation.getArguments()[0];
|
||||
assertEquals(23, options.getConnectionTimeout());
|
||||
assertEquals(45, options.getKeepAliveInterval());
|
||||
assertEquals("pass", new String(options.getPassword()));
|
||||
assertSame(socketFactory, options.getSocketFactory());
|
||||
assertSame(props, options.getSSLProperties());
|
||||
assertEquals("user", options.getUserName());
|
||||
assertEquals("foo", options.getWillDestination());
|
||||
assertEquals("bar", new String(options.getWillMessage().getPayload()));
|
||||
assertEquals(2, options.getWillMessage().getQos());
|
||||
connectCalled.set(true);
|
||||
return null;
|
||||
}
|
||||
}).when(client).connect(any(MqttConnectOptions.class));
|
||||
final AtomicBoolean publishCalled = new AtomicBoolean();
|
||||
doAnswer(new Answer<Object>() {
|
||||
|
||||
@Override
|
||||
public Object answer(InvocationOnMock invocation) throws Throwable {
|
||||
assertEquals("mqtt-foo", invocation.getArguments()[0]);
|
||||
MqttMessage message = (MqttMessage) invocation.getArguments()[1];
|
||||
assertEquals("Hello, world!", new String(message.getPayload()));
|
||||
publishCalled.set(true);
|
||||
return null;
|
||||
}
|
||||
}).when(client).publish(anyString(), any(MqttMessage.class));
|
||||
|
||||
handler.handleMessage(new GenericMessage<String>("Hello, world!"));
|
||||
|
||||
verify(client, times(1)).connect(any(MqttConnectOptions.class));
|
||||
assertTrue(connectCalled.get());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testInboundOptionsApplied() throws Exception {
|
||||
DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();
|
||||
factory.setCleanSession(false);
|
||||
factory.setConnectionTimeout(23);
|
||||
factory.setKeepAliveInterval(45);
|
||||
factory.setPassword("pass");
|
||||
MemoryPersistence persistence = new MemoryPersistence();
|
||||
factory.setPersistence(persistence);
|
||||
final SocketFactory socketFactory = mock(SocketFactory.class);
|
||||
factory.setSocketFactory(socketFactory);
|
||||
final Properties props = new Properties();
|
||||
factory.setSslProperties(props);
|
||||
factory.setUserName("user");
|
||||
Will will = new Will("foo", "bar".getBytes(), 2, true);
|
||||
factory.setWill(will);
|
||||
|
||||
factory = spy(factory);
|
||||
final MqttClient client = mock(MqttClient.class);
|
||||
doAnswer(new Answer<MqttClient>() {
|
||||
|
||||
@Override
|
||||
public MqttClient answer(InvocationOnMock invocation) throws Throwable {
|
||||
return client;
|
||||
}
|
||||
}).when(factory).getClientInstance(anyString(), anyString());
|
||||
|
||||
final AtomicBoolean connectCalled = new AtomicBoolean();
|
||||
doAnswer(new Answer<Object>() {
|
||||
|
||||
@Override
|
||||
public Object answer(InvocationOnMock invocation) throws Throwable {
|
||||
MqttConnectOptions options = (MqttConnectOptions) invocation.getArguments()[0];
|
||||
assertEquals(23, options.getConnectionTimeout());
|
||||
assertEquals(45, options.getKeepAliveInterval());
|
||||
assertEquals("pass", new String(options.getPassword()));
|
||||
assertSame(socketFactory, options.getSocketFactory());
|
||||
assertSame(props, options.getSSLProperties());
|
||||
assertEquals("user", options.getUserName());
|
||||
assertEquals("foo", options.getWillDestination());
|
||||
assertEquals("bar", new String(options.getWillMessage().getPayload()));
|
||||
assertEquals(2, options.getWillMessage().getQos());
|
||||
connectCalled.set(true);
|
||||
return null;
|
||||
}
|
||||
}).when(client).connect(any(MqttConnectOptions.class));
|
||||
|
||||
final AtomicReference<MqttCallback> callback = new AtomicReference<MqttCallback>();
|
||||
doAnswer(new Answer<Object>() {
|
||||
|
||||
@Override
|
||||
public Object answer(InvocationOnMock invocation) throws Throwable {
|
||||
callback.set((MqttCallback) invocation.getArguments()[0]);
|
||||
return null;
|
||||
}
|
||||
}).when(client).setCallback(any(MqttCallback.class));
|
||||
|
||||
when(client.isConnected()).thenReturn(true);
|
||||
|
||||
MqttPahoMessageDrivenChannelAdapter adapter = new MqttPahoMessageDrivenChannelAdapter("foo", "bar", factory, "baz");
|
||||
QueueChannel outputChannel = new QueueChannel();
|
||||
adapter.setOutputChannel(outputChannel);
|
||||
ThreadPoolTaskScheduler taskScheduler = new ThreadPoolTaskScheduler();
|
||||
taskScheduler.initialize();
|
||||
adapter.setTaskScheduler(taskScheduler);
|
||||
adapter.afterPropertiesSet();
|
||||
adapter.start();
|
||||
|
||||
verify(client, times(1)).connect(any(MqttConnectOptions.class));
|
||||
assertTrue(connectCalled.get());
|
||||
|
||||
MqttMessage message = new MqttMessage("qux".getBytes());
|
||||
callback.get().messageArrived("baz", message);
|
||||
Message<?> outMessage = outputChannel.receive(0);
|
||||
assertNotNull(outMessage);
|
||||
assertEquals("qux", outMessage.getPayload());
|
||||
}
|
||||
|
||||
}
|
||||
@@ -0,0 +1,40 @@
|
||||
<?xml version="1.0" encoding="UTF-8"?>
|
||||
<beans xmlns="http://www.springframework.org/schema/beans"
|
||||
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
|
||||
xmlns:int="http://www.springframework.org/schema/integration"
|
||||
xmlns:int-mqtt="http://www.springframework.org/schema/integration/mqtt"
|
||||
xsi:schemaLocation="
|
||||
http://www.springframework.org/schema/integration http://www.springframework.org/schema/integration/spring-integration.xsd
|
||||
http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
|
||||
http://www.springframework.org/schema/integration/mqtt http://www.springframework.org/schema/integration/mqtt/spring-integration-mqtt.xsd">
|
||||
|
||||
<int:channel id="out"/>
|
||||
|
||||
<int-mqtt:message-driven-channel-adapter id="oneTopicAdapter"
|
||||
auto-startup="false"
|
||||
phase="25"
|
||||
client-id="foo"
|
||||
url="tcp://localhost:1883"
|
||||
topics="bar"
|
||||
converter="myConverter"
|
||||
client-factory="clientFactory"
|
||||
send-timeout="123"
|
||||
channel="out" />
|
||||
|
||||
<int-mqtt:message-driven-channel-adapter id="twoTopicsAdapter"
|
||||
auto-startup="false"
|
||||
phase="25"
|
||||
client-id="foo"
|
||||
url="tcp://localhost:1883"
|
||||
topics="bar, baz"
|
||||
converter="myConverter"
|
||||
client-factory="clientFactory"
|
||||
send-timeout="123"
|
||||
channel="out" />
|
||||
|
||||
<int:channel id="out" />
|
||||
|
||||
<bean id="myConverter" class="org.springframework.integration.mqtt.support.DefaultPahoMessageConverter" />
|
||||
|
||||
<bean id="clientFactory" class="org.springframework.integration.mqtt.core.DefaultMqttPahoClientFactory" />
|
||||
</beans>
|
||||
@@ -0,0 +1,84 @@
|
||||
/*
|
||||
* Copyright 2002-2013 the original author or authors.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.springframework.integration.mqtt.config.xml;
|
||||
|
||||
import static org.junit.Assert.assertEquals;
|
||||
import static org.junit.Assert.assertFalse;
|
||||
import static org.junit.Assert.assertSame;
|
||||
|
||||
import org.junit.Test;
|
||||
import org.junit.runner.RunWith;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.integration.MessageChannel;
|
||||
import org.springframework.integration.mqtt.core.DefaultMqttPahoClientFactory;
|
||||
import org.springframework.integration.mqtt.inbound.MqttPahoMessageDrivenChannelAdapter;
|
||||
import org.springframework.integration.mqtt.support.MqttMessageConverter;
|
||||
import org.springframework.integration.test.util.TestUtils;
|
||||
import org.springframework.test.context.ContextConfiguration;
|
||||
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
|
||||
|
||||
/**
|
||||
* @author Gary Russell
|
||||
* @since 3.0
|
||||
*
|
||||
*/
|
||||
@ContextConfiguration
|
||||
@RunWith(SpringJUnit4ClassRunner.class)
|
||||
public class MqttMessageDrivenChannelAdapterParserTests {
|
||||
|
||||
@Autowired
|
||||
private MqttPahoMessageDrivenChannelAdapter oneTopicAdapter;
|
||||
|
||||
@Autowired
|
||||
private MqttPahoMessageDrivenChannelAdapter twoTopicsAdapter;
|
||||
|
||||
@Autowired
|
||||
private MessageChannel out;
|
||||
|
||||
@Autowired
|
||||
private MqttMessageConverter converter;
|
||||
|
||||
@Autowired
|
||||
private DefaultMqttPahoClientFactory clientFactory;
|
||||
|
||||
@Test
|
||||
public void testOneTopic() {
|
||||
assertEquals("tcp://localhost:1883", TestUtils.getPropertyValue(oneTopicAdapter, "url"));
|
||||
assertFalse(TestUtils.getPropertyValue(oneTopicAdapter, "autoStartup", Boolean.class));
|
||||
assertEquals(25, TestUtils.getPropertyValue(oneTopicAdapter, "phase"));
|
||||
assertEquals("foo", TestUtils.getPropertyValue(oneTopicAdapter, "clientId"));
|
||||
assertEquals("bar", TestUtils.getPropertyValue(oneTopicAdapter, "topic", String[].class)[0]);
|
||||
assertSame(converter, TestUtils.getPropertyValue(oneTopicAdapter, "converter"));
|
||||
assertEquals(123L, TestUtils.getPropertyValue(oneTopicAdapter, "messagingTemplate.sendTimeout"));
|
||||
assertSame(out, TestUtils.getPropertyValue(oneTopicAdapter, "outputChannel"));
|
||||
assertSame(clientFactory, TestUtils.getPropertyValue(oneTopicAdapter, "clientFactory"));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testTwoTopics() {
|
||||
assertEquals("tcp://localhost:1883", TestUtils.getPropertyValue(oneTopicAdapter, "url"));
|
||||
assertFalse(TestUtils.getPropertyValue(twoTopicsAdapter, "autoStartup", Boolean.class));
|
||||
assertEquals(25, TestUtils.getPropertyValue(twoTopicsAdapter, "phase"));
|
||||
assertEquals("foo", TestUtils.getPropertyValue(twoTopicsAdapter, "clientId"));
|
||||
assertEquals("bar", TestUtils.getPropertyValue(twoTopicsAdapter, "topic", String[].class)[0]);
|
||||
assertEquals("baz", TestUtils.getPropertyValue(twoTopicsAdapter, "topic", String[].class)[1]);
|
||||
assertSame(converter, TestUtils.getPropertyValue(twoTopicsAdapter, "converter"));
|
||||
assertEquals(123L, TestUtils.getPropertyValue(twoTopicsAdapter, "messagingTemplate.sendTimeout"));
|
||||
assertSame(out, TestUtils.getPropertyValue(twoTopicsAdapter, "outputChannel"));
|
||||
assertSame(clientFactory, TestUtils.getPropertyValue(twoTopicsAdapter, "clientFactory"));
|
||||
}
|
||||
|
||||
}
|
||||
@@ -0,0 +1,39 @@
|
||||
<?xml version="1.0" encoding="UTF-8"?>
|
||||
<beans xmlns="http://www.springframework.org/schema/beans"
|
||||
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
|
||||
xmlns:int="http://www.springframework.org/schema/integration"
|
||||
xmlns:int-mqtt="http://www.springframework.org/schema/integration/mqtt"
|
||||
xsi:schemaLocation="
|
||||
http://www.springframework.org/schema/integration http://www.springframework.org/schema/integration/spring-integration.xsd
|
||||
http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
|
||||
http://www.springframework.org/schema/integration/mqtt http://www.springframework.org/schema/integration/mqtt/spring-integration-mqtt.xsd">
|
||||
|
||||
<int:channel id="target"/>
|
||||
|
||||
<int-mqtt:outbound-channel-adapter id="withConverter"
|
||||
client-id="foo"
|
||||
url="tcp://localhost:1883"
|
||||
auto-startup="false"
|
||||
converter="myConverter"
|
||||
client-factory="clientFactory"
|
||||
default-topic="bar"
|
||||
phase="25"
|
||||
order="1"
|
||||
channel="target" />
|
||||
|
||||
<int-mqtt:outbound-channel-adapter id="withDefaultConverter"
|
||||
client-id="foo"
|
||||
url="tcp://localhost:1883"
|
||||
auto-startup="false"
|
||||
default-qos="1"
|
||||
default-retained="true"
|
||||
default-topic="bar"
|
||||
client-factory="clientFactory"
|
||||
phase="25"
|
||||
order="1"
|
||||
channel="target" />
|
||||
|
||||
<bean id="myConverter" class="org.springframework.integration.mqtt.support.DefaultPahoMessageConverter" />
|
||||
|
||||
<bean id="clientFactory" class="org.springframework.integration.mqtt.core.DefaultMqttPahoClientFactory" />
|
||||
</beans>
|
||||
@@ -0,0 +1,84 @@
|
||||
/*
|
||||
* Copyright 2002-2013 the original author or authors.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.springframework.integration.mqtt.config.xml;
|
||||
|
||||
import static org.junit.Assert.assertEquals;
|
||||
import static org.junit.Assert.assertFalse;
|
||||
import static org.junit.Assert.assertSame;
|
||||
import static org.junit.Assert.assertTrue;
|
||||
|
||||
import org.junit.Test;
|
||||
import org.junit.runner.RunWith;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.beans.factory.annotation.Qualifier;
|
||||
import org.springframework.integration.mqtt.core.DefaultMqttPahoClientFactory;
|
||||
import org.springframework.integration.mqtt.outbound.MqttPahoMessageHandler;
|
||||
import org.springframework.integration.mqtt.support.DefaultPahoMessageConverter;
|
||||
import org.springframework.integration.mqtt.support.MqttMessageConverter;
|
||||
import org.springframework.integration.support.converter.MessageConverter;
|
||||
import org.springframework.integration.test.util.TestUtils;
|
||||
import org.springframework.test.context.ContextConfiguration;
|
||||
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
|
||||
|
||||
/**
|
||||
* @author Gary Russell
|
||||
* @since 3.0
|
||||
*
|
||||
*/
|
||||
@ContextConfiguration
|
||||
@RunWith(SpringJUnit4ClassRunner.class)
|
||||
public class MqttOutboundChannelAdapterParserTests {
|
||||
|
||||
@Autowired @Qualifier("withConverter.handler")
|
||||
private MqttPahoMessageHandler withConverterHandler;
|
||||
|
||||
@Autowired @Qualifier("withDefaultConverter.handler")
|
||||
private MqttPahoMessageHandler withDefaultConverterHandler;
|
||||
|
||||
@Autowired
|
||||
private MqttMessageConverter converter;
|
||||
|
||||
@Autowired
|
||||
private DefaultMqttPahoClientFactory clientFactory;
|
||||
|
||||
@Test
|
||||
public void testWithConverter() {
|
||||
assertEquals("tcp://localhost:1883", TestUtils.getPropertyValue(withConverterHandler, "url"));
|
||||
assertFalse(TestUtils.getPropertyValue(withConverterHandler, "autoStartup", Boolean.class));
|
||||
assertEquals(25, TestUtils.getPropertyValue(withConverterHandler, "phase"));
|
||||
assertEquals("foo", TestUtils.getPropertyValue(withConverterHandler, "clientId"));
|
||||
assertEquals("bar", TestUtils.getPropertyValue(withConverterHandler, "defaultTopic"));
|
||||
assertSame(converter, TestUtils.getPropertyValue(withConverterHandler, "converter"));
|
||||
assertSame(clientFactory, TestUtils.getPropertyValue(withConverterHandler, "clientFactory"));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testWithDefaultConverter() {
|
||||
assertEquals("tcp://localhost:1883", TestUtils.getPropertyValue(withDefaultConverterHandler, "url"));
|
||||
assertFalse(TestUtils.getPropertyValue(withDefaultConverterHandler, "autoStartup", Boolean.class));
|
||||
assertEquals(25, TestUtils.getPropertyValue(withDefaultConverterHandler, "phase"));
|
||||
assertEquals("foo", TestUtils.getPropertyValue(withDefaultConverterHandler, "clientId"));
|
||||
assertEquals("bar", TestUtils.getPropertyValue(withDefaultConverterHandler, "defaultTopic"));
|
||||
assertEquals(1, TestUtils.getPropertyValue(withDefaultConverterHandler, "defaultQos"));
|
||||
assertTrue(TestUtils.getPropertyValue(withDefaultConverterHandler, "defaultRetained", Boolean.class));
|
||||
MessageConverter defaultConverter = TestUtils.getPropertyValue(withDefaultConverterHandler, "converter", MessageConverter.class);
|
||||
assertTrue(defaultConverter instanceof DefaultPahoMessageConverter);
|
||||
assertEquals(1, TestUtils.getPropertyValue(defaultConverter, "defaultQos"));
|
||||
assertTrue(TestUtils.getPropertyValue(defaultConverter, "defaultRetained", Boolean.class));
|
||||
assertSame(clientFactory, TestUtils.getPropertyValue(withDefaultConverterHandler, "clientFactory"));
|
||||
}
|
||||
|
||||
}
|
||||
@@ -0,0 +1,8 @@
|
||||
log4j.rootCategory=WARN, stdout
|
||||
|
||||
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
|
||||
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
|
||||
log4j.appender.stdout.layout.ConversionPattern=%d{HH:mm:ss.SSS} %-5p [%t][%c] %m%n
|
||||
|
||||
log4j.category.org.springframework.integration=WARN
|
||||
log4j.category.org.springframework.integration.mqtt=INFO
|
||||
Reference in New Issue
Block a user