diff --git a/spring-integration-ip-extensions/README.md b/spring-integration-ip-extensions/README.md new file mode 100644 index 0000000..adfad04 --- /dev/null +++ b/spring-integration-ip-extensions/README.md @@ -0,0 +1,67 @@ +Spring Integration IP Extensions +================================================= + +Welcome to the Spring Integration IP Extensions project. It is intended to supplement the spring-integration-ip module with, for example, custom serializers/deserializers. + +# Building + +If you encounter out of memory errors during the build, increase available heap and permgen for Gradle: + + GRADLE_OPTS='-XX:MaxPermSize=1024m -Xmx1024m' + +To build and install jars into your local Maven cache: + + ./gradlew install + +To build api Javadoc (results will be in `build/api`): + + ./gradlew api + +To build complete distribution including `-dist` and `-docs` zip files (results will be in `build/distributions`) + + ./gradlew dist + +# IDE Support + +## Using SpringSource Tool Suite + + Gradle projects can be directly imported into STS + +## Using Plain Eclipse + +To generate Eclipse metadata (.classpath and .project files), do the following: + + ./gradlew eclipse + +Once complete, you may then import the projects into Eclipse as usual: + + *File -> Import -> Existing projects into workspace* + +Browse to the *'spring-integration'* root directory. All projects should import free of errors. + +## Using IntelliJ IDEA + +To generate IDEA metadata (.iml and .ipr files), do the following: + + ./gradlew idea + +For more information, please visit the Spring Integration website at: +[http://www.springsource.org/spring-integration](http://www.springsource.org/spring-integration) + +# WebSocket Server Demo + +This demonstrates how to use the TCP adapters to provide a very lightweight websocket server. + +Run WebSocketServerTests as a Java Application (main) and open + +`file:///.../spring-integration-extensions/spring-integration-ip-extensions/src/test/java/org/springframework/integration/ip/extensions/sockjs/ws.html` + +in a browser. Opening the page opens the WebSocket. + +Sending 'start' begins sending an incrementing # once per second. 'stop' stops the stream (leaving the socket open), 'start' resumes again. Multiple browser instances get their own sequence #. + +# Bitcoin Sample + +The [bitcoin-rt project](https://github.com/cbeams/bitcoin-rt) provides a sample using the Spring Integration IP extensions: + +[https://github.com/cbeams/bitcoin-rt/tree/master/java-spring-integration](https://github.com/cbeams/bitcoin-rt/tree/master/java-spring-integration) diff --git a/spring-integration-ip-extensions/build.gradle b/spring-integration-ip-extensions/build.gradle new file mode 100644 index 0000000..45864a7 --- /dev/null +++ b/spring-integration-ip-extensions/build.gradle @@ -0,0 +1,223 @@ +description = 'Spring Integration IP Extensions' + +buildscript { + repositories { + maven { url 'https://repo.springsource.org/plugins-snapshot' } + } + dependencies { + classpath 'org.springframework.build.gradle:docbook-reference-plugin:0.1.5' + } +} + +apply plugin: 'java' +apply from: "${rootProject.projectDir}/publish-maven.gradle" +apply plugin: 'eclipse' +apply plugin: 'idea' + +group = 'org.springintegration.ip.extensions' + +repositories { + maven { url 'http://repo.springsource.org/libs-milestone' } + maven { url 'http://repo.springsource.org/plugins-release' } // for bundlor +} + +sourceCompatibility=1.6 +targetCompatibility=1.6 + +ext { + aspectjVersion = '1.6.8' + cglibVersion = '2.2' + commonsNetVersion = '3.0.1' + javaxActivationVersion = '1.1.1' + junitVersion = '4.10' + log4jVersion = '1.2.12' + mockitoVersion = '1.9.0' + springVersion = '3.1.3.RELEASE' + springIntegrationVersion = '2.2.0.RELEASE' +} + +eclipse { + project { + natures += 'org.springframework.ide.eclipse.core.springnature' + } +} + +sourceSets { + test { + resources { + srcDirs = ['src/test/resources', 'src/test/java'] + } + } +} + +// See http://www.gradle.org/docs/current/userguide/dependency_management.html#sub:configurations +// and http://www.gradle.org/docs/current/dsl/org.gradle.api.artifacts.ConfigurationContainer.html +configurations { + jacoco //Configuration Group used by Sonar to provide Code Coverage using JaCoCo +} + +dependencies { + compile "org.springframework.integration:spring-integration-ip:$springIntegrationVersion" + compile "commons-codec:commons-codec:1.5" + testCompile "org.springframework.integration:spring-integration-test:$springIntegrationVersion" + testCompile "junit:junit-dep:$junitVersion" + testCompile "log4j:log4j:$log4jVersion" + testCompile "org.mockito:mockito-all:$mockitoVersion" + testCompile "org.springframework:spring-test:$springVersion" + jacoco group: "org.jacoco", name: "org.jacoco.agent", version: "0.5.6.201201232323", classifier: "runtime" +} + +// enable all compiler warnings; individual projects may customize further +ext.xLintArg = '-Xlint:all' +[compileJava, compileTestJava]*.options*.compilerArgs = [xLintArg] + +test { + // suppress all console output during testing unless running `gradle -i` + logging.captureStandardOutput(LogLevel.INFO) + jvmArgs "-javaagent:${configurations.jacoco.asPath}=destfile=${buildDir}/jacoco.exec,includes=*" +} + +task sourcesJar(type: Jar) { + classifier = 'sources' + from sourceSets.main.allJava +} + +task javadocJar(type: Jar) { + classifier = 'javadoc' + from javadoc +} + +artifacts { + archives sourcesJar + archives javadocJar +} + +apply plugin: 'sonar' + +sonar { + + if (rootProject.hasProperty('sonarHostUrl')) { + server.url = rootProject.sonarHostUrl + } + + database { + if (rootProject.hasProperty('sonarJdbcUrl')) { + url = rootProject.sonarJdbcUrl + } + if (rootProject.hasProperty('sonarJdbcDriver')) { + driverClassName = rootProject.sonarJdbcDriver + } + if (rootProject.hasProperty('sonarJdbcUsername')) { + username = rootProject.sonarJdbcUsername + } + if (rootProject.hasProperty('sonarJdbcPassword')) { + password = rootProject.sonarJdbcPassword + } + } + + project { + dynamicAnalysis = "reuseReports" + withProjectProperties { props -> + props["sonar.core.codeCoveragePlugin"] = "jacoco" + props["sonar.jacoco.reportPath"] = "${buildDir.name}/jacoco.exec" + } + } + + logger.info("Sonar parameters used: server.url='${server.url}'; database.url='${database.url}'; database.driverClassName='${database.driverClassName}'; database.username='${database.username}'") +} + +task api(type: Javadoc) { + group = 'Documentation' + description = 'Generates aggregated Javadoc API documentation.' + title = "${rootProject.description} ${version} API" + options.memberLevel = org.gradle.external.javadoc.JavadocMemberLevel.PROTECTED + options.author = true + options.header = rootProject.description + options.overview = 'src/api/overview.html' + + source = sourceSets.main.allJava + classpath = project.sourceSets.main.compileClasspath + destinationDir = new File(buildDir, "api") +} + +task docsZip(type: Zip) { + group = 'Distribution' + classifier = 'docs' + description = "Builds -${classifier} archive containing api and reference " + + "for deployment at static.springframework.org/spring-integration/docs." + + from (api) { + into 'api' + } + +} + +task distZip(type: Zip, dependsOn: [docsZip]) { + group = 'Distribution' + classifier = 'dist' + description = "Builds -${classifier} archive, containing all jars and docs, " + + "suitable for community download page." + + ext.baseDir = "${project.name}-${project.version}"; + + from('src/dist') { + include 'readme.txt' + include 'license.txt' + include 'notice.txt' + into "${baseDir}" + } + + from(zipTree(docsZip.archivePath)) { + into "${baseDir}/docs" + } + + into ("${baseDir}/libs") { + from project.jar + from project.sourcesJar + from project.javadocJar + } +} + +// Create an optional "with dependencies" distribution. +// Not published by default; only for use when building from source. +task depsZip(type: Zip, dependsOn: distZip) { zipTask -> + group = 'Distribution' + classifier = 'dist-with-deps' + description = "Builds -${classifier} archive, containing everything " + + "in the -${distZip.classifier} archive plus all dependencies." + + from zipTree(distZip.archivePath) + + gradle.taskGraph.whenReady { taskGraph -> + if (taskGraph.hasTask(":${zipTask.name}")) { + def projectName = rootProject.name + def artifacts = new HashSet() + + rootProject.configurations.runtime.resolvedConfiguration.resolvedArtifacts.each { artifact -> + def dependency = artifact.moduleVersion.id + if (!projectName.equals(dependency.name)) { + artifacts << artifact.file + } + } + + zipTask.from(artifacts) { + into "${distZip.baseDir}/deps" + } + } + } +} + +artifacts { + archives distZip + archives docsZip +} + +task dist(dependsOn: assemble) { + group = 'Distribution' + description = 'Builds -dist, -docs and -schema distribution archives.' +} + +task wrapper(type: Wrapper) { + description = 'Generates gradlew[.bat] scripts' + gradleVersion = '1.3' +} diff --git a/spring-integration-ip-extensions/gradle.properties b/spring-integration-ip-extensions/gradle.properties new file mode 100644 index 0000000..99f6735 --- /dev/null +++ b/spring-integration-ip-extensions/gradle.properties @@ -0,0 +1 @@ +version=0.1.0.BUILD-SNAPSHOT diff --git a/spring-integration-ip-extensions/gradle/wrapper/gradle-wrapper.jar b/spring-integration-ip-extensions/gradle/wrapper/gradle-wrapper.jar new file mode 100644 index 0000000..7b359d7 Binary files /dev/null and b/spring-integration-ip-extensions/gradle/wrapper/gradle-wrapper.jar differ diff --git a/spring-integration-ip-extensions/gradle/wrapper/gradle-wrapper.properties b/spring-integration-ip-extensions/gradle/wrapper/gradle-wrapper.properties new file mode 100644 index 0000000..b2ebed1 --- /dev/null +++ b/spring-integration-ip-extensions/gradle/wrapper/gradle-wrapper.properties @@ -0,0 +1,6 @@ +#Mon Jan 07 15:58:18 EST 2013 +distributionBase=GRADLE_USER_HOME +distributionPath=wrapper/dists +zipStoreBase=GRADLE_USER_HOME +zipStorePath=wrapper/dists +distributionUrl=http\://services.gradle.org/distributions/gradle-1.3-bin.zip diff --git a/spring-integration-ip-extensions/gradlew b/spring-integration-ip-extensions/gradlew new file mode 100755 index 0000000..3851082 --- /dev/null +++ b/spring-integration-ip-extensions/gradlew @@ -0,0 +1,164 @@ +#!/usr/bin/env bash + +############################################################################## +## +## Gradle start up script for UN*X +## +############################################################################## + +# Add default JVM options here. You can also use JAVA_OPTS and GRADLE_OPTS to pass JVM options to this script. +DEFAULT_JVM_OPTS="" + +APP_NAME="Gradle" +APP_BASE_NAME=`basename "$0"` + +# Use the maximum available, or set MAX_FD != -1 to use that value. +MAX_FD="maximum" + +warn ( ) { + echo "$*" +} + +die ( ) { + echo + echo "$*" + echo + exit 1 +} + +# OS specific support (must be 'true' or 'false'). +cygwin=false +msys=false +darwin=false +case "`uname`" in + CYGWIN* ) + cygwin=true + ;; + Darwin* ) + darwin=true + ;; + MINGW* ) + msys=true + ;; +esac + +# For Cygwin, ensure paths are in UNIX format before anything is touched. +if $cygwin ; then + [ -n "$JAVA_HOME" ] && JAVA_HOME=`cygpath --unix "$JAVA_HOME"` +fi + +# Attempt to set APP_HOME +# Resolve links: $0 may be a link +PRG="$0" +# Need this for relative symlinks. +while [ -h "$PRG" ] ; do + ls=`ls -ld "$PRG"` + link=`expr "$ls" : '.*-> \(.*\)$'` + if expr "$link" : '/.*' > /dev/null; then + PRG="$link" + else + PRG=`dirname "$PRG"`"/$link" + fi +done +SAVED="`pwd`" +cd "`dirname \"$PRG\"`/" +APP_HOME="`pwd -P`" +cd "$SAVED" + +CLASSPATH=$APP_HOME/gradle/wrapper/gradle-wrapper.jar + +# Determine the Java command to use to start the JVM. +if [ -n "$JAVA_HOME" ] ; then + if [ -x "$JAVA_HOME/jre/sh/java" ] ; then + # IBM's JDK on AIX uses strange locations for the executables + JAVACMD="$JAVA_HOME/jre/sh/java" + else + JAVACMD="$JAVA_HOME/bin/java" + fi + if [ ! -x "$JAVACMD" ] ; then + die "ERROR: JAVA_HOME is set to an invalid directory: $JAVA_HOME + +Please set the JAVA_HOME variable in your environment to match the +location of your Java installation." + fi +else + JAVACMD="java" + which java >/dev/null 2>&1 || die "ERROR: JAVA_HOME is not set and no 'java' command could be found in your PATH. + +Please set the JAVA_HOME variable in your environment to match the +location of your Java installation." +fi + +# Increase the maximum file descriptors if we can. +if [ "$cygwin" = "false" -a "$darwin" = "false" ] ; then + MAX_FD_LIMIT=`ulimit -H -n` + if [ $? -eq 0 ] ; then + if [ "$MAX_FD" = "maximum" -o "$MAX_FD" = "max" ] ; then + MAX_FD="$MAX_FD_LIMIT" + fi + ulimit -n $MAX_FD + if [ $? -ne 0 ] ; then + warn "Could not set maximum file descriptor limit: $MAX_FD" + fi + else + warn "Could not query maximum file descriptor limit: $MAX_FD_LIMIT" + fi +fi + +# For Darwin, add options to specify how the application appears in the dock +if $darwin; then + GRADLE_OPTS="$GRADLE_OPTS \"-Xdock:name=$APP_NAME\" \"-Xdock:icon=$APP_HOME/media/gradle.icns\"" +fi + +# For Cygwin, switch paths to Windows format before running java +if $cygwin ; then + APP_HOME=`cygpath --path --mixed "$APP_HOME"` + CLASSPATH=`cygpath --path --mixed "$CLASSPATH"` + + # We build the pattern for arguments to be converted via cygpath + ROOTDIRSRAW=`find -L / -maxdepth 1 -mindepth 1 -type d 2>/dev/null` + SEP="" + for dir in $ROOTDIRSRAW ; do + ROOTDIRS="$ROOTDIRS$SEP$dir" + SEP="|" + done + OURCYGPATTERN="(^($ROOTDIRS))" + # Add a user-defined pattern to the cygpath arguments + if [ "$GRADLE_CYGPATTERN" != "" ] ; then + OURCYGPATTERN="$OURCYGPATTERN|($GRADLE_CYGPATTERN)" + fi + # Now convert the arguments - kludge to limit ourselves to /bin/sh + i=0 + for arg in "$@" ; do + CHECK=`echo "$arg"|egrep -c "$OURCYGPATTERN" -` + CHECK2=`echo "$arg"|egrep -c "^-"` ### Determine if an option + + if [ $CHECK -ne 0 ] && [ $CHECK2 -eq 0 ] ; then ### Added a condition + eval `echo args$i`=`cygpath --path --ignore --mixed "$arg"` + else + eval `echo args$i`="\"$arg\"" + fi + i=$((i+1)) + done + case $i in + (0) set -- ;; + (1) set -- "$args0" ;; + (2) set -- "$args0" "$args1" ;; + (3) set -- "$args0" "$args1" "$args2" ;; + (4) set -- "$args0" "$args1" "$args2" "$args3" ;; + (5) set -- "$args0" "$args1" "$args2" "$args3" "$args4" ;; + (6) set -- "$args0" "$args1" "$args2" "$args3" "$args4" "$args5" ;; + (7) set -- "$args0" "$args1" "$args2" "$args3" "$args4" "$args5" "$args6" ;; + (8) set -- "$args0" "$args1" "$args2" "$args3" "$args4" "$args5" "$args6" "$args7" ;; + (9) set -- "$args0" "$args1" "$args2" "$args3" "$args4" "$args5" "$args6" "$args7" "$args8" ;; + esac +fi + +# Split up the JVM_OPTS And GRADLE_OPTS values into an array, following the shell quoting and substitution rules +function splitJvmOpts() { + JVM_OPTS=("$@") +} +eval splitJvmOpts $DEFAULT_JVM_OPTS $JAVA_OPTS $GRADLE_OPTS +JVM_OPTS[${#JVM_OPTS[*]}]="-Dorg.gradle.appname=$APP_BASE_NAME" + +exec "$JAVACMD" "${JVM_OPTS[@]}" -classpath "$CLASSPATH" org.gradle.wrapper.GradleWrapperMain "$@" diff --git a/spring-integration-ip-extensions/gradlew.bat b/spring-integration-ip-extensions/gradlew.bat new file mode 100644 index 0000000..aec9973 --- /dev/null +++ b/spring-integration-ip-extensions/gradlew.bat @@ -0,0 +1,90 @@ +@if "%DEBUG%" == "" @echo off +@rem ########################################################################## +@rem +@rem Gradle startup script for Windows +@rem +@rem ########################################################################## + +@rem Set local scope for the variables with windows NT shell +if "%OS%"=="Windows_NT" setlocal + +@rem Add default JVM options here. You can also use JAVA_OPTS and GRADLE_OPTS to pass JVM options to this script. +set DEFAULT_JVM_OPTS= + +set DIRNAME=%~dp0 +if "%DIRNAME%" == "" set DIRNAME=. +set APP_BASE_NAME=%~n0 +set APP_HOME=%DIRNAME% + +@rem Find java.exe +if defined JAVA_HOME goto findJavaFromJavaHome + +set JAVA_EXE=java.exe +%JAVA_EXE% -version >NUL 2>&1 +if "%ERRORLEVEL%" == "0" goto init + +echo. +echo ERROR: JAVA_HOME is not set and no 'java' command could be found in your PATH. +echo. +echo Please set the JAVA_HOME variable in your environment to match the +echo location of your Java installation. + +goto fail + +:findJavaFromJavaHome +set JAVA_HOME=%JAVA_HOME:"=% +set JAVA_EXE=%JAVA_HOME%/bin/java.exe + +if exist "%JAVA_EXE%" goto init + +echo. +echo ERROR: JAVA_HOME is set to an invalid directory: %JAVA_HOME% +echo. +echo Please set the JAVA_HOME variable in your environment to match the +echo location of your Java installation. + +goto fail + +:init +@rem Get command-line arguments, handling Windowz variants + +if not "%OS%" == "Windows_NT" goto win9xME_args +if "%@eval[2+2]" == "4" goto 4NT_args + +:win9xME_args +@rem Slurp the command line arguments. +set CMD_LINE_ARGS= +set _SKIP=2 + +:win9xME_args_slurp +if "x%~1" == "x" goto execute + +set CMD_LINE_ARGS=%* +goto execute + +:4NT_args +@rem Get arguments from the 4NT Shell from JP Software +set CMD_LINE_ARGS=%$ + +:execute +@rem Setup the command line + +set CLASSPATH=%APP_HOME%\gradle\wrapper\gradle-wrapper.jar + +@rem Execute Gradle +"%JAVA_EXE%" %DEFAULT_JVM_OPTS% %JAVA_OPTS% %GRADLE_OPTS% "-Dorg.gradle.appname=%APP_BASE_NAME%" -classpath "%CLASSPATH%" org.gradle.wrapper.GradleWrapperMain %CMD_LINE_ARGS% + +:end +@rem End local scope for the variables with windows NT shell +if "%ERRORLEVEL%"=="0" goto mainEnd + +:fail +rem Set variable GRADLE_EXIT_CONSOLE if you need the _script_ return code instead of +rem the _cmd.exe /c_ return code! +if not "" == "%GRADLE_EXIT_CONSOLE%" exit 1 +exit /b 1 + +:mainEnd +if "%OS%"=="Windows_NT" endlocal + +:omega diff --git a/spring-integration-ip-extensions/publish-maven.gradle b/spring-integration-ip-extensions/publish-maven.gradle new file mode 100644 index 0000000..b2374b3 --- /dev/null +++ b/spring-integration-ip-extensions/publish-maven.gradle @@ -0,0 +1,61 @@ +apply plugin: 'maven' + +ext.optionalDeps = [] +ext.providedDeps = [] + +ext.optional = { optionalDeps << it } +ext.provided = { providedDeps << it } + +install { + repositories.mavenInstaller { + customizePom(pom, project) + } +} + +def customizePom(pom, gradleProject) { + pom.whenConfigured { generatedPom -> + // respect 'optional' and 'provided' dependencies + gradleProject.optionalDeps.each { dep -> + generatedPom.dependencies.find { it.artifactId == dep.name }?.optional = true + } + gradleProject.providedDeps.each { dep -> + generatedPom.dependencies.find { it.artifactId == dep.name }?.scope = 'provided' + } + + // eliminate test-scoped dependencies (no need in maven central poms) + generatedPom.dependencies.removeAll { dep -> + dep.scope == 'test' + } + + // add all items necessary for maven central publication + generatedPom.project { + name = gradleProject.description + description = gradleProject.description + url = 'https://github.com/SpringSource/spring-integration' + organization { + name = 'SpringSource' + url = 'http://springsource.org' + } + licenses { + license { + name 'The Apache Software License, Version 2.0' + url 'http://www.apache.org/licenses/LICENSE-2.0.txt' + distribution 'repo' + } + } + scm { + url = 'https://github.com/SpringSource/spring-integration' + connection = 'scm:git:git://github.com/SpringSource/spring-integration' + developerConnection = 'scm:git:git://github.com/SpringSource/spring-integration' + } + + developers { + developer { + id = 'garyrussell' + name = 'Gary Russell' + email = 'grussell@vmware.com' + } + } + } + } +} diff --git a/spring-integration-ip-extensions/src/api/overview.html b/spring-integration-ip-extensions/src/api/overview.html new file mode 100644 index 0000000..314cf20 --- /dev/null +++ b/spring-integration-ip-extensions/src/api/overview.html @@ -0,0 +1,13 @@ + + + This document is the API specification for Spring Integration +
+
+

+ Spring Integration IP Extensions project is intended to supplement + the spring-integration-ip module with, for example, custom + serializers/deserializers. +

+
+ + diff --git a/spring-integration-ip-extensions/src/dist/license.txt b/spring-integration-ip-extensions/src/dist/license.txt new file mode 100644 index 0000000..261eeb9 --- /dev/null +++ b/spring-integration-ip-extensions/src/dist/license.txt @@ -0,0 +1,201 @@ + Apache License + Version 2.0, January 2004 + http://www.apache.org/licenses/ + + TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION + + 1. Definitions. + + "License" shall mean the terms and conditions for use, reproduction, + and distribution as defined by Sections 1 through 9 of this document. + + "Licensor" shall mean the copyright owner or entity authorized by + the copyright owner that is granting the License. + + "Legal Entity" shall mean the union of the acting entity and all + other entities that control, are controlled by, or are under common + control with that entity. For the purposes of this definition, + "control" means (i) the power, direct or indirect, to cause the + direction or management of such entity, whether by contract or + otherwise, or (ii) ownership of fifty percent (50%) or more of the + outstanding shares, or (iii) beneficial ownership of such entity. + + "You" (or "Your") shall mean an individual or Legal Entity + exercising permissions granted by this License. + + "Source" form shall mean the preferred form for making modifications, + including but not limited to software source code, documentation + source, and configuration files. + + "Object" form shall mean any form resulting from mechanical + transformation or translation of a Source form, including but + not limited to compiled object code, generated documentation, + and conversions to other media types. + + "Work" shall mean the work of authorship, whether in Source or + Object form, made available under the License, as indicated by a + copyright notice that is included in or attached to the work + (an example is provided in the Appendix below). + + "Derivative Works" shall mean any work, whether in Source or Object + form, that is based on (or derived from) the Work and for which the + editorial revisions, annotations, elaborations, or other modifications + represent, as a whole, an original work of authorship. For the purposes + of this License, Derivative Works shall not include works that remain + separable from, or merely link (or bind by name) to the interfaces of, + the Work and Derivative Works thereof. + + "Contribution" shall mean any work of authorship, including + the original version of the Work and any modifications or additions + to that Work or Derivative Works thereof, that is intentionally + submitted to Licensor for inclusion in the Work by the copyright owner + or by an individual or Legal Entity authorized to submit on behalf of + the copyright owner. For the purposes of this definition, "submitted" + means any form of electronic, verbal, or written communication sent + to the Licensor or its representatives, including but not limited to + communication on electronic mailing lists, source code control systems, + and issue tracking systems that are managed by, or on behalf of, the + Licensor for the purpose of discussing and improving the Work, but + excluding communication that is conspicuously marked or otherwise + designated in writing by the copyright owner as "Not a Contribution." + + "Contributor" shall mean Licensor and any individual or Legal Entity + on behalf of whom a Contribution has been received by Licensor and + subsequently incorporated within the Work. + + 2. Grant of Copyright License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + copyright license to reproduce, prepare Derivative Works of, + publicly display, publicly perform, sublicense, and distribute the + Work and such Derivative Works in Source or Object form. + + 3. Grant of Patent License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + (except as stated in this section) patent license to make, have made, + use, offer to sell, sell, import, and otherwise transfer the Work, + where such license applies only to those patent claims licensable + by such Contributor that are necessarily infringed by their + Contribution(s) alone or by combination of their Contribution(s) + with the Work to which such Contribution(s) was submitted. If You + institute patent litigation against any entity (including a + cross-claim or counterclaim in a lawsuit) alleging that the Work + or a Contribution incorporated within the Work constitutes direct + or contributory patent infringement, then any patent licenses + granted to You under this License for that Work shall terminate + as of the date such litigation is filed. + + 4. Redistribution. You may reproduce and distribute copies of the + Work or Derivative Works thereof in any medium, with or without + modifications, and in Source or Object form, provided that You + meet the following conditions: + + (a) You must give any other recipients of the Work or + Derivative Works a copy of this License; and + + (b) You must cause any modified files to carry prominent notices + stating that You changed the files; and + + (c) You must retain, in the Source form of any Derivative Works + that You distribute, all copyright, patent, trademark, and + attribution notices from the Source form of the Work, + excluding those notices that do not pertain to any part of + the Derivative Works; and + + (d) If the Work includes a "NOTICE" text file as part of its + distribution, then any Derivative Works that You distribute must + include a readable copy of the attribution notices contained + within such NOTICE file, excluding those notices that do not + pertain to any part of the Derivative Works, in at least one + of the following places: within a NOTICE text file distributed + as part of the Derivative Works; within the Source form or + documentation, if provided along with the Derivative Works; or, + within a display generated by the Derivative Works, if and + wherever such third-party notices normally appear. The contents + of the NOTICE file are for informational purposes only and + do not modify the License. You may add Your own attribution + notices within Derivative Works that You distribute, alongside + or as an addendum to the NOTICE text from the Work, provided + that such additional attribution notices cannot be construed + as modifying the License. + + You may add Your own copyright statement to Your modifications and + may provide additional or different license terms and conditions + for use, reproduction, or distribution of Your modifications, or + for any such Derivative Works as a whole, provided Your use, + reproduction, and distribution of the Work otherwise complies with + the conditions stated in this License. + + 5. Submission of Contributions. Unless You explicitly state otherwise, + any Contribution intentionally submitted for inclusion in the Work + by You to the Licensor shall be under the terms and conditions of + this License, without any additional terms or conditions. + Notwithstanding the above, nothing herein shall supersede or modify + the terms of any separate license agreement you may have executed + with Licensor regarding such Contributions. + + 6. Trademarks. This License does not grant permission to use the trade + names, trademarks, service marks, or product names of the Licensor, + except as required for reasonable and customary use in describing the + origin of the Work and reproducing the content of the NOTICE file. + + 7. Disclaimer of Warranty. Unless required by applicable law or + agreed to in writing, Licensor provides the Work (and each + Contributor provides its Contributions) on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + implied, including, without limitation, any warranties or conditions + of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A + PARTICULAR PURPOSE. You are solely responsible for determining the + appropriateness of using or redistributing the Work and assume any + risks associated with Your exercise of permissions under this License. + + 8. Limitation of Liability. In no event and under no legal theory, + whether in tort (including negligence), contract, or otherwise, + unless required by applicable law (such as deliberate and grossly + negligent acts) or agreed to in writing, shall any Contributor be + liable to You for damages, including any direct, indirect, special, + incidental, or consequential damages of any character arising as a + result of this License or out of the use or inability to use the + Work (including but not limited to damages for loss of goodwill, + work stoppage, computer failure or malfunction, or any and all + other commercial damages or losses), even if such Contributor + has been advised of the possibility of such damages. + + 9. Accepting Warranty or Additional Liability. While redistributing + the Work or Derivative Works thereof, You may choose to offer, + and charge a fee for, acceptance of support, warranty, indemnity, + or other liability obligations and/or rights consistent with this + License. However, in accepting such obligations, You may act only + on Your own behalf and on Your sole responsibility, not on behalf + of any other Contributor, and only if You agree to indemnify, + defend, and hold each Contributor harmless for any liability + incurred by, or claims asserted against, such Contributor by reason + of your accepting any such warranty or additional liability. + + END OF TERMS AND CONDITIONS + + APPENDIX: How to apply the Apache License to your work. + + To apply the Apache License to your work, attach the following + boilerplate notice, with the fields enclosed by brackets "[]" + replaced with your own identifying information. (Don't include + the brackets!) The text should be enclosed in the appropriate + comment syntax for the file format. We also recommend that a + file or class name and description of purpose be included on the + same "printed page" as the copyright notice for easier + identification within third-party archives. + + Copyright [yyyy] [name of copyright owner] + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. diff --git a/spring-integration-ip-extensions/src/dist/notice.txt b/spring-integration-ip-extensions/src/dist/notice.txt new file mode 100644 index 0000000..f62045a --- /dev/null +++ b/spring-integration-ip-extensions/src/dist/notice.txt @@ -0,0 +1,21 @@ + ======================================================================== + == NOTICE file corresponding to section 4 d of the Apache License, == + == Version 2.0, in this case for the Spring Integration distribution. == + ======================================================================== + + This product includes software developed by + the Apache Software Foundation (http://www.apache.org). + + The end-user documentation included with a redistribution, if any, + must include the following acknowledgement: + + "This product includes software developed by the Spring Framework + Project (http://www.springframework.org)." + + Alternatively, this acknowledgement may appear in the software itself, + if and wherever such third-party acknowledgements normally appear. + + The names "Spring", "Spring Framework", and "Spring Integration" must + not be used to endorse or promote products derived from this software + without prior written permission. For written permission, please contact + enquiries@springsource.com. diff --git a/spring-integration-ip-extensions/src/main/java/org/springframework/integration/x/ip/package-info.java b/spring-integration-ip-extensions/src/main/java/org/springframework/integration/x/ip/package-info.java new file mode 100644 index 0000000..aba45f0 --- /dev/null +++ b/spring-integration-ip-extensions/src/main/java/org/springframework/integration/x/ip/package-info.java @@ -0,0 +1,4 @@ +/** + * Root package of the IP Extensions. + */ +package org.springframework.integration.x.ip; diff --git a/spring-integration-ip-extensions/src/main/java/org/springframework/integration/x/ip/serializer/AbstractByteArraySerializer.java b/spring-integration-ip-extensions/src/main/java/org/springframework/integration/x/ip/serializer/AbstractByteArraySerializer.java new file mode 100644 index 0000000..21a9c84 --- /dev/null +++ b/spring-integration-ip-extensions/src/main/java/org/springframework/integration/x/ip/serializer/AbstractByteArraySerializer.java @@ -0,0 +1,85 @@ +/* + * Copyright 2002-2013 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.integration.x.ip.serializer; + +import java.io.IOException; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.springframework.core.serializer.Deserializer; +import org.springframework.core.serializer.Serializer; + +/** + * Base class for (de)serializers that provide a mechanism to + * reconstruct a byte array from an arbitrary stream. + * + * TODO: Enhanced version of standard class - will be merged in 3.0. + * + * @author Gary Russell + * @since 2.0 + * + */ +public abstract class AbstractByteArraySerializer implements + Serializer, + Deserializer { + + protected int maxMessageSize = 2048; + + protected final Log logger = LogFactory.getLog(this.getClass()); + + /** + * The maximum supported message size for this serializer. + * Default 2048. + * @return The max message size. + */ + public int getMaxMessageSize() { + return maxMessageSize; + } + + /** + * The maximum supported message size for this serializer. + * Default 2048. + * @param maxMessageSize The max message size. + */ + public void setMaxMessageSize(int maxMessageSize) { + this.maxMessageSize = maxMessageSize; + } + + protected void checkClosure(int bite) throws IOException { + if (bite < 0) { + logger.debug("Socket closed during message assembly"); + throw new IOException("Socket closed during message assembly"); + } + } + + /** + * Copy size bytes to a new buffer exactly size bytes long. + * @param buffer The buffer containing the data. + * @param size The number of bytes to copy. + * @return The new buffer, or the buffer parameter if it is + * already the correct size. + */ + protected byte[] copyToSizedArray(byte[] buffer, int size) { + if (size == buffer.length) { + return buffer; + } + byte[] assembledData = new byte[size]; + System.arraycopy(buffer, 0, assembledData, 0, size); + return assembledData; + } + +} diff --git a/spring-integration-ip-extensions/src/main/java/org/springframework/integration/x/ip/serializer/AbstractHttpSwitchingDeserializer.java b/spring-integration-ip-extensions/src/main/java/org/springframework/integration/x/ip/serializer/AbstractHttpSwitchingDeserializer.java new file mode 100644 index 0000000..ddd40cc --- /dev/null +++ b/spring-integration-ip-extensions/src/main/java/org/springframework/integration/x/ip/serializer/AbstractHttpSwitchingDeserializer.java @@ -0,0 +1,146 @@ +/* + * Copyright 2002-2013 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.springframework.integration.x.ip.serializer; + +import java.io.IOException; +import java.io.InputStream; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; + +/** + * Base class for (de)Serializers that start with an HTTP-like protocol then + * switch to some other protocol. + * + * @author Gary Russell + * @since 3.0 + * + */ +public abstract class AbstractHttpSwitchingDeserializer implements StatefulDeserializer { + + protected final Log logger = LogFactory.getLog(this.getClass()); + + protected volatile int maxMessageSize = 2048; + + private final Map streamState = new ConcurrentHashMap(); + + protected final ByteArrayCrLfSerializer crlfDeserializer = new ByteArrayCrLfSerializer(); + + public void setMaxMessageSize(int maxMessageSize) { + this.maxMessageSize = maxMessageSize; + } + + protected ByteArrayCrLfSerializer getCrlfDeserializer() { + return crlfDeserializer; + } + + public abstract DataFrame deserialize(InputStream inputStream) throws IOException; + + protected BasicState getStreamState(InputStream inputStream) { + return streamState.get(inputStream); + } + + /** + * Returns null if we've switched from HTTP-like protocol; headers otherwise. + * @param inputStream + * @return null or list of DataFrame, where the first frame contains the headers. + * Implementations may add additional frames. + * @throws IOException + */ + protected List checkStreaming(InputStream inputStream) throws IOException { + BasicState isStreaming = this.streamState.get(inputStream); + if (isStreaming == null) { //Consume the headers - TODO - check status + StringBuilder headersBuilder = new StringBuilder(); + byte[] headers = new byte[this.maxMessageSize]; + int headersLength; + do { + headersLength = this.crlfDeserializer.fillToCrLf(inputStream, headers); + String header = new String(headers, 0, headersLength, "UTF-8"); + headersBuilder.append(header).append("\r\n"); + } + while (headersLength > 0); + BasicState basicState = createState(); + List dataList = new ArrayList(); + List decodedHeaders = decodeHeaders(headersBuilder.toString(), basicState, dataList); + this.streamState.put(inputStream, basicState); + return decodedHeaders; + } + return null; + } + + protected BasicState createState() { + return new BasicState(); + } + + protected void checkClosure(int bite) throws IOException { + if (bite < 0) { + logger.debug("Socket closed during message assembly"); + throw new IOException("Socket closed during message assembly"); + } + } + + protected List decodeHeaders(String frameData, BasicState state, List dataList) { + // TODO: Full header separation - mvc utils? + if (logger.isDebugEnabled()) { + logger.debug("Received:Headers\r\n" + frameData); + } + dataList.add(createDataFrame(DataFrame.TYPE_HEADERS, frameData)); + return dataList; + } + + protected DataFrame createDataFrame(int type, String frameData) { + return new DataFrame(type, frameData); + } + + public void removeState(Object key) { + this.streamState.remove(key); + } + + public BasicState getState(Object key) { + return this.streamState.get(key); + } + + public static class BasicState { + + private volatile DataFrame pendingFrame; + + private final List fragments = new ArrayList(); + + public DataFrame getPendingFrame() { + return pendingFrame; + } + + public void setPendingFrame(DataFrame pendingFrame) { + this.pendingFrame = pendingFrame; + } + + public List getFragments() { + return fragments; + } + + @Override + public String toString() { + return "BasicState [pendingFrame=" + pendingFrame + ", fragments.size()=" + fragments.size() + "]"; + } + + + } + +} \ No newline at end of file diff --git a/spring-integration-ip-extensions/src/main/java/org/springframework/integration/x/ip/serializer/ByteArrayCrLfSerializer.java b/spring-integration-ip-extensions/src/main/java/org/springframework/integration/x/ip/serializer/ByteArrayCrLfSerializer.java new file mode 100644 index 0000000..8bb831e --- /dev/null +++ b/spring-integration-ip-extensions/src/main/java/org/springframework/integration/x/ip/serializer/ByteArrayCrLfSerializer.java @@ -0,0 +1,87 @@ +/* + * Copyright 2002-2013 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.integration.x.ip.serializer; + +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; + +import org.springframework.integration.ip.tcp.serializer.SoftEndOfStreamException; + +/** + * Reads data in an InputStream to a byte[]; data must be terminated by \r\n + * (not included in resulting byte[]). + * Writes a byte[] to an OutputStream and adds \r\n. + * + * TODO: Enhanced version of standard class - will be merged in 3.0. + * + * @author Gary Russell + * @since 2.0 + */ +public class ByteArrayCrLfSerializer extends AbstractByteArraySerializer { + + private static final byte[] CRLF = "\r\n".getBytes(); + + /** + * Reads the data in the inputstream to a byte[]. Data must be terminated + * by CRLF (\r\n). Throws a {@link SoftEndOfStreamException} if the stream + * is closed immediately after the \r\n (i.e. no data is in the process of + * being read). + */ + public byte[] deserialize(InputStream inputStream) throws IOException { + byte[] buffer = new byte[this.maxMessageSize]; + int n = this.fillToCrLf(inputStream, buffer); + byte[] assembledData = this.copyToSizedArray(buffer, n); + return assembledData; + } + + public int fillToCrLf(InputStream inputStream, byte[] buffer) + throws IOException, SoftEndOfStreamException { + int n = 0; + int bite; + if (logger.isDebugEnabled()) { + logger.debug("Available to read:" + inputStream.available()); + } + while (true) { + bite = inputStream.read(); +// logger.debug("Read:" + (char) bite); + if (bite < 0 && n == 0) { + throw new SoftEndOfStreamException("Stream closed between payloads"); + } + checkClosure(bite); + if (n > 0 && bite == '\n' && buffer[n-1] == '\r') { + break; + } + buffer[n++] = (byte) bite; + if (n >= this.maxMessageSize) { + throw new IOException("CRLF not found before max message length: " + + this.maxMessageSize); + } + }; + return n-1; // trim \r + } + + /** + * Writes the byte[] to the stream and appends \r\n. + */ + public void serialize(byte[] bytes, OutputStream outputStream) throws IOException { + outputStream.write(bytes); + outputStream.write(CRLF); + outputStream.flush(); + } + +} diff --git a/spring-integration-ip-extensions/src/main/java/org/springframework/integration/x/ip/serializer/DataFrame.java b/spring-integration-ip-extensions/src/main/java/org/springframework/integration/x/ip/serializer/DataFrame.java new file mode 100644 index 0000000..da1499a --- /dev/null +++ b/spring-integration-ip-extensions/src/main/java/org/springframework/integration/x/ip/serializer/DataFrame.java @@ -0,0 +1,65 @@ +/* + * Copyright 2002-2013 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.springframework.integration.x.ip.serializer; + +/** + * @author Gary Russell + * @since 3.0 + * + */ +public class DataFrame { + + public static final int TYPE_INVALID = 0; + + public static final int TYPE_HEADERS = 1; + + public static final int TYPE_DATA = 4; + + public static final int TYPE_DATA_BINARY = 260; + + protected final int type; + + protected final String payload; + + protected final byte[] binary; + + public DataFrame(int type, String payload) { + this(type, payload, null); + } + + public DataFrame(int type, byte[] binary) { + this(type, null, binary); + } + + public DataFrame(int type, String payload, byte[] binary) { + this.type = type; + this.payload = payload; + this.binary = binary; + } + + public int getType() { + return this.type; + } + + public String getPayload() { + return this.payload; + } + + public byte[] getBinary() { + return binary; + } + +} \ No newline at end of file diff --git a/spring-integration-ip-extensions/src/main/java/org/springframework/integration/x/ip/serializer/StatefulDeserializer.java b/spring-integration-ip-extensions/src/main/java/org/springframework/integration/x/ip/serializer/StatefulDeserializer.java new file mode 100644 index 0000000..c97f79f --- /dev/null +++ b/spring-integration-ip-extensions/src/main/java/org/springframework/integration/x/ip/serializer/StatefulDeserializer.java @@ -0,0 +1,28 @@ +/* + * Copyright 2002-2013 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.springframework.integration.x.ip.serializer; + +import org.springframework.core.serializer.Deserializer; + +/** + * @author Gary Russell + * @since 3.0 + * + */ +public interface StatefulDeserializer extends Deserializer { + + void removeState(Object key); +} diff --git a/spring-integration-ip-extensions/src/main/java/org/springframework/integration/x/ip/websocket/WebSocketFrame.java b/spring-integration-ip-extensions/src/main/java/org/springframework/integration/x/ip/websocket/WebSocketFrame.java new file mode 100644 index 0000000..75d1175 --- /dev/null +++ b/spring-integration-ip-extensions/src/main/java/org/springframework/integration/x/ip/websocket/WebSocketFrame.java @@ -0,0 +1,98 @@ +/* + * Copyright 2002-2013 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.springframework.integration.x.ip.websocket; + +import org.springframework.integration.x.ip.serializer.DataFrame; + +/** + * @author Gary Russell + * @since 3.0 + * + */ +public class WebSocketFrame extends DataFrame { + + public static final int TYPE_FRAGMENTED_CONTROL = 256; + + public static final int TYPE_INVALID_UTF8 = 512; + + public static final int TYPE_PING = 5; + + public static final int TYPE_PONG = 6; + + public static final int TYPE_OPEN = 7; + + public static final int TYPE_CLOSE = 8; + + private static final String[] typeToString = new String[] { + "Invalid", "Headers", "**", "**", "Data", "Ping", "Pong", "Open", "Close" + }; + + private volatile short status = -1; + + private volatile int rsv; + + public WebSocketFrame(int type, String payload) { + super(type, payload); + } + + public WebSocketFrame(int type, byte[] binary) { + super(type, binary); + } + + public WebSocketFrame(int type, String payload, byte[] binary) { + super(type, payload, binary); + } + + public short getStatus() { + return status; + } + + public void setStatus(short status) { + this.status = status; + } + + public void setRsv(int rsv) { + this.rsv = rsv; + } + + public int getRsv() { + return rsv; + } + + @Override + public String toString() { + int len = 0; + boolean trunc = false; + if (this.payload!= null) { + len = Math.min(100, payload.length()); + trunc = len < payload.length(); + } + String typeAsString; + if ((type & 0xff) < typeToString.length) { + typeAsString = typeToString[type & 0xff]; + } + else { + typeAsString = Integer.toString(type); + } + return "WebSocketFrame [type=" + typeAsString + "(" + + type + ")"+ (payload == null ? "" : ", payload=" + payload.substring(0, len) + + (trunc ? "..." : "")) + + ", binary=" + binary + + (binary != null ? ", binary.length=" + binary.length : "") + + ", status=" + status + ", rsv=" + rsv + "]"; + } + +} diff --git a/spring-integration-ip-extensions/src/main/java/org/springframework/integration/x/ip/websocket/WebSocketSerializer.java b/spring-integration-ip-extensions/src/main/java/org/springframework/integration/x/ip/websocket/WebSocketSerializer.java new file mode 100644 index 0000000..365d3d7 --- /dev/null +++ b/spring-integration-ip-extensions/src/main/java/org/springframework/integration/x/ip/websocket/WebSocketSerializer.java @@ -0,0 +1,509 @@ +/* + * Copyright 2002-2013 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.springframework.integration.x.ip.websocket; + +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.io.UnsupportedEncodingException; +import java.nio.ByteBuffer; +import java.security.MessageDigest; +import java.security.NoSuchAlgorithmException; +import java.util.Arrays; +import java.util.HashSet; +import java.util.List; +import java.util.Set; + +import org.apache.commons.codec.binary.Base64; +import org.springframework.core.serializer.Serializer; +import org.springframework.integration.MessagingException; +import org.springframework.integration.ip.tcp.serializer.SoftEndOfStreamException; +import org.springframework.integration.x.ip.serializer.AbstractHttpSwitchingDeserializer; +import org.springframework.integration.x.ip.serializer.DataFrame; +import org.springframework.util.Assert; + +/** + * @author Gary Russell + * @since 3.0 + * + */ +public class WebSocketSerializer extends AbstractHttpSwitchingDeserializer implements Serializer { + + private static final String HTTP_1_1_101_WEB_SOCKET_PROTOCOL_HANDSHAKE_SPRING_INTEGRATION = + "HTTP/1.1 101 Web Socket Protocol Handshake - Spring Integration\r\n"; + + private static final Set INVALID_STATUS = new HashSet( + Arrays.asList((short) 1004, (short) 1005, (short) 1006, (short) 1012, (short) 1013, (short) 1014, (short) 1015)); + + private volatile boolean server; + + private boolean validateUtf8; + + public void setServer(boolean server) { + this.server = server; + } + + /** + * Validate UTF-8 (required for Autobahn tests). + * @param validateUtf8 + */ + public void setValidateUtf8(boolean validateUtf8) { + this.validateUtf8 = validateUtf8; + } + + @Override + protected DataFrame createDataFrame(int type, String frameData) { + return new WebSocketFrame(type, frameData); + } + + @Override + protected BasicState createState() { + return new WebSocketState(); + } + + public void serialize(final Object frame, OutputStream outputStream) + throws IOException { + String data = ""; + WebSocketFrame theFrame = null; + if (frame instanceof String) { + data = (String) frame; + theFrame = new WebSocketFrame(WebSocketFrame.TYPE_DATA, data); + } + else if (frame instanceof WebSocketFrame) { + theFrame = (WebSocketFrame) frame; + data = theFrame.getPayload(); + } + if (data != null && data.startsWith("HTTP/1.1")) { + outputStream.write(data.getBytes()); + return; + } + int lenBytes; + int payloadLen = this.server ? 0 : 0x80; //masked + boolean close = theFrame.getType() == WebSocketFrame.TYPE_CLOSE; + boolean pong = theFrame.getType() == WebSocketFrame.TYPE_PONG; + byte[] bytes = theFrame.getBinary() != null ? theFrame.getBinary() : data.getBytes("UTF-8"); + + int length = bytes.length; + if (close) { + length += 2; + } + if (length >= Math.pow(2, 16)) { + lenBytes = 8; + payloadLen |= 127; + } + else if (length > 125) { + lenBytes = 2; + payloadLen |= 126; + } + else { + lenBytes = 0; + payloadLen |= length; + } + int mask = (int) System.currentTimeMillis(); + ByteBuffer buffer = ByteBuffer.allocate(length + 6 + lenBytes); + if (pong) { + buffer.put((byte) 0x8a); + } + else if (close) { + buffer.put((byte) 0x88); + } + else if (theFrame.getType() == WebSocketFrame.TYPE_DATA_BINARY) { + buffer.put((byte) 0x82); + } + else { + // Final fragment; text + buffer.put((byte) 0x81); + } + buffer.put((byte) payloadLen); + if (lenBytes == 2) { + buffer.putShort((short) length); + } + else if (lenBytes == 8) { + buffer.putLong(length); + } + + byte[] maskBytes = new byte[4]; + if (!server) { + buffer.putInt(mask); + buffer.position(buffer.position() - 4); + buffer.get(maskBytes); + } + if (close) { + buffer.putShort(theFrame.getStatus()); + // TODO: mask status when client + } + for (int i = 0; i < bytes.length; i++) { + if (server) { + buffer.put(bytes[i]); + } + else { + buffer.put((byte) (bytes[i] ^ maskBytes[i % 4])); + } + } + outputStream.write(buffer.array(), 0, buffer.position()); + } + + @Override + public DataFrame deserialize(InputStream inputStream) throws IOException { + DataFrame frame = null; + BasicState state = this.getState(inputStream); + if (state != null) { + frame = state.getPendingFrame(); + } + while (frame == null || (frame.getPayload() == null && frame.getBinary() == null)) { + frame = doDeserialize(inputStream, frame); + if (frame.getPayload() == null && frame.getBinary() == null) { + state.setPendingFrame(frame); + } + } + return frame; + } + + private DataFrame doDeserialize(InputStream inputStream, DataFrame protoFrame) throws IOException { + List headers = checkStreaming(inputStream); + if (headers != null) { + return headers.get(0); + } + int bite; + if (logger.isDebugEnabled()) { + logger.debug("Available to read:" + inputStream.available()); + } + boolean done = false; + int len = 0; + int n = 0; + int dataInx = 0; + byte[] buffer = null; + boolean fin = false; + boolean ping = false; + boolean pong = false; + boolean close = false; + boolean binary = false; + boolean invalid = false; + String invalidText = null; + boolean fragmentedControl = false; + int lenBytes = 0; + byte[] mask = new byte[4]; + int maskInx = 0; + int rsv = 0; + while (!done ) { + bite = inputStream.read(); +// logger.debug("Read:" + Integer.toHexString(bite)); + if (bite < 0 && n == 0) { + throw new SoftEndOfStreamException("Stream closed between payloads"); + } + checkClosure(bite); + switch (n++) { + case 0: + fin = (bite & 0x80) > 0; + rsv = (bite & 0x70) >> 4; + bite &= 0x0f; + switch (bite) { + case 0x00: + logger.debug("Continuation, fin=" + fin); + if (protoFrame == null) { + invalid = true; + invalidText = "Unexpected continuation frame"; + } + else { + binary = protoFrame.getType() == WebSocketFrame.TYPE_DATA_BINARY; + } + this.getState(inputStream).setPendingFrame(null); + break; + case 0x01: + logger.debug("Text, fin=" + fin); + if (protoFrame != null) { + invalid = true; + invalidText = "Expected continuation frame"; + } + break; + case 0x02: + logger.debug("Binary, fin=" + fin); + if (protoFrame != null) { + invalid = true; + invalidText = "Expected continuation frame"; + } + binary = true; + break; + case 0x08: + logger.debug("Close, fin=" + fin); + fragmentedControl = !fin; + close = true; + break; + case 0x09: + ping = true; + binary = true; + fragmentedControl = !fin; + logger.debug("Ping, fin=" + fin); + break; + case 0x0a: + pong = true; + fragmentedControl = !fin; + logger.debug("Pong, fin=" + fin); + break; + case 0x03: + case 0x04: + case 0x05: + case 0x06: + case 0x07: + case 0x0b: + case 0x0c: + case 0x0d: + case 0x0e: + case 0x0f: + invalid = true; + invalidText = "Reserved opcode " + Integer.toHexString(bite); + break; + default: + throw new IOException("Unexpected opcode " + Integer.toHexString(bite)); + } + break; + case 1: + if (this.server) { + if ((bite & 0x80) == 0) { + throw new IOException("Illegal: Expected masked data from client"); + } + bite &= 0x7f; + } + if ((bite & 0x80) > 0) { + throw new IOException("Illegal: Received masked data from server"); + } + if (bite < 126) { + len = bite; + buffer = new byte[len]; + } + else if (bite == 126) { + lenBytes = 2; + } + else { + lenBytes = 8; + } + break; + case 2: + case 3: + case 4: + case 5: + if (lenBytes > 4 && bite != 0) { + throw new IOException("Max supported length exceeded"); + } + case 6: + if (lenBytes > 3 && (bite & 0x80) > 0) { + throw new IOException("Max supported length exceeded"); + } + case 7: + case 8: + case 9: + if (lenBytes-- > 0) { + len = len << 8 | (bite & 0xff); + if (lenBytes == 0) { + buffer = new byte[len]; + } + break; + } + default: + if (this.server && maskInx < 4) { + mask[maskInx++] = (byte) bite; + } + else { + if (this.server) { + bite ^= mask[dataInx % 4]; + } + buffer[dataInx++] = (byte) bite; + } + done = (server ? maskInx == 4 : true) && dataInx >= len; + } + }; + + WebSocketFrame frame; + + if (fragmentedControl) { + frame = new WebSocketFrame(WebSocketFrame.TYPE_FRAGMENTED_CONTROL, "Fragmented control frame", buffer); + } + else if (invalid) { + frame = new WebSocketFrame(WebSocketFrame.TYPE_INVALID, invalidText, buffer); + } + else if (!fin) { + List fragments = this.getState(inputStream).getFragments(); + fragments.add(buffer); + logger.debug("Fragment"); + return new WebSocketFrame(binary ? WebSocketFrame.TYPE_DATA_BINARY : WebSocketFrame.TYPE_DATA, (String) null); + } + else if (ping) { + frame = new WebSocketFrame(WebSocketFrame.TYPE_PING, buffer); + } + else if (pong) { + String data = new String(buffer, "UTF-8"); + frame = new WebSocketFrame(WebSocketFrame.TYPE_PONG, data); + } + else if (close) { + String data = new String(buffer, "UTF-8"); + if (data.length() >= 2) { + data = data.substring(2); + } + WebSocketFrame closeFrame = new WebSocketFrame(WebSocketFrame.TYPE_CLOSE, data); + short status = 1000; + if (buffer.length >= 2) { + status = (short) ((buffer[0] << 8) | (buffer[1] & 0xff)); + closeFrame.setStatus(status); + } + if (buffer.length == 1 || buffer.length > 125 || + (buffer.length > 2 && !validateUtf8IfNecessary(buffer, 2, data)) || + status < 1000 || INVALID_STATUS.contains(status) || (status >= 1016 && status < 3000) || status >= 5000) { + // Simply close in this case; no close reply + ((WebSocketState) this.getState(inputStream)).setCloseInitiated(true); + } + frame = closeFrame; + } + else { + List fragments = this.getState(inputStream).getFragments(); + if (fragments.size() == 0) { + if (binary) { + frame = new WebSocketFrame(WebSocketFrame.TYPE_DATA_BINARY, buffer); + } + else { + String data = new String(buffer, "UTF-8"); + if (!validateUtf8IfNecessary(buffer, 0, data)) { + frame = new WebSocketFrame(WebSocketFrame.TYPE_INVALID_UTF8, "Invalid UTF-8", buffer); + } + else { + frame = new WebSocketFrame(WebSocketFrame.TYPE_DATA, data); + } + } + } + else { + fragments.add(buffer); + int utf8Len = 0; + for (byte[] fragment : fragments) { + utf8Len += fragment.length; + } + byte[] reconstructed = new byte[utf8Len]; + int utf8Pos = 0; + for (byte[] fragment : fragments) { + System.arraycopy(fragment, 0, reconstructed, utf8Pos, fragment.length); + utf8Pos += fragment.length; + } + fragments.clear(); + if (binary) { + frame = new WebSocketFrame(WebSocketFrame.TYPE_DATA_BINARY, reconstructed); + } + else { + String data = new String(reconstructed, "UTF-8"); + if (!validateUtf8IfNecessary(reconstructed, 0, data)) { + frame = new WebSocketFrame(WebSocketFrame.TYPE_INVALID_UTF8, "Invalid UTF-8", reconstructed); + } + else { + frame = new WebSocketFrame(WebSocketFrame.TYPE_DATA, data); + } + } + } + } + if (rsv > 0) { + frame.setRsv(rsv); + } + return frame; + } + + private boolean validateUtf8IfNecessary(byte[] buffer, int offset, String data) { + if (this.validateUtf8) { + try { + byte[] bytes = data.getBytes("UTF-8"); + if (bytes.length != buffer.length - offset) { + return false; + } + for (int i = 0; i < bytes.length; i++) { + if (buffer[i + offset] != bytes[i]) { + return false; + } + } + } + catch (UnsupportedEncodingException e) { + throw new MessagingException("UTF-8 Conversion error"); + } + } + return true; + } + + @Override + protected void checkClosure(int bite) throws IOException { + if (bite < 0) { + logger.debug("Socket closed during message assembly"); + throw new IOException("Socket closed during message assembly"); + } + } + + @Override + public void removeState(Object inputStream) { + super.removeState(inputStream); + } + + public WebSocketFrame generateHandshake(WebSocketFrame frame) throws Exception { + Assert.isTrue(frame.getType() == WebSocketFrame.TYPE_HEADERS, "Expected headers:" + frame); + String[] headers = frame.getPayload().split("\\r\\n"); + String key = null; + String version = null; + for (String header : headers) { + if (header.toLowerCase().startsWith("sec-websocket-key")) { + key = header.split(":")[1].trim(); + } + else if (header.toLowerCase().startsWith("sec-websocket-version")) { + version = header.split(":")[1].trim(); + } + } + if (key == null) { + throw new WebSocketUpgradeException("400 Bad Request: No sec-websocket-key header detected"); + } + else if (!"13".equals(version)) { + throw new WebSocketUpgradeException("426 Upgrade Required", "sec-websocket-version: 13\r\n"); + } + String handshake = HTTP_1_1_101_WEB_SOCKET_PROTOCOL_HANDSHAKE_SPRING_INTEGRATION + + "Upgrade: WebSocket\r\n" + + "Connection: Upgrade\r\n" + + "Sec-WebSocket-Accept: " + this.generateWebSocketAccept(key) + "\r\n\r\n"; + return new WebSocketFrame(WebSocketFrame.TYPE_DATA, handshake); + } + + private String generateWebSocketAccept(String key) throws NoSuchAlgorithmException { + MessageDigest md = MessageDigest.getInstance("SHA-1"); + String toDigest = key + "258EAFA5-E914-47DA-95CA-C5AB0DC85B11"; + byte[] acceptStringBytes = md.digest(toDigest.getBytes()); + acceptStringBytes = Base64.encodeBase64(acceptStringBytes); + String acceptString = new String(acceptStringBytes); + return acceptString; + } + + public static class WebSocketState extends BasicState { + + private volatile boolean closeInitiated; + + private volatile boolean expectingPong; + + public boolean isCloseInitiated() { + return this.closeInitiated; + } + + public void setCloseInitiated(boolean closeInitiated) { + this.closeInitiated = closeInitiated; + } + + public boolean isExpectingPong() { + return this.expectingPong; + } + + public void setExpectingPong(boolean expectingPong) { + this.expectingPong = expectingPong; + } + + } +} diff --git a/spring-integration-ip-extensions/src/main/java/org/springframework/integration/x/ip/websocket/WebSocketTcpConnectionInterceptorFactory.java b/spring-integration-ip-extensions/src/main/java/org/springframework/integration/x/ip/websocket/WebSocketTcpConnectionInterceptorFactory.java new file mode 100644 index 0000000..1d4f1ff --- /dev/null +++ b/spring-integration-ip-extensions/src/main/java/org/springframework/integration/x/ip/websocket/WebSocketTcpConnectionInterceptorFactory.java @@ -0,0 +1,282 @@ +/* + * Copyright 2002-2013 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.springframework.integration.x.ip.websocket; + +import java.io.IOException; +import java.io.InputStream; +import java.net.Socket; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.springframework.beans.DirectFieldAccessor; +import org.springframework.core.serializer.Deserializer; +import org.springframework.integration.Message; +import org.springframework.integration.MessageHandlingException; +import org.springframework.integration.MessageHeaders; +import org.springframework.integration.MessagingException; +import org.springframework.integration.aggregator.ResequencingMessageGroupProcessor; +import org.springframework.integration.aggregator.ResequencingMessageHandler; +import org.springframework.integration.channel.DirectChannel; +import org.springframework.integration.core.MessageHandler; +import org.springframework.integration.endpoint.EventDrivenConsumer; +import org.springframework.integration.ip.tcp.connection.AbstractTcpConnectionInterceptor; +import org.springframework.integration.ip.tcp.connection.TcpConnection; +import org.springframework.integration.ip.tcp.connection.TcpConnectionInterceptor; +import org.springframework.integration.ip.tcp.connection.TcpConnectionInterceptorFactory; +import org.springframework.integration.ip.tcp.connection.TcpNetConnection; +import org.springframework.integration.ip.tcp.connection.TcpNioConnection; +import org.springframework.integration.support.MessageBuilder; +import org.springframework.integration.x.ip.websocket.WebSocketSerializer.WebSocketState; +import org.springframework.util.Assert; + +/** + * @author Gary Russell + * @since 3.0 + * + */ +public class WebSocketTcpConnectionInterceptorFactory implements TcpConnectionInterceptorFactory { + + private static final Log logger = LogFactory.getLog(WebSocketTcpConnectionInterceptor.class); + + @Override + public TcpConnectionInterceptor getInterceptor() { + return new WebSocketTcpConnectionInterceptor(); + } + + private class WebSocketTcpConnectionInterceptor extends AbstractTcpConnectionInterceptor { + + private volatile boolean shook; + + private volatile InputStream theInputStream; + + private final DirectChannel resequenceChannel = new DirectChannel(); + + private final EventDrivenConsumer resequencer; + + public WebSocketTcpConnectionInterceptor() { + super(); + ResequencingMessageHandler handler = new ResequencingMessageHandler(new ResequencingMessageGroupProcessor()); + handler.setReleasePartialSequences(true); + DirectChannel resequenced = new DirectChannel(); + resequenced.setBeanName("resequencedWSFrames"); + handler.setOutputChannel(resequenced); + this.resequencer = new EventDrivenConsumer(this.resequenceChannel, handler); + resequenced.subscribe(new MessageHandler() { + + @Override + public void handleMessage(Message message) throws MessagingException { + doOnMessage(message); + } + }); + this.resequencer.afterPropertiesSet(); + this.resequencer.start(); + } + + /** + * When using NIO, we have to resequence the messages because frames may + * arrive out of order. This is particularly an issue for some of the + * Autobahn tests where, for example, many pings are sent and the test + * expects the pongs to come back in the same order. + */ + @Override + public boolean onMessage(Message message) { + if (this.getTheConnection() instanceof TcpNioConnection && message.getHeaders().getCorrelationId() != null) { + resequenceChannel.send(message); + return true; + } + else { + return this.doOnMessage(message); + } + } + + public boolean doOnMessage(Message message) { + Assert.isInstanceOf(WebSocketFrame.class, message.getPayload()); + WebSocketFrame payload = (WebSocketFrame) message.getPayload(); + InputStream inputStream = null; + try { + inputStream = this.getTheInputStream(); + } + catch (IOException e1) { + this.protocolViolation(message); + } + + WebSocketState state = (WebSocketState) this.getRequiredDeserializer().getState(inputStream); + Assert.notNull(state, "State must not be null:" + message); + if (logger.isDebugEnabled()) { + logger.debug(state); + } + if (payload.getRsv() > 0) { + if (logger.isDebugEnabled()) { + logger.debug("Reserved bits:" + payload.getRsv()); + } + this.protocolViolation(message); + } + else if (payload.getType() == WebSocketFrame.TYPE_CLOSE) { + try { + if (logger.isDebugEnabled()) { + logger.debug("Close, status:" + payload.getStatus()); + } + // If we initiated the close, just close. + if (!state.isCloseInitiated()) { + if (payload.getStatus() < 0) { + payload.setStatus((short) 1000); + } + this.send(message); + } + this.close(); + } + catch (Exception e) { + throw new MessageHandlingException(message, "Send failed", e); + } + } + else if (state == null || state.isCloseInitiated()) { + if (logger.isWarnEnabled()) { + logger.warn("Message dropped - close initiated:" + message); + } + } + else if ((payload.getType() & 0xff) == WebSocketFrame.TYPE_INVALID) { + if (logger.isDebugEnabled()) { + logger.debug("Invalid:" + payload.getPayload()); + } + this.protocolViolation(message); + } + else if (payload.getType() == WebSocketFrame.TYPE_FRAGMENTED_CONTROL) { + if (logger.isDebugEnabled()) { + logger.debug("Fragmented Control Op"); + } + this.protocolViolation(message); + } + else if (payload.getType() == WebSocketFrame.TYPE_PING) { + try { + if (logger.isDebugEnabled()) { + logger.debug("Ping:" + new String(payload.getBinary(), "UTF-8")); + } + if (payload.getBinary().length > 125) { + this.protocolViolation(message); + } + else { + WebSocketFrame pong = new WebSocketFrame(WebSocketFrame.TYPE_PONG, payload.getBinary()); + this.send(MessageBuilder.withPayload(pong) + .copyHeaders(message.getHeaders()) + .build()); + } + } + catch (Exception e) { + throw new MessageHandlingException(message, "Send failed", e); + } + } + else if (payload.getType() == WebSocketFrame.TYPE_PONG) { + if (logger.isDebugEnabled()) { + logger.debug("Pong"); + } + } + else if (this.shook) { + return super.onMessage(message); + } + else { + try { + doHandshake(payload, message.getHeaders()); + this.shook = true; + } + catch (Exception e) { + throw new MessageHandlingException(message, "Handshake failed", e); + } + } + return true; + } + + private void protocolViolation(Message message) { + if (logger.isDebugEnabled()) { + logger.debug("Protocol violation - closing; " + message); + } + WebSocketFrame frame = (WebSocketFrame) message.getPayload(); + String error = "Protocol Error" + frame.getPayload() == null ? "" : (":" + frame.getPayload()); + WebSocketFrame close = new WebSocketFrame(WebSocketFrame.TYPE_CLOSE, error); + close.setStatus(frame.getType() == WebSocketFrame.TYPE_INVALID_UTF8 ? (short) 1007 : (short) 1002); + try { + ((WebSocketState) this.getRequiredDeserializer().getState(this.getTheInputStream())).setCloseInitiated(true); + this.send(MessageBuilder.withPayload(close) + .copyHeaders(message.getHeaders()) + .build()); + } + catch (Exception e) { + throw new MessageHandlingException(message, "Send failed", e); } + } + + @Override + public void close() { + try { + InputStream inputStream = getTheInputStream(); + if (inputStream != null) { + this.getRequiredDeserializer().removeState(inputStream); + } + } + catch (IOException e) { + } + super.close(); + } + + /** + * Hack - need to add getInputStream() to TcpConnection. + * @return + * @throws IOException + */ + private InputStream getTheInputStream() throws IOException { + if (this.theInputStream != null) { + return this.theInputStream; + } + TcpConnection theConnection = this.getTheConnection(); + InputStream inputStream = null; + if (theConnection instanceof TcpNioConnection) { + inputStream = (InputStream) new DirectFieldAccessor(theConnection).getPropertyValue("pipedInputStream"); + } + else if (theConnection instanceof TcpNetConnection) { + Socket socket = (Socket) new DirectFieldAccessor(theConnection).getPropertyValue("socket"); + if (socket != null) { + inputStream = socket.getInputStream(); + } + } + this.theInputStream = inputStream; + return inputStream; + } + + private void doHandshake(WebSocketFrame frame, MessageHeaders messageHeaders) throws Exception { + try { + WebSocketFrame handshake = this.getRequiredDeserializer().generateHandshake(frame); + this.send(MessageBuilder.withPayload(handshake) + .copyHeaders(messageHeaders) + .build()); + } + catch (WebSocketUpgradeException e) { + this.send(MessageBuilder + .withPayload( + new WebSocketFrame(WebSocketFrame.TYPE_DATA, "HTTP/1.1 " + + e.getMessage() + e.getHeaders())) + .copyHeaders(messageHeaders) + .build()); + this.close(); + } + } + + private WebSocketSerializer getRequiredDeserializer() { + Deserializer deserializer = this.getDeserializer(); + Assert.state(deserializer instanceof WebSocketSerializer, + "Deserializer must be a WebSocketSerializer"); + return (WebSocketSerializer) deserializer; + } + } + +} diff --git a/spring-integration-ip-extensions/src/main/java/org/springframework/integration/x/ip/websocket/WebSocketUpgradeException.java b/spring-integration-ip-extensions/src/main/java/org/springframework/integration/x/ip/websocket/WebSocketUpgradeException.java new file mode 100644 index 0000000..b5b1691 --- /dev/null +++ b/spring-integration-ip-extensions/src/main/java/org/springframework/integration/x/ip/websocket/WebSocketUpgradeException.java @@ -0,0 +1,42 @@ +/* + * Copyright 2002-2013 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.springframework.integration.x.ip.websocket; + +/** + * @author Gary Russell + * @since 3.0 + * + */ +@SuppressWarnings("serial") +public class WebSocketUpgradeException extends RuntimeException { + + private final String headers; + + public WebSocketUpgradeException(String message) { + super(message + "\r\n"); + this.headers = "\r\n"; + } + + public WebSocketUpgradeException(String message, String headers) { + super(message + "\r\n"); + this.headers = headers + "\r\n"; + } + + protected String getHeaders() { + return headers; + } + +} diff --git a/spring-integration-ip-extensions/src/test/java/org/springframework/integration/x/ip/sockjs/Autobahn-context.xml b/spring-integration-ip-extensions/src/test/java/org/springframework/integration/x/ip/sockjs/Autobahn-context.xml new file mode 100644 index 0000000..b3b8f06 --- /dev/null +++ b/spring-integration-ip-extensions/src/test/java/org/springframework/integration/x/ip/sockjs/Autobahn-context.xml @@ -0,0 +1,101 @@ + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + diff --git a/spring-integration-ip-extensions/src/test/java/org/springframework/integration/x/ip/sockjs/AutobahnTests.java b/spring-integration-ip-extensions/src/test/java/org/springframework/integration/x/ip/sockjs/AutobahnTests.java new file mode 100644 index 0000000..51f3aaa --- /dev/null +++ b/spring-integration-ip-extensions/src/test/java/org/springframework/integration/x/ip/sockjs/AutobahnTests.java @@ -0,0 +1,34 @@ +/* + * Copyright 2002-2013 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.springframework.integration.x.ip.sockjs; + +import org.springframework.context.support.ClassPathXmlApplicationContext; + +/** + * @author Gary Russell + * @since 3.0 + * + */ +public class AutobahnTests { + + public static void main(String[] args) throws Exception { + new ClassPathXmlApplicationContext("Autobahn-context.xml", AutobahnTests.class); + System.out.println("Hit Enter To Terminate..."); + System.in.read(); + System.exit(0); + } + +} diff --git a/spring-integration-ip-extensions/src/test/java/org/springframework/integration/x/ip/sockjs/WebSocketServerTests-context.xml b/spring-integration-ip-extensions/src/test/java/org/springframework/integration/x/ip/sockjs/WebSocketServerTests-context.xml new file mode 100644 index 0000000..70155bc --- /dev/null +++ b/spring-integration-ip-extensions/src/test/java/org/springframework/integration/x/ip/sockjs/WebSocketServerTests-context.xml @@ -0,0 +1,65 @@ + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + diff --git a/spring-integration-ip-extensions/src/test/java/org/springframework/integration/x/ip/sockjs/WebSocketServerTests.java b/spring-integration-ip-extensions/src/test/java/org/springframework/integration/x/ip/sockjs/WebSocketServerTests.java new file mode 100644 index 0000000..ab1a2ef --- /dev/null +++ b/spring-integration-ip-extensions/src/test/java/org/springframework/integration/x/ip/sockjs/WebSocketServerTests.java @@ -0,0 +1,97 @@ +/* + * Copyright 2002-2013 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.springframework.integration.x.ip.sockjs; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; +import java.util.concurrent.atomic.AtomicInteger; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.springframework.context.support.ClassPathXmlApplicationContext; +import org.springframework.integration.Message; +import org.springframework.integration.annotation.Header; +import org.springframework.integration.ip.IpHeaders; +import org.springframework.integration.support.MessageBuilder; + +/** + * @author Gary Russell + * @since 3.0 + * + */ +public class WebSocketServerTests { + + public static void main(String[] args) throws Exception { + new ClassPathXmlApplicationContext(WebSocketServerTests.class.getSimpleName() + "-context.xml", WebSocketServerTests.class); + System.out.println("Hit Enter To Terminate..."); + System.in.read(); + System.exit(0); + } + + public static class DemoService { + + private static final Log logger = LogFactory.getLog(DemoService.class); + + private final Map clients = new HashMap(); + + private final Map paused = new HashMap(); + + public void startStop(String command, @Header(IpHeaders.CONNECTION_ID) String connectionId) { + if ("stop".equalsIgnoreCase(command)) { + AtomicInteger clientInt = clients.remove(connectionId); + if (clientInt != null) { + paused.put(connectionId, clientInt); + } + logger.info("Connection " + connectionId + " stopped"); + } + else if ("start".equalsIgnoreCase(command)) { + AtomicInteger clientInt = paused.remove(connectionId); + clientInt = clientInt == null ? new AtomicInteger() : clientInt; + clients.put(connectionId, clientInt); + logger.info("Connection " + connectionId + " (re)started"); + } + else { + logger.info("Unexpected command: " + command); + } + } + + public List> getNext() { + List> messages = new ArrayList>(); + for (Entry entry : clients.entrySet()) { + Message message = MessageBuilder.withPayload(Integer.toString(entry.getValue().incrementAndGet())) + .setHeader(IpHeaders.CONNECTION_ID, entry.getKey()) + .build(); + messages.add(message); + logger.warn("Sending " + message.getPayload() + " to connection " + entry.getKey()); + } + if (messages.size() == 0) { + return null; + } + else { + return messages; + } + } + + public void remove(String connetionId) { + logger.warn("Error on write; removing " + connetionId); + clients.remove(connetionId); + } + } + +} diff --git a/spring-integration-ip-extensions/src/test/java/org/springframework/integration/x/ip/sockjs/ws.html b/spring-integration-ip-extensions/src/test/java/org/springframework/integration/x/ip/sockjs/ws.html new file mode 100644 index 0000000..80904e2 --- /dev/null +++ b/spring-integration-ip-extensions/src/test/java/org/springframework/integration/x/ip/sockjs/ws.html @@ -0,0 +1,83 @@ + + + +Spring Integration Web Socket Test + + + +
+
+

+ +

+ +

+ +
+
+ +

+ +
+ + diff --git a/spring-integration-ip-extensions/src/test/resources/key.store b/spring-integration-ip-extensions/src/test/resources/key.store new file mode 100644 index 0000000..cbabe54 Binary files /dev/null and b/spring-integration-ip-extensions/src/test/resources/key.store differ diff --git a/spring-integration-ip-extensions/src/test/resources/log4j.xml b/spring-integration-ip-extensions/src/test/resources/log4j.xml new file mode 100644 index 0000000..66d909f --- /dev/null +++ b/spring-integration-ip-extensions/src/test/resources/log4j.xml @@ -0,0 +1,32 @@ + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + \ No newline at end of file diff --git a/spring-integration-ip-extensions/src/test/resources/trust.store b/spring-integration-ip-extensions/src/test/resources/trust.store new file mode 100644 index 0000000..4e5e139 Binary files /dev/null and b/spring-integration-ip-extensions/src/test/resources/trust.store differ