diff --git a/spring-integration-cassandra/.gitignore b/spring-integration-cassandra/.gitignore index b412f22..865344b 100644 --- a/spring-integration-cassandra/.gitignore +++ b/spring-integration-cassandra/.gitignore @@ -10,3 +10,4 @@ target /*.ipr /*.iws /bin/ +.toDelete diff --git a/spring-integration-cassandra/CODE_OF_CONDUCT.adoc b/spring-integration-cassandra/CODE_OF_CONDUCT.adoc new file mode 100644 index 0000000..f013d6f --- /dev/null +++ b/spring-integration-cassandra/CODE_OF_CONDUCT.adoc @@ -0,0 +1,44 @@ += Contributor Code of Conduct + +As contributors and maintainers of this project, and in the interest of fostering an open +and welcoming community, we pledge to respect all people who contribute through reporting +issues, posting feature requests, updating documentation, submitting pull requests or +patches, and other activities. + +We are committed to making participation in this project a harassment-free experience for +everyone, regardless of level of experience, gender, gender identity and expression, +sexual orientation, disability, personal appearance, body size, race, ethnicity, age, +religion, or nationality. + +Examples of unacceptable behavior by participants include: + +* The use of sexualized language or imagery +* Personal attacks +* Trolling or insulting/derogatory comments +* Public or private harassment +* Publishing other's private information, such as physical or electronic addresses, + without explicit permission +* Other unethical or unprofessional conduct + +Project maintainers have the right and responsibility to remove, edit, or reject comments, +commits, code, wiki edits, issues, and other contributions that are not aligned to this +Code of Conduct, or to ban temporarily or permanently any contributor for other behaviors +that they deem inappropriate, threatening, offensive, or harmful. + +By adopting this Code of Conduct, project maintainers commit themselves to fairly and +consistently applying these principles to every aspect of managing this project. Project +maintainers who do not follow or enforce the Code of Conduct may be permanently removed +from the project team. + +This Code of Conduct applies both within project spaces and in public spaces when an +individual is representing the project or its community. + +Instances of abusive, harassing, or otherwise unacceptable behavior may be reported by +contacting a project maintainer at spring-code-of-conduct@pivotal.io . All complaints will +be reviewed and investigated and will result in a response that is deemed necessary and +appropriate to the circumstances. Maintainers are obligated to maintain confidentiality +with regard to the reporter of an incident. + +This Code of Conduct is adapted from the +http://contributor-covenant.org[Contributor Covenant], version 1.3.0, available at +http://contributor-covenant.org/version/1/3/0/[contributor-covenant.org/version/1/3/0/] diff --git a/spring-integration-cassandra/README.md b/spring-integration-cassandra/README.md index 4b5542b..2c82cba 100644 --- a/spring-integration-cassandra/README.md +++ b/spring-integration-cassandra/README.md @@ -1,7 +1,7 @@ Spring Integration Cassandra Adapter ================================================= -Welcome to the *Spring Integration Cassandra adapter*. +Welcome to the *Spring Integration Cassandra* extension. Checking out and building ----------------------------- diff --git a/spring-integration-cassandra/build.gradle b/spring-integration-cassandra/build.gradle index 8f3893f..bf17b1e 100644 --- a/spring-integration-cassandra/build.gradle +++ b/spring-integration-cassandra/build.gradle @@ -1,261 +1,257 @@ -description = 'Spring Integration Cassandra Support' - -apply plugin: 'java' -apply from: "${rootProject.projectDir}/publish-maven.gradle" -apply plugin: 'eclipse' -apply plugin: 'idea' - buildscript { - repositories { - maven { url 'http://repo.spring.io/plugins-release' } - } - dependencies { - classpath 'org.springframework.build.gradle:spring-io-plugin:0.0.3.RELEASE' - } + repositories { + maven { url 'https://repo.spring.io/plugins-release' } + } } + +plugins { + id 'java' + id 'eclipse' + id 'idea' + id 'jacoco' + id 'org.sonarqube' version '2.6.2' + id 'checkstyle' +} + +apply from: "${rootProject.projectDir}/publish-maven.gradle" + +description = 'Spring Integration Cassandra Support' + group = 'org.springframework.integration' repositories { - if (version.endsWith('BUILD-SNAPSHOT') || project.hasProperty('platformVersion')) { - maven { url 'http://repo.spring.io/libs-snapshot' } - } - maven { url 'http://repo.spring.io/libs-milestone' } + if (version.endsWith('BUILD-SNAPSHOT')) { + maven { url 'http://repo.spring.io/libs-snapshot' } + } + maven { url 'http://repo.spring.io/libs-milestone' } // maven { url 'http://repo.spring.io/libs-staging-local' } } -if (project.hasProperty('platformVersion')) { - apply plugin: 'spring-io' +ext { + assertjVersion = '3.11.1' + cassandraUnitVersion = '3.5.0.1' + slf4jVersion = '1.7.25' + reactorVersion = '3.2.4.RELEASE' + springDataCassandraVersion = '2.1.3.RELEASE' + springIntegrationVersion = '5.1.1.RELEASE' + + idPrefix = 'cassandra' + + linkHomepage = 'https://github.com/spring-projects/spring-integration-extensions' + linkCi = 'https://build.spring.io/browse/INTEXT' + linkIssue = 'https://jira.spring.io/browse/INTEXT' + linkScmUrl = 'https://github.com/spring-projects/spring-integration-extensions' + linkScmConnection = 'https://github.com/spring-projects/spring-integration-extensions.git' + linkScmDevConnection = 'git@github.com:spring-projects/spring-integration-extensions.git' - dependencies { - springIoVersions "io.spring.platform:platform-bom:${platformVersion}@properties" - } } -sourceCompatibility = targetCompatibility = 1.7 - -ext { - cassandraVersion = '2.1.5' - cassandraUnitVersion = '2.1.3.1' - jacocoVersion = '0.7.2.201409121644' - slf4jVersion = '1.7.12' - springDataCassandraVersion = '1.3.0.RELEASE' - springIntegrationVersion = '4.2.4.RELEASE' - - idPrefix = 'cassandra' - - linkHomepage = 'https://github.com/spring-projects/spring-integration-extensions' - linkCi = 'https://build.spring.io/browse/INTEXT' - linkIssue = 'https://jira.spring.io/browse/INTEXT' - linkScmUrl = 'https://github.com/spring-projects/spring-integration-extensions' - linkScmConnection = 'https://github.com/spring-projects/spring-integration-extensions.git' - linkScmDevConnection = 'git@github.com:spring-projects/spring-integration-extensions.git' - +compileJava { + sourceCompatibility = 1.8 + targetCompatibility = 1.8 } eclipse.project.natures += 'org.springframework.ide.eclipse.core.springnature' sourceSets { - test { - resources { - srcDirs = ['src/test/resources', 'src/test/java'] - } - } + test { + resources { + srcDirs = ['src/test/resources', 'src/test/java'] + } + } } -// See http://www.gradle.org/docs/current/userguide/dependency_management.html#sub:configurations -// and http://www.gradle.org/docs/current/dsl/org.gradle.api.artifacts.ConfigurationContainer.html -configurations { - jacoco //Configuration Group used by Sonar to provide Code Coverage using JaCoCo +jacoco { + toolVersion = "0.8.2" } +checkstyle { + configFile = file("${rootDir}/src/checkstyle/checkstyle.xml") + toolVersion = "8.16" +} + + dependencies { - compile "org.springframework.integration:spring-integration-core:$springIntegrationVersion" - compile ("org.springframework.data:spring-data-cassandra:$springDataCassandraVersion") { - exclude group: 'org.liquibase', module: 'liquibase-core' - } + compile "org.springframework.integration:spring-integration-core:$springIntegrationVersion" + compile "org.springframework.data:spring-data-cassandra:$springDataCassandraVersion" - testCompile "org.springframework.integration:spring-integration-test:$springIntegrationVersion" + testCompile "org.springframework.integration:spring-integration-test:$springIntegrationVersion" - testCompile("org.cassandraunit:cassandra-unit-spring:$cassandraUnitVersion") { - exclude group: 'org.apache.cassandra', module: 'cassandra-all' - exclude group: 'com.datastax.cassandra', module: 'cassandra-driver-core' - exclude group: 'org.slf4j', module: 'slf4j-log4j12' - } - testCompile ("org.apache.cassandra:cassandra-all:$cassandraVersion") { - exclude group: 'ch.qos.logback', module: 'logback-classic' - exclude group: 'ch.qos.logback', module: 'logback-core' - } + testCompile "org.assertj:assertj-core:$assertjVersion" + testCompile "org.cassandraunit:cassandra-unit-spring:$cassandraUnitVersion" + testCompile "io.projectreactor:reactor-test:$reactorVersion" - testRuntime "org.slf4j:slf4j-log4j12:$slf4jVersion" - - jacoco "org.jacoco:org.jacoco.agent:$jacocoVersion:runtime" + testRuntime "org.slf4j:jcl-over-slf4j:$slf4jVersion" } // enable all compiler warnings; individual projects may customize further [compileJava, compileTestJava]*.options*.compilerArgs = ['-Xlint:all,-options'] test { - // suppress all console output during testing unless running `gradle -i` - logging.captureStandardOutput(LogLevel.INFO) - jvmArgs "-javaagent:${configurations.jacoco.asPath}=destfile=${buildDir}/jacoco.exec,includes=*" + // suppress all console output during testing unless running `gradle -i` + logging.captureStandardOutput(LogLevel.INFO) + jacoco { + append = false + destinationFile = file("$buildDir/jacoco.exec") + } } +jacocoTestReport { + reports { + xml.enabled false + csv.enabled false + html.destination file("${buildDir}/reports/jacoco/html") + } +} + +check.dependsOn javadoc +build.dependsOn jacocoTestReport + + task sourcesJar(type: Jar) { - classifier = 'sources' - from sourceSets.main.allJava + classifier = 'sources' + from sourceSets.main.allJava } task javadocJar(type: Jar) { - classifier = 'javadoc' - from javadoc + classifier = 'javadoc' + from javadoc } -artifacts { - archives sourcesJar - archives javadocJar -} - -apply plugin: 'sonar-runner' - -sonarRunner { - sonarProperties { - property "sonar.jacoco.reportPath", "${buildDir.name}/jacoco.exec" - property "sonar.links.homepage", linkHomepage - property "sonar.links.ci", linkCi - property "sonar.links.issue", linkIssue - property "sonar.links.scm", linkScmUrl - property "sonar.links.scm_dev", linkScmDevConnection - property "sonar.java.coveragePlugin", "jacoco" - } +sonarqube { + properties { + property "sonar.jacoco.reportPath", "${buildDir.name}/jacoco.exec" + property "sonar.links.homepage", linkHomepage + property "sonar.links.ci", linkCi + property "sonar.links.issue", linkIssue + property "sonar.links.scm", linkScmUrl + property "sonar.links.scm_dev", linkScmDevConnection + property "sonar.java.coveragePlugin", "jacoco" + } } task api(type: Javadoc) { - group = 'Documentation' - description = 'Generates the Javadoc API documentation.' - title = "${rootProject.description} ${version} API" - options.memberLevel = org.gradle.external.javadoc.JavadocMemberLevel.PROTECTED - options.author = true - options.header = rootProject.description - options.overview = 'src/api/overview.html' + group = 'Documentation' + description = 'Generates the Javadoc API documentation.' + title = "${rootProject.description} ${version} API" + options.memberLevel = org.gradle.external.javadoc.JavadocMemberLevel.PROTECTED + options.author = true + options.header = rootProject.description + options.overview = 'src/api/overview.html' - source = sourceSets.main.allJava - classpath = project.sourceSets.main.compileClasspath - destinationDir = new File(buildDir, "api") + source = sourceSets.main.allJava + classpath = project.sourceSets.main.compileClasspath + destinationDir = new File(buildDir, "api") } task schemaZip(type: Zip) { - group = 'Distribution' - classifier = 'schema' - description = "Builds -${classifier} archive containing all " + - "XSDs for deployment at static.springframework.org/schema." + group = 'Distribution' + classifier = 'schema' + description = "Builds -${classifier} archive containing all " + + "XSDs for deployment at static.springframework.org/schema." - def Properties schemas = new Properties(); - def shortName = idPrefix.replaceFirst("${idPrefix}-", '') + def Properties schemas = new Properties(); + def shortName = idPrefix.replaceFirst("${idPrefix}-", '') - project.sourceSets.main.resources.find { - it.path.endsWith("META-INF${File.separator}spring.schemas") - }?.withInputStream { schemas.load(it) } + project.sourceSets.main.resources.find { + it.path.endsWith("META-INF${File.separator}spring.schemas") + }?.withInputStream { schemas.load(it) } - for (def key : schemas.keySet()) { - File xsdFile = project.sourceSets.main.resources.find { - it.path.replaceAll('\\\\', '/').endsWith(schemas.get(key)) - } - assert xsdFile != null - into("integration/${shortName}") { - from xsdFile.path - } - } + for (def key : schemas.keySet()) { + File xsdFile = project.sourceSets.main.resources.find { + it.path.replaceAll('\\\\', '/').endsWith(schemas.get(key)) + } + assert xsdFile != null + into("integration/${shortName}") { + from xsdFile.path + } + } } task docsZip(type: Zip) { - group = 'Distribution' - classifier = 'docs' - description = "Builds -${classifier} archive containing api " + - "for deployment at static.spring.io/spring-integration/docs." + group = 'Distribution' + classifier = 'docs' + description = "Builds -${classifier} archive containing api " + + "for deployment at static.spring.io/spring-integration/docs." - from('src/dist') { - include 'changelog.txt' - } + from('src/dist') { + include 'changelog.txt' + } - from(api) { - into 'api' - } + from(api) { + into 'api' + } } task distZip(type: Zip, dependsOn: [docsZip, schemaZip]) { - group = 'Distribution' - classifier = 'dist' - description = "Builds -${classifier} archive, containing all jars and docs, " + - "suitable for community download page." + group = 'Distribution' + classifier = 'dist' + description = "Builds -${classifier} archive, containing all jars and docs, " + + "suitable for community download page." - ext.baseDir = "${project.name}-${project.version}"; + ext.baseDir = "${project.name}-${project.version}"; - from('src/dist') { - include 'readme.txt' - include 'license.txt' - include 'notice.txt' - into "${baseDir}" - } + from('src/dist') { + include 'readme.txt' + include 'license.txt' + into "${baseDir}" + } - from(zipTree(docsZip.archivePath)) { - into "${baseDir}/docs" - } + from(zipTree(docsZip.archivePath)) { + into "${baseDir}/docs" + } - from(zipTree(schemaZip.archivePath)) { - into "${baseDir}/schema" - } + from(zipTree(schemaZip.archivePath)) { + into "${baseDir}/schema" + } - into("${baseDir}/libs") { - from project.jar - from project.sourcesJar - from project.javadocJar - } + into("${baseDir}/libs") { + from project.jar + from project.sourcesJar + from project.javadocJar + } } // Create an optional "with dependencies" distribution. // Not published by default; only for use when building from source. task depsZip(type: Zip, dependsOn: distZip) { zipTask -> - group = 'Distribution' - classifier = 'dist-with-deps' - description = "Builds -${classifier} archive, containing everything " + - "in the -${distZip.classifier} archive plus all dependencies." + group = 'Distribution' + classifier = 'dist-with-deps' + description = "Builds -${classifier} archive, containing everything " + + "in the -${distZip.classifier} archive plus all dependencies." - from zipTree(distZip.archivePath) + from zipTree(distZip.archivePath) - gradle.taskGraph.whenReady { taskGraph -> - if (taskGraph.hasTask(":${zipTask.name}")) { - def projectName = rootProject.name - def artifacts = new HashSet() + gradle.taskGraph.whenReady { taskGraph -> + if (taskGraph.hasTask(":${zipTask.name}")) { + def projectName = rootProject.name + def artifacts = new HashSet() - rootProject.configurations.runtime.resolvedConfiguration.resolvedArtifacts.each { artifact -> - def dependency = artifact.moduleVersion.id - if (!projectName.equals(dependency.name)) { - artifacts << artifact.file - } - } + rootProject.configurations.runtime.resolvedConfiguration.resolvedArtifacts.each { artifact -> + def dependency = artifact.moduleVersion.id + if (!projectName.equals(dependency.name)) { + artifacts << artifact.file + } + } - zipTask.from(artifacts) { - into "${distZip.baseDir}/deps" - } - } - } + zipTask.from(artifacts) { + into "${distZip.baseDir}/deps" + } + } + } } artifacts { - archives distZip - archives docsZip - archives schemaZip + archives sourcesJar + archives javadocJar + archives distZip + archives docsZip + archives schemaZip } task dist(dependsOn: assemble) { - group = 'Distribution' - description = 'Builds -dist, -docs and -schema distribution archives.' -} - -task wrapper(type: Wrapper) { - description = 'Generates gradlew[.bat] scripts' - gradleVersion = '2.3' - distributionUrl = "http://services.gradle.org/distributions/gradle-${gradleVersion}-all.zip" + group = 'Distribution' + description = 'Builds -dist, -docs and -schema distribution archives.' } diff --git a/spring-integration-cassandra/gradle.properties b/spring-integration-cassandra/gradle.properties index c424047..2771048 100644 --- a/spring-integration-cassandra/gradle.properties +++ b/spring-integration-cassandra/gradle.properties @@ -1 +1 @@ -version=0.5.1.BUILD-SNAPSHOT +version=0.6.0.BUILD-SNAPSHOT diff --git a/spring-integration-cassandra/gradle/wrapper/gradle-wrapper.jar b/spring-integration-cassandra/gradle/wrapper/gradle-wrapper.jar index 085a1cd..87b738c 100644 Binary files a/spring-integration-cassandra/gradle/wrapper/gradle-wrapper.jar and b/spring-integration-cassandra/gradle/wrapper/gradle-wrapper.jar differ diff --git a/spring-integration-cassandra/gradle/wrapper/gradle-wrapper.properties b/spring-integration-cassandra/gradle/wrapper/gradle-wrapper.properties index ecf610c..6b3851a 100644 --- a/spring-integration-cassandra/gradle/wrapper/gradle-wrapper.properties +++ b/spring-integration-cassandra/gradle/wrapper/gradle-wrapper.properties @@ -1,6 +1,5 @@ -#Thu Apr 09 10:04:06 EEST 2015 distributionBase=GRADLE_USER_HOME distributionPath=wrapper/dists +distributionUrl=https\://services.gradle.org/distributions/gradle-5.1-bin.zip zipStoreBase=GRADLE_USER_HOME zipStorePath=wrapper/dists -distributionUrl=http\://services.gradle.org/distributions/gradle-2.3-all.zip diff --git a/spring-integration-cassandra/gradlew b/spring-integration-cassandra/gradlew index 91a7e26..af6708f 100755 --- a/spring-integration-cassandra/gradlew +++ b/spring-integration-cassandra/gradlew @@ -1,4 +1,4 @@ -#!/usr/bin/env bash +#!/usr/bin/env sh ############################################################################## ## @@ -6,47 +6,6 @@ ## ############################################################################## -# Add default JVM options here. You can also use JAVA_OPTS and GRADLE_OPTS to pass JVM options to this script. -DEFAULT_JVM_OPTS="" - -APP_NAME="Gradle" -APP_BASE_NAME=`basename "$0"` - -# Use the maximum available, or set MAX_FD != -1 to use that value. -MAX_FD="maximum" - -warn ( ) { - echo "$*" -} - -die ( ) { - echo - echo "$*" - echo - exit 1 -} - -# OS specific support (must be 'true' or 'false'). -cygwin=false -msys=false -darwin=false -case "`uname`" in - CYGWIN* ) - cygwin=true - ;; - Darwin* ) - darwin=true - ;; - MINGW* ) - msys=true - ;; -esac - -# For Cygwin, ensure paths are in UNIX format before anything is touched. -if $cygwin ; then - [ -n "$JAVA_HOME" ] && JAVA_HOME=`cygpath --unix "$JAVA_HOME"` -fi - # Attempt to set APP_HOME # Resolve links: $0 may be a link PRG="$0" @@ -61,9 +20,49 @@ while [ -h "$PRG" ] ; do fi done SAVED="`pwd`" -cd "`dirname \"$PRG\"`/" >&- +cd "`dirname \"$PRG\"`/" >/dev/null APP_HOME="`pwd -P`" -cd "$SAVED" >&- +cd "$SAVED" >/dev/null + +APP_NAME="Gradle" +APP_BASE_NAME=`basename "$0"` + +# Add default JVM options here. You can also use JAVA_OPTS and GRADLE_OPTS to pass JVM options to this script. +DEFAULT_JVM_OPTS='"-Xmx64m"' + +# Use the maximum available, or set MAX_FD != -1 to use that value. +MAX_FD="maximum" + +warn () { + echo "$*" +} + +die () { + echo + echo "$*" + echo + exit 1 +} + +# OS specific support (must be 'true' or 'false'). +cygwin=false +msys=false +darwin=false +nonstop=false +case "`uname`" in + CYGWIN* ) + cygwin=true + ;; + Darwin* ) + darwin=true + ;; + MINGW* ) + msys=true + ;; + NONSTOP* ) + nonstop=true + ;; +esac CLASSPATH=$APP_HOME/gradle/wrapper/gradle-wrapper.jar @@ -90,7 +89,7 @@ location of your Java installation." fi # Increase the maximum file descriptors if we can. -if [ "$cygwin" = "false" -a "$darwin" = "false" ] ; then +if [ "$cygwin" = "false" -a "$darwin" = "false" -a "$nonstop" = "false" ] ; then MAX_FD_LIMIT=`ulimit -H -n` if [ $? -eq 0 ] ; then if [ "$MAX_FD" = "maximum" -o "$MAX_FD" = "max" ] ; then @@ -114,6 +113,7 @@ fi if $cygwin ; then APP_HOME=`cygpath --path --mixed "$APP_HOME"` CLASSPATH=`cygpath --path --mixed "$CLASSPATH"` + JAVACMD=`cygpath --unix "$JAVACMD"` # We build the pattern for arguments to be converted via cygpath ROOTDIRSRAW=`find -L / -maxdepth 1 -mindepth 1 -type d 2>/dev/null` @@ -154,11 +154,19 @@ if $cygwin ; then esac fi -# Split up the JVM_OPTS And GRADLE_OPTS values into an array, following the shell quoting and substitution rules -function splitJvmOpts() { - JVM_OPTS=("$@") +# Escape application args +save () { + for i do printf %s\\n "$i" | sed "s/'/'\\\\''/g;1s/^/'/;\$s/\$/' \\\\/" ; done + echo " " } -eval splitJvmOpts $DEFAULT_JVM_OPTS $JAVA_OPTS $GRADLE_OPTS -JVM_OPTS[${#JVM_OPTS[*]}]="-Dorg.gradle.appname=$APP_BASE_NAME" +APP_ARGS=$(save "$@") -exec "$JAVACMD" "${JVM_OPTS[@]}" -classpath "$CLASSPATH" org.gradle.wrapper.GradleWrapperMain "$@" +# Collect all arguments for the java command, following the shell quoting and substitution rules +eval set -- $DEFAULT_JVM_OPTS $JAVA_OPTS $GRADLE_OPTS "\"-Dorg.gradle.appname=$APP_BASE_NAME\"" -classpath "\"$CLASSPATH\"" org.gradle.wrapper.GradleWrapperMain "$APP_ARGS" + +# by default we should be in the correct project dir, but when run from Finder on Mac, the cwd is wrong +if [ "$(uname)" = "Darwin" ] && [ "$HOME" = "$PWD" ]; then + cd "$(dirname "$0")" +fi + +exec "$JAVACMD" "$@" diff --git a/spring-integration-cassandra/gradlew.bat b/spring-integration-cassandra/gradlew.bat index aec9973..0f8d593 100644 --- a/spring-integration-cassandra/gradlew.bat +++ b/spring-integration-cassandra/gradlew.bat @@ -8,14 +8,14 @@ @rem Set local scope for the variables with windows NT shell if "%OS%"=="Windows_NT" setlocal -@rem Add default JVM options here. You can also use JAVA_OPTS and GRADLE_OPTS to pass JVM options to this script. -set DEFAULT_JVM_OPTS= - set DIRNAME=%~dp0 if "%DIRNAME%" == "" set DIRNAME=. set APP_BASE_NAME=%~n0 set APP_HOME=%DIRNAME% +@rem Add default JVM options here. You can also use JAVA_OPTS and GRADLE_OPTS to pass JVM options to this script. +set DEFAULT_JVM_OPTS="-Xmx64m" + @rem Find java.exe if defined JAVA_HOME goto findJavaFromJavaHome @@ -46,10 +46,9 @@ echo location of your Java installation. goto fail :init -@rem Get command-line arguments, handling Windowz variants +@rem Get command-line arguments, handling Windows variants if not "%OS%" == "Windows_NT" goto win9xME_args -if "%@eval[2+2]" == "4" goto 4NT_args :win9xME_args @rem Slurp the command line arguments. @@ -60,11 +59,6 @@ set _SKIP=2 if "x%~1" == "x" goto execute set CMD_LINE_ARGS=%* -goto execute - -:4NT_args -@rem Get arguments from the 4NT Shell from JP Software -set CMD_LINE_ARGS=%$ :execute @rem Setup the command line diff --git a/spring-integration-cassandra/src/api/overview.html b/spring-integration-cassandra/src/api/overview.html index 2d7e93f..dd3e4da 100644 --- a/spring-integration-cassandra/src/api/overview.html +++ b/spring-integration-cassandra/src/api/overview.html @@ -5,7 +5,7 @@ This document is the API specification for Spring Integration Cassandra Extensio

For further API reference and developer documentation, see the - Spring + Spring Integration reference documentation. That documentation contains more detailed, developer-targeted descriptions, with conceptual overviews, definitions of terms, @@ -14,8 +14,8 @@ This document is the API specification for Spring Integration Cassandra Extensio

If you are interested in commercial training, consultancy, and - support for Spring Integration, please visit - http://www.springsource.com + support for Spring Integration, please visit + https://spring.io/

diff --git a/spring-integration-cassandra/src/checkstyle/checkstyle-header.txt b/spring-integration-cassandra/src/checkstyle/checkstyle-header.txt new file mode 100644 index 0000000..f470e97 --- /dev/null +++ b/spring-integration-cassandra/src/checkstyle/checkstyle-header.txt @@ -0,0 +1,17 @@ +^\Q/*\E$ +^\Q * Copyright \E20\d\d(\-20\d\d)?\Q the original author or authors.\E$ +^\Q *\E$ +^\Q * Licensed under the Apache License, Version 2.0 (the "License");\E$ +^\Q * you may not use this file except in compliance with the License.\E$ +^\Q * You may obtain a copy of the License at\E$ +^\Q *\E$ +^\Q * http://www.apache.org/licenses/LICENSE-2.0\E$ +^\Q *\E$ +^\Q * Unless required by applicable law or agreed to in writing, software\E$ +^\Q * distributed under the License is distributed on an "AS IS" BASIS,\E$ +^\Q * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.\E$ +^\Q * See the License for the specific language governing permissions and\E$ +^\Q * limitations under the License.\E$ +^\Q */\E$ +^$ +^.*$ diff --git a/spring-integration-cassandra/src/checkstyle/checkstyle-suppressions.xml b/spring-integration-cassandra/src/checkstyle/checkstyle-suppressions.xml new file mode 100644 index 0000000..b638f76 --- /dev/null +++ b/spring-integration-cassandra/src/checkstyle/checkstyle-suppressions.xml @@ -0,0 +1,9 @@ + + + + + + + diff --git a/spring-integration-cassandra/src/checkstyle/checkstyle.xml b/spring-integration-cassandra/src/checkstyle/checkstyle.xml new file mode 100644 index 0000000..d53a329 --- /dev/null +++ b/spring-integration-cassandra/src/checkstyle/checkstyle.xml @@ -0,0 +1,184 @@ + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + diff --git a/spring-integration-cassandra/src/dist/notice.txt b/spring-integration-cassandra/src/dist/notice.txt deleted file mode 100644 index f62045a..0000000 --- a/spring-integration-cassandra/src/dist/notice.txt +++ /dev/null @@ -1,21 +0,0 @@ - ======================================================================== - == NOTICE file corresponding to section 4 d of the Apache License, == - == Version 2.0, in this case for the Spring Integration distribution. == - ======================================================================== - - This product includes software developed by - the Apache Software Foundation (http://www.apache.org). - - The end-user documentation included with a redistribution, if any, - must include the following acknowledgement: - - "This product includes software developed by the Spring Framework - Project (http://www.springframework.org)." - - Alternatively, this acknowledgement may appear in the software itself, - if and wherever such third-party acknowledgements normally appear. - - The names "Spring", "Spring Framework", and "Spring Integration" must - not be used to endorse or promote products derived from this software - without prior written permission. For written permission, please contact - enquiries@springsource.com. diff --git a/spring-integration-cassandra/src/main/java/org/springframework/integration/cassandra/config/xml/CassandraNamespaceHandler.java b/spring-integration-cassandra/src/main/java/org/springframework/integration/cassandra/config/xml/CassandraNamespaceHandler.java index 4044f1a..58419c1 100644 --- a/spring-integration-cassandra/src/main/java/org/springframework/integration/cassandra/config/xml/CassandraNamespaceHandler.java +++ b/spring-integration-cassandra/src/main/java/org/springframework/integration/cassandra/config/xml/CassandraNamespaceHandler.java @@ -1,17 +1,17 @@ /* - * Copyright 2015-2016 the original author or authors + * Copyright 2015-2019 the original author or authors. * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. */ package org.springframework.integration.cassandra.config.xml; @@ -19,6 +19,8 @@ package org.springframework.integration.cassandra.config.xml; import org.springframework.integration.config.xml.AbstractIntegrationNamespaceHandler; /** + * The namespace handler for "int-cassandra" namespace. + * * @author Artem Bilan * @author Filippo Balicchia */ diff --git a/spring-integration-cassandra/src/main/java/org/springframework/integration/cassandra/config/xml/CassandraOutboundChannelAdapterParser.java b/spring-integration-cassandra/src/main/java/org/springframework/integration/cassandra/config/xml/CassandraOutboundChannelAdapterParser.java index e4c74f0..3981adc 100644 --- a/spring-integration-cassandra/src/main/java/org/springframework/integration/cassandra/config/xml/CassandraOutboundChannelAdapterParser.java +++ b/spring-integration-cassandra/src/main/java/org/springframework/integration/cassandra/config/xml/CassandraOutboundChannelAdapterParser.java @@ -1,11 +1,11 @@ /* - * Copyright 2016 the original author or authors. + * Copyright 2016-2019 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, @@ -25,7 +25,10 @@ import org.springframework.integration.cassandra.outbound.CassandraMessageHandle import org.springframework.integration.config.xml.AbstractOutboundChannelAdapterParser; /** + * The parser for the {@code }. + * * @author Filippo Balicchia + * @author Artem Bilan */ public class CassandraOutboundChannelAdapterParser extends AbstractOutboundChannelAdapterParser { diff --git a/spring-integration-cassandra/src/main/java/org/springframework/integration/cassandra/config/xml/CassandraOutboundGatewayParser.java b/spring-integration-cassandra/src/main/java/org/springframework/integration/cassandra/config/xml/CassandraOutboundGatewayParser.java index 467b8ba..052f312 100644 --- a/spring-integration-cassandra/src/main/java/org/springframework/integration/cassandra/config/xml/CassandraOutboundGatewayParser.java +++ b/spring-integration-cassandra/src/main/java/org/springframework/integration/cassandra/config/xml/CassandraOutboundGatewayParser.java @@ -1,17 +1,17 @@ /* - * Copyright 2016 the original author or authors + * Copyright 2016-2019 the original author or authors. * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. */ package org.springframework.integration.cassandra.config.xml; @@ -25,7 +25,10 @@ import org.springframework.integration.config.xml.AbstractConsumerEndpointParser import org.springframework.integration.config.xml.IntegrationNamespaceUtils; /** + * The parser for the {@code }. + * * @author Filippo Balicchia + * @author Artem Bilan */ public class CassandraOutboundGatewayParser extends AbstractConsumerEndpointParser { diff --git a/spring-integration-cassandra/src/main/java/org/springframework/integration/cassandra/config/xml/CassandraParserUtils.java b/spring-integration-cassandra/src/main/java/org/springframework/integration/cassandra/config/xml/CassandraParserUtils.java index 8caf686..edf9e84 100644 --- a/spring-integration-cassandra/src/main/java/org/springframework/integration/cassandra/config/xml/CassandraParserUtils.java +++ b/spring-integration-cassandra/src/main/java/org/springframework/integration/cassandra/config/xml/CassandraParserUtils.java @@ -1,11 +1,11 @@ /* - * Copyright 2016 the original author or authors. + * Copyright 2016-2019 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, @@ -31,12 +31,15 @@ import org.springframework.util.StringUtils; import org.springframework.util.xml.DomUtils; /** + * The {@code int-cassandra} namespace XML parser helper. + * * @author Filippo Balicchia + * @author Artem Bilan */ -public class CassandraParserUtils { +public final class CassandraParserUtils { public static void processOutboundTypeAttributes(Element element, ParserContext parserContext, - BeanDefinitionBuilder builder) { + BeanDefinitionBuilder builder) { String cassandraTemplate = element.getAttribute("cassandra-template"); String mode = element.getAttribute("mode"); @@ -67,18 +70,19 @@ public class CassandraParserUtils { IntegrationNamespaceUtils.setReferenceIfAttributeDefined(builder, element, "write-options"); IntegrationNamespaceUtils.setValueIfAttributeDefined(builder, element, "ingest-query"); IntegrationNamespaceUtils.setValueIfAttributeDefined(builder, element, "query"); + IntegrationNamespaceUtils.setValueIfAttributeDefined(builder, element, "async"); List parameterExpressions = DomUtils.getChildElementsByTagName(element, "parameter-expression"); if (!CollectionUtils.isEmpty(parameterExpressions)) { - ManagedMap parameterExpressionsMap = new ManagedMap(); + ManagedMap parameterExpressionsMap = new ManagedMap<>(); for (Element parameterExpressionElement : parameterExpressions) { String name = parameterExpressionElement.getAttribute(AbstractBeanDefinitionParser.NAME_ATTRIBUTE); - BeanDefinition expression = IntegrationNamespaceUtils.createExpressionDefIfAttributeDefined( - IntegrationNamespaceUtils.EXPRESSION_ATTRIBUTE, parameterExpressionElement); + BeanDefinition expression = + IntegrationNamespaceUtils.createExpressionDefIfAttributeDefined( + IntegrationNamespaceUtils.EXPRESSION_ATTRIBUTE, parameterExpressionElement); if (expression != null) { parameterExpressionsMap.put(name, expression); } - } builder.addPropertyValue("parameterExpressions", parameterExpressionsMap); } @@ -86,10 +90,14 @@ public class CassandraParserUtils { } public static boolean areMutuallyExclusive(String query, BeanDefinition statementExpressionDef, - String ingestQuery) { + String ingestQuery) { + return StringUtils.isEmpty(query) && statementExpressionDef == null && StringUtils.isEmpty(ingestQuery) || !(StringUtils.hasText(query) && statementExpressionDef != null && StringUtils.hasText(ingestQuery)) && (StringUtils.hasText(query) ^ statementExpressionDef != null) ^ StringUtils.hasText(ingestQuery); } + private CassandraParserUtils() { + } + } diff --git a/spring-integration-cassandra/src/main/java/org/springframework/integration/cassandra/config/xml/package-info.java b/spring-integration-cassandra/src/main/java/org/springframework/integration/cassandra/config/xml/package-info.java index 33f8701..26312a0 100644 --- a/spring-integration-cassandra/src/main/java/org/springframework/integration/cassandra/config/xml/package-info.java +++ b/spring-integration-cassandra/src/main/java/org/springframework/integration/cassandra/config/xml/package-info.java @@ -1,3 +1,19 @@ +/* + * Copyright 2015-2019 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + /** * Provides classes for Cassandra parsers and namespace handlers. */ diff --git a/spring-integration-cassandra/src/main/java/org/springframework/integration/cassandra/outbound/CassandraMessageHandler.java b/spring-integration-cassandra/src/main/java/org/springframework/integration/cassandra/outbound/CassandraMessageHandler.java index e1b8e19..622d8ac 100644 --- a/spring-integration-cassandra/src/main/java/org/springframework/integration/cassandra/outbound/CassandraMessageHandler.java +++ b/spring-integration-cassandra/src/main/java/org/springframework/integration/cassandra/outbound/CassandraMessageHandler.java @@ -1,11 +1,11 @@ /* - * Copyright 2015-2016 the original author or authors. + * Copyright 2015-2019 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, @@ -16,20 +16,27 @@ package org.springframework.integration.cassandra.outbound; -import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; -import org.springframework.cassandra.core.CachedPreparedStatementCreator; -import org.springframework.cassandra.core.PreparedStatementCreator; -import org.springframework.cassandra.core.WriteOptions; -import org.springframework.data.cassandra.core.CassandraOperations; +import org.springframework.beans.DirectFieldAccessor; +import org.springframework.dao.DataAccessException; +import org.springframework.data.cassandra.ReactiveResultSet; +import org.springframework.data.cassandra.ReactiveSession; +import org.springframework.data.cassandra.core.InsertOptions; +import org.springframework.data.cassandra.core.ReactiveCassandraOperations; +import org.springframework.data.cassandra.core.UpdateOptions; +import org.springframework.data.cassandra.core.WriteResult; +import org.springframework.data.cassandra.core.cql.QueryOptionsUtil; +import org.springframework.data.cassandra.core.cql.ReactiveSessionCallback; +import org.springframework.data.cassandra.core.cql.WriteOptions; import org.springframework.expression.EvaluationContext; import org.springframework.expression.Expression; import org.springframework.expression.TypeLocator; import org.springframework.expression.spel.support.StandardEvaluationContext; import org.springframework.expression.spel.support.StandardTypeLocator; +import org.springframework.integration.expression.ExpressionEvalMap; import org.springframework.integration.expression.ExpressionUtils; import org.springframework.integration.handler.AbstractReplyProducingMessageHandler; import org.springframework.integration.handler.ExpressionEvaluatingMessageProcessor; @@ -37,24 +44,28 @@ import org.springframework.integration.handler.MessageProcessor; import org.springframework.messaging.Message; import org.springframework.util.Assert; -import com.datastax.driver.core.ColumnDefinitions; -import com.datastax.driver.core.PreparedStatement; +import com.datastax.driver.core.BatchStatement; +import com.datastax.driver.core.ResultSet; import com.datastax.driver.core.Statement; +import com.datastax.driver.core.exceptions.DriverException; +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; /** + * An {@link AbstractReplyProducingMessageHandler} implementation for Cassandra outbound operations. + * * @author Soby Chacko * @author Artem Bilan * @author Filippo Balicchia */ -@SuppressWarnings("unchecked") -public class CassandraMessageHandler extends AbstractReplyProducingMessageHandler { +public class CassandraMessageHandler extends AbstractReplyProducingMessageHandler { private final Map parameterExpressions = new HashMap<>(); - private final CassandraOperations cassandraTemplate; - private Type mode; + private final ReactiveCassandraOperations cassandraOperations; + private boolean producesReply; /** @@ -65,21 +76,33 @@ public class CassandraMessageHandler extends AbstractReplyProducingMessageHan /** * Various options that can be used for Cassandra writes. */ - private WriteOptions writeOptions; + private WriteOptions writeOptions = WriteOptions.empty(); - private MessageProcessor statementProcessor; + private ReactiveSessionMessageCallback sessionMessageCallback; private EvaluationContext evaluationContext; - public CassandraMessageHandler(CassandraOperations cassandraTemplate) { - this(cassandraTemplate, Type.INSERT); + public CassandraMessageHandler(ReactiveCassandraOperations cassandraOperations) { + this(cassandraOperations, Type.INSERT); } - public CassandraMessageHandler(CassandraOperations cassandraTemplate, CassandraMessageHandler.Type queryType) { - Assert.notNull(cassandraTemplate, "'cassandraTemplate' must not be null."); + public CassandraMessageHandler(ReactiveCassandraOperations cassandraOperations, + CassandraMessageHandler.Type queryType) { + + Assert.notNull(cassandraOperations, "'cassandraOperations' must not be null."); Assert.notNull(queryType, "'queryType' must not be null."); - this.cassandraTemplate = cassandraTemplate; + this.cassandraOperations = cassandraOperations; this.mode = queryType; + setAsync(true); + switch (this.mode) { + + case INSERT: + this.writeOptions = InsertOptions.empty(); + break; + case UPDATE: + this.writeOptions = UpdateOptions.empty(); + break; + } } public void setIngestQuery(String ingestQuery) { @@ -89,6 +112,7 @@ public class CassandraMessageHandler extends AbstractReplyProducingMessageHan } public void setWriteOptions(WriteOptions writeOptions) { + Assert.notNull(writeOptions, "'writeOptions' must not be null"); this.writeOptions = writeOptions; } @@ -96,47 +120,32 @@ public class CassandraMessageHandler extends AbstractReplyProducingMessageHan this.producesReply = producesReply; } + public void setStatementExpressionString(String statementExpression) { + setStatementExpression(EXPRESSION_PARSER.parseExpression(statementExpression)); + } + public void setStatementExpression(Expression statementExpression) { - setStatementProcessor(new ExpressionEvaluatingMessageProcessor(statementExpression, - Statement.class) { + setStatementProcessor( + new ExpressionEvaluatingMessageProcessor(statementExpression, Statement.class) { - @Override - protected StandardEvaluationContext getEvaluationContext() { - return (StandardEvaluationContext) CassandraMessageHandler.this.evaluationContext; - } + @Override + protected StandardEvaluationContext getEvaluationContext() { + return (StandardEvaluationContext) CassandraMessageHandler.this.evaluationContext; + } - }); + }); } public void setQuery(String query) { Assert.hasText(query, "'query' must not be empty"); - - final PreparedStatementCreator statementCreator = new CachedPreparedStatementCreator(query); - - setStatementProcessor(new MessageProcessor() { - - @Override - public Statement processMessage(Message message) { - PreparedStatement preparedStatement = - statementCreator.createPreparedStatement(cassandraTemplate.getSession()); - ColumnDefinitions variables = preparedStatement.getVariables(); - List values = new ArrayList<>(variables.size()); - Map valueMap = new HashMap<>(variables.size()); - for (ColumnDefinitions.Definition definition : variables) { - String name = definition.getName(); - Object value = valueMap.get(name); - if (value == null) { - Expression expression = parameterExpressions.get(name); - Assert.state(expression != null, "No expression for parameter: " + name); - value = expression.getValue(evaluationContext, message); - valueMap.put(name, value); - } - values.add(value); - } - return preparedStatement.bind(values.toArray()); - } - - }); + this.sessionMessageCallback = + (session, requestMessage) -> + session.execute(query, + ExpressionEvalMap.from(this.parameterExpressions) + .usingEvaluationContext(this.evaluationContext) + .withRoot(requestMessage) + .build()); + this.mode = Type.STATEMENT; } public void setParameterExpressions(Map parameterExpressions) { @@ -147,7 +156,11 @@ public class CassandraMessageHandler extends AbstractReplyProducingMessageHan public void setStatementProcessor(MessageProcessor statementProcessor) { Assert.notNull(statementProcessor, "'statementProcessor' must not be null."); - this.statementProcessor = statementProcessor; + this.sessionMessageCallback = + (session, requestMessage) -> + session.execute( + QueryOptionsUtil.addQueryOptions(statementProcessor.processMessage(requestMessage), + this.writeOptions)); this.mode = Type.STATEMENT; } @@ -174,83 +187,156 @@ public class CassandraMessageHandler extends AbstractReplyProducingMessageHan protected Object handleRequestMessage(Message requestMessage) { Object payload = requestMessage.getPayload(); - Object result = payload; + Mono result = null; Type mode = this.mode; - Statement statement = null; - if (payload instanceof Statement) { - statement = (Statement) payload; mode = Type.STATEMENT; } switch (mode) { case INSERT: - if (this.ingestQuery != null) { - Assert.isInstanceOf(List.class, payload, - "to perform 'ingest' the 'payload' must be of 'List>' type."); - List list = (List) payload; - for (Object o : list) { - Assert.isInstanceOf(List.class, o, - "to perform 'ingest' the 'payload' must be of 'List>' type."); - } - List> rows = (List>) payload; - this.cassandraTemplate.ingest(this.ingestQuery, rows, this.writeOptions); - } - else { - if (payload instanceof List) { - this.cassandraTemplate.insert((List) payload, this.writeOptions); - } - else { - this.cassandraTemplate.insert(payload, this.writeOptions); - } - } + result = handleInsert(payload); break; case UPDATE: - if (payload instanceof List) { - this.cassandraTemplate.update((List) payload, this.writeOptions); - } - else { - this.cassandraTemplate.update(payload, this.writeOptions); - } + result = handleUpdate(payload); break; case DELETE: - if (payload instanceof List) { - this.cassandraTemplate.delete((List) payload, this.writeOptions); - } - else { - this.cassandraTemplate.delete(payload, this.writeOptions); - } + result = handleDelete(payload); break; case STATEMENT: - if (statement == null) { - statement = this.statementProcessor.processMessage(requestMessage); - } - - result = this.cassandraTemplate.executeAsynchronously(statement).getUninterruptibly(); + result = handleStatement(requestMessage); break; } - return this.producesReply ? result : null; + if (this.producesReply) { + return isAsync() ? result : result.block(); + } + else { + if (isAsync()) { + result.subscribe(); + } + else { + result.block(); + } + return null; + } + } + + @SuppressWarnings("unchecked") + private Mono handleInsert(Object payload) { + if (this.ingestQuery != null) { + Assert.isInstanceOf(List.class, payload, + "to perform 'ingest' the 'payload' must be of 'List>' type."); + List list = (List) payload; + for (Object o : list) { + Assert.isInstanceOf(List.class, o, + "to perform 'ingest' the 'payload' must be of 'List>' type."); + } + List> rows = (List>) payload; + return this.cassandraOperations.getReactiveCqlOperations() + .execute((ReactiveSessionCallback) session -> + session.prepare(this.ingestQuery) + .map(s -> QueryOptionsUtil.addPreparedStatementOptions(s, this.writeOptions)) + .flatMapMany(s -> + Flux.fromIterable(rows) + .map(row -> s.bind(row.toArray()))) + .collect(BatchStatement::new, BatchStatement::add) + .flatMap(session::execute) + .transform(this::transformToWriteResult)) + .next(); + } + else { + if (payload instanceof List) { + return this.cassandraOperations.batchOps() + .insert((List) payload, this.writeOptions) + .execute(); + } + else { + return this.cassandraOperations.insert(payload, (InsertOptions) this.writeOptions); + } + } + } + + private Mono handleUpdate(Object payload) { + if (payload instanceof List) { + return this.cassandraOperations.batchOps() + .update((List) payload, this.writeOptions) + .execute(); + } + else { + return this.cassandraOperations.update(payload, (UpdateOptions) this.writeOptions); + } + } + + private Mono handleDelete(Object payload) { + if (payload instanceof List) { + return this.cassandraOperations.batchOps() + .delete((List) payload) + .execute(); + } + else { + return this.cassandraOperations.delete(payload, this.writeOptions); + } + } + + private Mono handleStatement(Message requestMessage) { + Object payload = requestMessage.getPayload(); + Mono resultSetMono; + if (payload instanceof Statement) { + resultSetMono = this.cassandraOperations.getReactiveCqlOperations().queryForResultSet((Statement) payload); + } + else { + resultSetMono = this.cassandraOperations.getReactiveCqlOperations() + .execute((ReactiveSessionCallback) session -> + this.sessionMessageCallback.doInSession(session, requestMessage)) + .next(); + } + + return resultSetMono.transform(this::transformToWriteResult); + } + + private Mono transformToWriteResult(Mono resultSetMono) { + return resultSetMono + .map(DirectFieldAccessor::new) + .map(accessor -> accessor.getPropertyValue("resultSet")) + .cast(ResultSet.class) + .map(WriteResult::of); } /** - * Always return {@code false} to prevent a {@link com.datastax.driver.core.ResultSet} - * draining on iteration. - * - * @param reply ignored. - * @return {@code false}. + * The mode for the {@link CassandraMessageHandler}. */ - @Override - protected boolean shouldSplitOutput(Iterable reply) { - return false; - } - - public enum Type { - INSERT, UPDATE, DELETE, STATEMENT; + /** + * Set a {@link CassandraMessageHandler} into an {@code insert} mode. + */ + INSERT, + + /** + * Set a {@link CassandraMessageHandler} into an {@code update} mode. + */ + UPDATE, + + /** + * Set a {@link CassandraMessageHandler} into a {@code delete} mode. + */ + DELETE, + + /** + * Set a {@link CassandraMessageHandler} into a {@code statement} mode. + */ + STATEMENT; + + } + + @FunctionalInterface + private interface ReactiveSessionMessageCallback { + + Mono doInSession(ReactiveSession session, Message requestMessage) + throws DriverException, DataAccessException; } diff --git a/spring-integration-cassandra/src/main/java/org/springframework/integration/cassandra/outbound/package-info.java b/spring-integration-cassandra/src/main/java/org/springframework/integration/cassandra/outbound/package-info.java index cd9909b..e6018b2 100644 --- a/spring-integration-cassandra/src/main/java/org/springframework/integration/cassandra/outbound/package-info.java +++ b/spring-integration-cassandra/src/main/java/org/springframework/integration/cassandra/outbound/package-info.java @@ -1,4 +1,20 @@ +/* + * Copyright 2015-2019 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + /** - * Provides classes supporting Cassndra outbound endpoints. + * Provides classes supporting Cassandra outbound endpoints. */ package org.springframework.integration.cassandra.outbound; diff --git a/spring-integration-cassandra/src/main/resources/org/springframework/integration/config/xml/spring-integration-cassandra-1.0.xsd b/spring-integration-cassandra/src/main/resources/org/springframework/integration/config/xml/spring-integration-cassandra-1.0.xsd index 5d686c2..1df201b 100644 --- a/spring-integration-cassandra/src/main/resources/org/springframework/integration/config/xml/spring-integration-cassandra-1.0.xsd +++ b/spring-integration-cassandra/src/main/resources/org/springframework/integration/config/xml/spring-integration-cassandra-1.0.xsd @@ -1,15 +1,15 @@ - - + xmlns="http://www.springframework.org/schema/integration/cassandra" + xmlns:xsd="http://www.w3.org/2001/XMLSchema" + xmlns:tool="http://www.springframework.org/schema/tool" + xmlns:integration="http://www.springframework.org/schema/integration" + targetNamespace="http://www.springframework.org/schema/integration/cassandra" + elementFormDefault="qualified"> + + + schemaLocation="http://www.springframework.org/schema/integration/spring-integration.xsd"/> - - - - Specify an expression for parameter variable placeholder in cql statement. - - - + + + + Specify an expression for parameter variable placeholder in cql statement. + + + - + - + @@ -55,23 +56,25 @@ - - - - Specify an expression for parameter variable placeholder in cql statement. - - - + + + + Specify an expression for parameter variable placeholder in cql statement. + + + - Message Channel to which replies should be sent after being received from Cassandra cluster. + Message Channel to which replies should be sent after being received from Cassandra + cluster. + type="org.springframework.messaging.MessageChannel"/> @@ -79,7 +82,7 @@ - Unique ID for this gateway. + Unique ID for this gateway. @@ -91,7 +94,7 @@ + type="org.springframework.messaging.MessageChannel"/> @@ -101,7 +104,7 @@ - + @@ -109,44 +112,43 @@ - - + + - - - + + Reference to an instance of + 'org.springframework.data.cassandra.core.ReactiveCassandraOperations'. + + type="org.springframework.data.cassandra.core.ReactiveCassandraOperations"/> - - - + + Reference to an instance of + 'org.springframework.data.cassandra.core.cql.WriteOptions' + + type="org.springframework.data.cassandra.core.cql.WriteOptions"/> - + - + @@ -174,8 +176,22 @@ + + + + ' async, reactive manner on the downstream + 'FluxMessageChannel' subscription or via 'Mono.subscribe()' in the handler, if one-way. + Otherwise the 'Mono.block()' is called immediately before returning from the handler. + ]]> + + + + + + - + @@ -200,13 +216,13 @@ - + - - - - - - + + + + + + diff --git a/spring-integration-cassandra/src/test/java/org/springframework/integration/cassandra/config/CassandraOutboundAdapterIntegrationTests-context.xml b/spring-integration-cassandra/src/test/java/org/springframework/integration/cassandra/config/CassandraOutboundAdapterIntegrationTests-context.xml index 536f115..6db938e 100644 --- a/spring-integration-cassandra/src/test/java/org/springframework/integration/cassandra/config/CassandraOutboundAdapterIntegrationTests-context.xml +++ b/spring-integration-cassandra/src/test/java/org/springframework/integration/cassandra/config/CassandraOutboundAdapterIntegrationTests-context.xml @@ -1,87 +1,75 @@ + http://www.springframework.org/schema/integration/cassandra http://www.springframework.org/schema/integration/cassandra/spring-integration-cassandra.xsd"> - - - + + + - + class="org.springframework.integration.cassandra.test.domain.Book"> + - + - + + - - - - - - - - - - - - - - + + + + + + + - - - - - - + - - - + + + + + + + + + - - - + request-channel="inputChannel" + cassandra-template="reactiveCassandraTemplate" + mode="STATEMENT" + query="SELECT * FROM book limit :size" + reply-channel="resultChannel"> + + + - diff --git a/spring-integration-cassandra/src/test/java/org/springframework/integration/cassandra/config/CassandraOutboundAdapterIntegrationTests.java b/spring-integration-cassandra/src/test/java/org/springframework/integration/cassandra/config/CassandraOutboundAdapterIntegrationTests.java index 7196bab..3c3dd0f 100644 --- a/spring-integration-cassandra/src/test/java/org/springframework/integration/cassandra/config/CassandraOutboundAdapterIntegrationTests.java +++ b/spring-integration-cassandra/src/test/java/org/springframework/integration/cassandra/config/CassandraOutboundAdapterIntegrationTests.java @@ -1,11 +1,11 @@ /* - * Copyright 2016 the original author or authors. + * Copyright 2015-2019 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, @@ -16,10 +16,7 @@ package org.springframework.integration.cassandra.config; -import static org.hamcrest.Matchers.instanceOf; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertNotNull; -import static org.junit.Assert.assertThat; +import static org.assertj.core.api.Assertions.assertThat; import java.io.IOException; import java.util.ArrayList; @@ -35,27 +32,27 @@ import org.junit.runner.RunWith; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.data.cassandra.core.CassandraTemplate; +import org.springframework.data.cassandra.core.WriteResult; import org.springframework.integration.cassandra.test.domain.Book; import org.springframework.integration.cassandra.test.domain.BookSampler; import org.springframework.integration.channel.DirectChannel; +import org.springframework.integration.channel.FluxMessageChannel; import org.springframework.integration.support.MessageBuilder; import org.springframework.messaging.Message; -import org.springframework.messaging.PollableChannel; import org.springframework.messaging.support.GenericMessage; import org.springframework.test.annotation.DirtiesContext; -import org.springframework.test.context.ContextConfiguration; -import org.springframework.test.context.junit4.SpringJUnit4ClassRunner; +import org.springframework.test.context.junit4.SpringRunner; -import com.datastax.driver.core.ResultSet; -import com.datastax.driver.core.Row; import com.datastax.driver.core.querybuilder.QueryBuilder; import com.datastax.driver.core.querybuilder.Select; +import reactor.core.publisher.Mono; +import reactor.test.StepVerifier; /** * @author Filippo Balicchia + * @author Artem Bilan */ -@RunWith(SpringJUnit4ClassRunner.class) -@ContextConfiguration +@RunWith(SpringRunner.class) @DirtiesContext public class CassandraOutboundAdapterIntegrationTests { @@ -80,18 +77,12 @@ public class CassandraOutboundAdapterIntegrationTests { private DirectChannel inputChannel; @Autowired - private PollableChannel resultChannel; + private FluxMessageChannel resultChannel; @BeforeClass - public static void init() throws TTransportException, IOException, InterruptedException, ConfigurationException { - startCassandra(); - - } - - private static void startCassandra() - throws TTransportException, IOException, InterruptedException, ConfigurationException { - + public static void init() throws TTransportException, IOException, ConfigurationException { EmbeddedCassandraServerHelper.startEmbeddedCassandra(CASSANDRA_CONFIG, "build/embeddedCassandra"); + EmbeddedCassandraServerHelper.getSession(); } @AfterClass @@ -100,35 +91,40 @@ public class CassandraOutboundAdapterIntegrationTests { } @Test - public void testBasicCassandraInsert() throws Exception { + public void testBasicCassandraInsert() { Book b1 = BookSampler.getBook(); Message message = MessageBuilder.withPayload(b1).build(); - cassandraMessageHandler1.send(message); + this.cassandraMessageHandler1.send(message); Select select = QueryBuilder.select().all().from("book"); - List books = cassandraTemplate.select(select, Book.class); - assertEquals(1, books.size()); - cassandraTemplate.delete(b1); + List books = this.cassandraTemplate.select(select, Book.class); + assertThat(books).hasSize(1); + this.cassandraTemplate.delete(b1); } @Test - public void testCassandraBatchInsertAndSelectStatement() throws Exception { + public void testCassandraBatchInsertAndSelectStatement() { List books = BookSampler.getBookList(5); - cassandraMessageHandler2.send(new GenericMessage<>(books)); + this.cassandraMessageHandler2.send(new GenericMessage<>(books)); Message message = MessageBuilder.withPayload("Cassandra Puppy Guru").setHeader("limit", 2).build(); - inputChannel.send(message); - Message receive = resultChannel.receive(10000); - assertNotNull(receive); - assertThat(receive.getPayload(), instanceOf(ResultSet.class)); - ResultSet resultSet = (ResultSet) receive.getPayload(); - assertNotNull(resultSet); - List rows = resultSet.all(); - assertEquals(2, rows.size()); - cassandraMessageHandler1.send(new GenericMessage<>(QueryBuilder.truncate("book"))); + this.inputChannel.send(message); + + Mono testMono = + Mono.from(this.resultChannel) + .map(Message::getPayload) + .cast(WriteResult.class) + .map(r -> r.getRows().size()); + + StepVerifier.create(testMono) + .expectNext(2) + .expectComplete() + .verify(); + + this.cassandraMessageHandler1.send(new GenericMessage<>(QueryBuilder.truncate("book"))); } @Test - public void testCassandraBatchIngest() throws Exception { + public void testCassandraBatchIngest() { List books = BookSampler.getBookList(5); List> ingestBooks = new ArrayList<>(); for (Book b : books) { @@ -144,23 +140,23 @@ public class CassandraOutboundAdapterIntegrationTests { } Message>> message = MessageBuilder.withPayload(ingestBooks).build(); - cassandraMessageHandler3.send(message); + this.cassandraMessageHandler3.send(message); Select select = QueryBuilder.select().all().from("book"); - books = cassandraTemplate.select(select, Book.class); - assertEquals(5, books.size()); - cassandraTemplate.delete(books); + books = this.cassandraTemplate.select(select, Book.class); + assertThat(books).hasSize(5); + this.cassandraTemplate.batchOps().delete(books); } @Test - public void testExpressionTrucante() throws Exception { + public void testExpressionTruncate() { Message message = MessageBuilder.withPayload(BookSampler.getBook()).build(); - cassandraMessageHandler1.send(message); + this.cassandraMessageHandler1.send(message); Select select = QueryBuilder.select().all().from("book"); - List books = cassandraTemplate.select(select, Book.class); - assertEquals(1, books.size()); - cassandraMessageHandler4.send(MessageBuilder.withPayload("Empty").build()); - books = cassandraTemplate.select(select, Book.class); - assertEquals(0, books.size()); + List books = this.cassandraTemplate.select(select, Book.class); + assertThat(books).hasSize(1); + this.cassandraMessageHandler4.send(MessageBuilder.withPayload("Empty").build()); + books = this.cassandraTemplate.select(select, Book.class); + assertThat(books).hasSize(0); } } diff --git a/spring-integration-cassandra/src/test/java/org/springframework/integration/cassandra/config/CassandraOutboundAdapterParserTests-context.xml b/spring-integration-cassandra/src/test/java/org/springframework/integration/cassandra/config/CassandraOutboundAdapterParserTests-context.xml index 8627f14..d598e4a 100644 --- a/spring-integration-cassandra/src/test/java/org/springframework/integration/cassandra/config/CassandraOutboundAdapterParserTests-context.xml +++ b/spring-integration-cassandra/src/test/java/org/springframework/integration/cassandra/config/CassandraOutboundAdapterParserTests-context.xml @@ -1,18 +1,15 @@ + http://www.springframework.org/schema/integration/cassandra http://www.springframework.org/schema/integration/cassandra/spring-integration-cassandra.xsd"> - - + + @@ -20,45 +17,43 @@ - + - - - - - + - - - - - - - - - - - - + + + + + + + + + + + + + + + + diff --git a/spring-integration-cassandra/src/test/java/org/springframework/integration/cassandra/config/CassandraOutboundAdapterParserTests.java b/spring-integration-cassandra/src/test/java/org/springframework/integration/cassandra/config/CassandraOutboundAdapterParserTests.java index bfc74ad..f0e5e27 100644 --- a/spring-integration-cassandra/src/test/java/org/springframework/integration/cassandra/config/CassandraOutboundAdapterParserTests.java +++ b/spring-integration-cassandra/src/test/java/org/springframework/integration/cassandra/config/CassandraOutboundAdapterParserTests.java @@ -1,11 +1,11 @@ /* - * Copyright 2016 the original author or authors. + * Copyright 2015-2019 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, @@ -16,7 +16,7 @@ package org.springframework.integration.cassandra.config; -import static org.junit.Assert.assertEquals; +import static org.assertj.core.api.Assertions.assertThat; import org.junit.Test; import org.junit.runner.RunWith; @@ -25,15 +25,14 @@ import org.springframework.beans.factory.annotation.Autowired; import org.springframework.context.ApplicationContext; import org.springframework.integration.cassandra.outbound.CassandraMessageHandler; import org.springframework.integration.test.util.TestUtils; -import org.springframework.test.context.ContextConfiguration; -import org.springframework.test.context.junit4.SpringJUnit4ClassRunner; +import org.springframework.test.context.junit4.SpringRunner; /** * @author Filippo Balicchia + * @author Artem Bilan */ -@RunWith(SpringJUnit4ClassRunner.class) -@ContextConfiguration +@RunWith(SpringRunner.class) public class CassandraOutboundAdapterParserTests { @Autowired @@ -41,46 +40,52 @@ public class CassandraOutboundAdapterParserTests { @Test public void minimalConfig() { + CassandraMessageHandler handler = + TestUtils.getPropertyValue(this.context.getBean("outbound1.adapter"), "handler", + CassandraMessageHandler.class); - CassandraMessageHandler handler = TestUtils.getPropertyValue(context.getBean("outbound1.adapter"), "handler", - CassandraMessageHandler.class); - - assertEquals("outbound1.adapter", TestUtils.getPropertyValue(handler, "componentName")); - assertEquals(CassandraMessageHandler.Type.INSERT, TestUtils.getPropertyValue(handler, "mode")); - assertEquals(context.getBean("cassandraTemplate"), TestUtils.getPropertyValue(handler, "cassandraTemplate")); - assertEquals(context.getBean("writeOptions"), TestUtils.getPropertyValue(handler, "writeOptions")); + assertThat(TestUtils.getPropertyValue(handler, "componentName")).isEqualTo("outbound1.adapter"); + assertThat(TestUtils.getPropertyValue(handler, "mode")).isEqualTo(CassandraMessageHandler.Type.INSERT); + assertThat(TestUtils.getPropertyValue(handler, "cassandraOperations")) + .isSameAs(this.context.getBean("cassandraTemplate")); + assertThat(TestUtils.getPropertyValue(handler, "writeOptions")).isSameAs(this.context.getBean("writeOptions")); + assertThat(TestUtils.getPropertyValue(handler, "async", Boolean.class)).isFalse(); } @Test public void ingestConfig() { - CassandraMessageHandler handler = TestUtils.getPropertyValue(context.getBean("outbound2"), "handler", - CassandraMessageHandler.class); + CassandraMessageHandler handler = + TestUtils.getPropertyValue(this.context.getBean("outbound2"), "handler", + CassandraMessageHandler.class); - assertEquals("insert into book (isbn, title, author, pages, saleDate, isInStock) values (?, ?, ?, ?, ?, ?)", - TestUtils.getPropertyValue(handler, "ingestQuery")); - assertEquals(Boolean.FALSE, TestUtils.getPropertyValue(handler, "producesReply")); + assertThat(TestUtils.getPropertyValue(handler, "ingestQuery")) + .isEqualTo("insert into book (isbn, title, author, pages, saleDate, isInStock) values (?, ?, ?, ?, ?, " + + "?)"); + assertThat(TestUtils.getPropertyValue(handler, "producesReply", Boolean.class)).isFalse(); } @Test public void fullConfig() { - CassandraMessageHandler handler = TestUtils.getPropertyValue(context.getBean("outgateway"), "handler", - CassandraMessageHandler.class); + CassandraMessageHandler handler = + TestUtils.getPropertyValue(this.context.getBean("outgateway"), "handler", + CassandraMessageHandler.class); - assertEquals(Boolean.TRUE, TestUtils.getPropertyValue(handler, "producesReply")); - assertEquals(CassandraMessageHandler.Type.STATEMENT, TestUtils.getPropertyValue(handler, "mode")); - assertEquals(context.getBean("writeOptions"), TestUtils.getPropertyValue(handler, "writeOptions")); + assertThat(TestUtils.getPropertyValue(handler, "producesReply", Boolean.class)).isTrue(); + assertThat(TestUtils.getPropertyValue(handler, "mode")).isEqualTo(CassandraMessageHandler.Type.STATEMENT); + assertThat(TestUtils.getPropertyValue(handler, "writeOptions")).isSameAs(this.context.getBean("writeOptions")); } @Test public void statementConfig() { + CassandraMessageHandler handler = + TestUtils.getPropertyValue(this.context.getBean("outbound4.adapter"), "handler", + CassandraMessageHandler.class); - CassandraMessageHandler handler = TestUtils.getPropertyValue(context.getBean("outbound4.adapter"), "handler", - CassandraMessageHandler.class); - assertEquals("outbound4.adapter", TestUtils.getPropertyValue(handler, "componentName")); - assertEquals(CassandraMessageHandler.Type.STATEMENT, TestUtils.getPropertyValue(handler, "mode")); - assertEquals(context.getBean("cassandraTemplate"), TestUtils.getPropertyValue(handler, "cassandraTemplate")); - assertEquals(context.getBean("writeOptions"), TestUtils.getPropertyValue(handler, "writeOptions")); - + assertThat(TestUtils.getPropertyValue(handler, "componentName")).isEqualTo("outbound4.adapter"); + assertThat(TestUtils.getPropertyValue(handler, "mode")).isEqualTo(CassandraMessageHandler.Type.STATEMENT); + assertThat(TestUtils.getPropertyValue(handler, "cassandraOperations")) + .isSameAs(this.context.getBean("cassandraTemplate")); + assertThat(TestUtils.getPropertyValue(handler, "writeOptions")).isSameAs(this.context.getBean("writeOptions")); } } diff --git a/spring-integration-cassandra/src/test/java/org/springframework/integration/cassandra/config/CassandraParserUtilsTests.java b/spring-integration-cassandra/src/test/java/org/springframework/integration/cassandra/config/CassandraParserUtilsTests.java index 20bf91c..de04827 100644 --- a/spring-integration-cassandra/src/test/java/org/springframework/integration/cassandra/config/CassandraParserUtilsTests.java +++ b/spring-integration-cassandra/src/test/java/org/springframework/integration/cassandra/config/CassandraParserUtilsTests.java @@ -1,11 +1,11 @@ /* - * Copyright 2016 the original author or authors. + * Copyright 2015-2019 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, @@ -16,7 +16,8 @@ package org.springframework.integration.cassandra.config; -import org.junit.Assert; +import static org.assertj.core.api.Assertions.assertThat; + import org.junit.Test; import org.springframework.beans.factory.config.BeanDefinition; @@ -25,8 +26,8 @@ import org.springframework.integration.cassandra.config.xml.CassandraParserUtils /** * @author Filippo Balicchia + * @author Artem Bilan */ - public class CassandraParserUtilsTests { @Test @@ -34,15 +35,16 @@ public class CassandraParserUtilsTests { String query = ""; BeanDefinition statementExpressionDef = null; String ingestQuery = ""; - Assert.assertTrue(CassandraParserUtils.areMutuallyExclusive(query, statementExpressionDef, ingestQuery)); + assertThat(CassandraParserUtils.areMutuallyExclusive(query, statementExpressionDef, ingestQuery)).isTrue(); } @Test public void mutuallyExclusiveCase2() { String query = ""; BeanDefinition statementExpressionDef = null; - String ingestQuery = "insert into book (isbn, title, author, pages, saleDate, isInStock) values (?, ?, ?, ?, ?, ?)"; - Assert.assertTrue(CassandraParserUtils.areMutuallyExclusive(query, statementExpressionDef, ingestQuery)); + String ingestQuery = + "insert into book (isbn, title, author, pages, saleDate, isInStock) values (?, ?, ?, ?, ?, ?)"; + assertThat(CassandraParserUtils.areMutuallyExclusive(query, statementExpressionDef, ingestQuery)).isTrue(); } @Test @@ -50,15 +52,16 @@ public class CassandraParserUtilsTests { String query = ""; BeanDefinition statementExpressionDef = new RootBeanDefinition(); String ingestQuery = ""; - Assert.assertTrue(CassandraParserUtils.areMutuallyExclusive(query, statementExpressionDef, ingestQuery)); + assertThat(CassandraParserUtils.areMutuallyExclusive(query, statementExpressionDef, ingestQuery)).isTrue(); } @Test public void mutuallyExclusiveCase4() { String query = ""; BeanDefinition statementExpressionDef = new RootBeanDefinition(); - String ingestQuery = "insert into book (isbn, title, author, pages, saleDate, isInStock) values (?, ?, ?, ?, ?, ?)"; - Assert.assertFalse(CassandraParserUtils.areMutuallyExclusive(query, statementExpressionDef, ingestQuery)); + String ingestQuery = + "insert into book (isbn, title, author, pages, saleDate, isInStock) values (?, ?, ?, ?, ?, ?)"; + assertThat(CassandraParserUtils.areMutuallyExclusive(query, statementExpressionDef, ingestQuery)).isFalse(); } @Test @@ -66,15 +69,16 @@ public class CassandraParserUtilsTests { String query = "SELECT * FROM book limit :size"; BeanDefinition statementExpressionDef = new RootBeanDefinition(); String ingestQuery = ""; - Assert.assertFalse(CassandraParserUtils.areMutuallyExclusive(query, statementExpressionDef, ingestQuery)); + assertThat(CassandraParserUtils.areMutuallyExclusive(query, statementExpressionDef, ingestQuery)).isFalse(); } @Test public void mutuallyExclusiveCase6() { String query = "SELECT * FROM book limit :size"; BeanDefinition statementExpressionDef = new RootBeanDefinition(); - String ingestQuery = "insert into book (isbn, title, author, pages, saleDate, isInStock) values (?, ?, ?, ?, ?, ?)"; - Assert.assertFalse(CassandraParserUtils.areMutuallyExclusive(query, statementExpressionDef, ingestQuery)); + String ingestQuery = + "insert into book (isbn, title, author, pages, saleDate, isInStock) values (?, ?, ?, ?, ?, ?)"; + assertThat(CassandraParserUtils.areMutuallyExclusive(query, statementExpressionDef, ingestQuery)).isFalse(); } @Test @@ -82,15 +86,16 @@ public class CassandraParserUtilsTests { String query = "SELECT * FROM book limit :size"; BeanDefinition statementExpressionDef = new RootBeanDefinition(); String ingestQuery = ""; - Assert.assertFalse(CassandraParserUtils.areMutuallyExclusive(query, statementExpressionDef, ingestQuery)); + assertThat(CassandraParserUtils.areMutuallyExclusive(query, statementExpressionDef, ingestQuery)).isFalse(); } @Test public void mutuallyExclusiveCase8() { String query = "SELECT * FROM book limit :size"; BeanDefinition statementExpressionDef = new RootBeanDefinition(); - String ingestQuery = "insert into book (isbn, title, author, pages, saleDate, isInStock) values (?, ?, ?, ?, ?, ?)"; - Assert.assertFalse(CassandraParserUtils.areMutuallyExclusive(query, statementExpressionDef, ingestQuery)); + String ingestQuery = + "insert into book (isbn, title, author, pages, saleDate, isInStock) values (?, ?, ?, ?, ?, ?)"; + assertThat(CassandraParserUtils.areMutuallyExclusive(query, statementExpressionDef, ingestQuery)).isFalse(); } } diff --git a/spring-integration-cassandra/src/test/java/org/springframework/integration/cassandra/config/IntegrationTestConfig.java b/spring-integration-cassandra/src/test/java/org/springframework/integration/cassandra/config/IntegrationTestConfig.java index b5852c6..3a4f718 100644 --- a/spring-integration-cassandra/src/test/java/org/springframework/integration/cassandra/config/IntegrationTestConfig.java +++ b/spring-integration-cassandra/src/test/java/org/springframework/integration/cassandra/config/IntegrationTestConfig.java @@ -1,11 +1,11 @@ /* - * Copyright 2015 the original author or authors + * Copyright 2015-2019 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, @@ -16,30 +16,30 @@ package org.springframework.integration.cassandra.config; -import static org.springframework.cassandra.core.keyspace.CreateKeyspaceSpecification.createKeyspace; -import java.util.Arrays; +import java.util.Collections; import java.util.List; import java.util.UUID; -import org.springframework.cassandra.core.keyspace.CreateKeyspaceSpecification; import org.springframework.context.annotation.Configuration; +import org.springframework.data.cassandra.config.AbstractReactiveCassandraConfiguration; import org.springframework.data.cassandra.config.SchemaAction; -import org.springframework.data.cassandra.config.java.AbstractCassandraConfiguration; +import org.springframework.data.cassandra.core.cql.keyspace.CreateKeyspaceSpecification; /** * Setup any spring configuration for unit tests * * @author David Webb * @author Matthew T. Adams + * @author Artem Bilan */ @Configuration -public class IntegrationTestConfig extends AbstractCassandraConfiguration { +public class IntegrationTestConfig extends AbstractReactiveCassandraConfiguration { public static final String HOST = "localhost"; - //public static final SpringCassandraBuildProperties PROPS = new SpringCassandraBuildProperties(); - public static final int PORT = 9043;//PROPS.getCassandraPort(); + // public static final SpringCassandraBuildProperties PROPS = new SpringCassandraBuildProperties(); + public static final int PORT = 9043; //PROPS.getCassandraPort(); // public static final int RPC_PORT = PROPS.getCassandraRpcPort(); @@ -61,12 +61,14 @@ public class IntegrationTestConfig extends AbstractCassandraConfiguration { @Override protected String getKeyspaceName() { - return keyspaceName; + return this.keyspaceName; } @Override protected List getKeyspaceCreations() { - return Arrays.asList(createKeyspace().name(getKeyspaceName()).withSimpleReplication()); + return Collections.singletonList( + CreateKeyspaceSpecification.createKeyspace(getKeyspaceName()) + .withSimpleReplication()); } } diff --git a/spring-integration-cassandra/src/test/java/org/springframework/integration/cassandra/outbound/CassandraMessageHandlerTests.java b/spring-integration-cassandra/src/test/java/org/springframework/integration/cassandra/outbound/CassandraMessageHandlerTests.java index 0bd6539..926ce19 100644 --- a/spring-integration-cassandra/src/test/java/org/springframework/integration/cassandra/outbound/CassandraMessageHandlerTests.java +++ b/spring-integration-cassandra/src/test/java/org/springframework/integration/cassandra/outbound/CassandraMessageHandlerTests.java @@ -1,11 +1,11 @@ /* - * Copyright 2015 the original author or authors. + * Copyright 2015-2019 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, @@ -16,39 +16,35 @@ package org.springframework.integration.cassandra.outbound; -import static org.hamcrest.Matchers.instanceOf; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertNotNull; -import static org.junit.Assert.assertThat; +import static org.assertj.core.api.Assertions.assertThat; -import java.io.IOException; import java.util.ArrayList; import java.util.Date; import java.util.HashMap; import java.util.List; import java.util.Map; -import org.apache.cassandra.exceptions.ConfigurationException; -import org.apache.thrift.transport.TTransportException; import org.cassandraunit.utils.EmbeddedCassandraServerHelper; import org.junit.AfterClass; import org.junit.BeforeClass; import org.junit.Test; import org.junit.runner.RunWith; + import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.cassandra.core.ConsistencyLevel; -import org.springframework.cassandra.core.RetryPolicy; -import org.springframework.cassandra.core.WriteOptions; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.data.cassandra.core.CassandraOperations; +import org.springframework.data.cassandra.core.InsertOptions; +import org.springframework.data.cassandra.core.ReactiveCassandraOperations; +import org.springframework.data.cassandra.core.WriteResult; +import org.springframework.data.cassandra.core.cql.WriteOptions; import org.springframework.expression.Expression; import org.springframework.expression.spel.standard.SpelExpressionParser; import org.springframework.integration.cassandra.config.IntegrationTestConfig; import org.springframework.integration.cassandra.test.domain.Book; import org.springframework.integration.cassandra.test.domain.BookSampler; +import org.springframework.integration.channel.FluxMessageChannel; import org.springframework.integration.channel.NullChannel; -import org.springframework.integration.channel.QueueChannel; import org.springframework.integration.config.EnableIntegration; import org.springframework.integration.support.MessageBuilder; import org.springframework.messaging.Message; @@ -56,100 +52,25 @@ import org.springframework.messaging.MessageHandler; import org.springframework.messaging.PollableChannel; import org.springframework.messaging.support.GenericMessage; import org.springframework.test.annotation.DirtiesContext; -import org.springframework.test.context.ContextConfiguration; -import org.springframework.test.context.junit4.SpringJUnit4ClassRunner; +import org.springframework.test.context.junit4.SpringRunner; -import com.datastax.driver.core.Cluster; -import com.datastax.driver.core.ResultSet; -import com.datastax.driver.core.Row; -import com.datastax.driver.core.Session; +import com.datastax.driver.core.ConsistencyLevel; import com.datastax.driver.core.querybuilder.QueryBuilder; import com.datastax.driver.core.querybuilder.Select; +import reactor.core.publisher.Mono; +import reactor.test.StepVerifier; /** * @author Soby Chacko * @author Artem Bilan */ -@ContextConfiguration -@RunWith(SpringJUnit4ClassRunner.class) +@RunWith(SpringRunner.class) @DirtiesContext public class CassandraMessageHandlerTests { private static final SpelExpressionParser PARSER = new SpelExpressionParser(); - @Configuration - @EnableIntegration - public static class Config extends IntegrationTestConfig { - - @Autowired - public CassandraOperations template; - - @Override - public String[] getEntityBasePackages() { - return new String[] { Book.class.getPackage().getName() }; - } - - @Bean - public MessageHandler cassandraMessageHandler1() { - CassandraMessageHandler cassandraMessageHandler = new CassandraMessageHandler<>(this.template); - cassandraMessageHandler.setProducesReply(false); - return cassandraMessageHandler; - } - - @Bean - public PollableChannel messageChannel() { - return new NullChannel(); - } - - @Bean - public MessageHandler cassandraMessageHandler2() { - CassandraMessageHandler cassandraMessageHandler = new CassandraMessageHandler<>(this.template); - - WriteOptions options = new WriteOptions(); - options.setTtl(60); - options.setConsistencyLevel(ConsistencyLevel.ONE); - options.setRetryPolicy(RetryPolicy.DOWNGRADING_CONSISTENCY); - - cassandraMessageHandler.setWriteOptions(options); - - cassandraMessageHandler.setOutputChannel(messageChannel()); - - return cassandraMessageHandler; - } - - @Bean - public MessageHandler cassandraMessageHandler3() { - CassandraMessageHandler cassandraMessageHandler = new CassandraMessageHandler<>(this.template); - String cqlIngest = "insert into book (isbn, title, author, pages, saleDate, isInStock) values (?, ?, ?, ?, ?, ?)"; - cassandraMessageHandler.setIngestQuery(cqlIngest); - return cassandraMessageHandler; - } - - @Bean - public PollableChannel resultChannel() { - return new QueueChannel(); - } - - @Bean - public MessageHandler cassandraMessageHandler4() { - CassandraMessageHandler cassandraMessageHandler = new CassandraMessageHandler<>(this.template); - // TODO https://jira.spring.io/browse/DATACASS-213 - // cassandraMessageHandler.setQuery("SELECT * FROM book WHERE author - // = :author limit :size"); - cassandraMessageHandler.setQuery("SELECT * FROM book limit :size"); - - Map params = new HashMap<>(); - params.put("author", PARSER.parseExpression("payload")); - params.put("size", PARSER.parseExpression("headers.limit")); - - cassandraMessageHandler.setParameterExpressions(params); - - cassandraMessageHandler.setOutputChannel(resultChannel()); - cassandraMessageHandler.setProducesReply(true); - return cassandraMessageHandler; - } - - } + private static final String CASSANDRA_CONFIG = "spring-cassandra.yaml"; @Autowired public MessageHandler cassandraMessageHandler1; @@ -167,37 +88,21 @@ public class CassandraMessageHandlerTests { public CassandraOperations template; @Autowired - public PollableChannel resultChannel; - - protected static final String CASSANDRA_CONFIG = "spring-cassandra.yaml"; - - /** - * The {@link Cluster} that's connected to Cassandra. - */ - protected static Cluster cluster; - - /** - * The session connected to the system keyspace. - */ - protected static Session system; + public FluxMessageChannel resultChannel; @BeforeClass - public static void startCassandra() - throws TTransportException, IOException, InterruptedException, ConfigurationException { + public static void startCassandra() throws Exception { EmbeddedCassandraServerHelper.startEmbeddedCassandra(CASSANDRA_CONFIG, "build/embeddedCassandra"); - cluster = Cluster.builder().addContactPoint(IntegrationTestConfig.HOST).withPort(IntegrationTestConfig.PORT) - .build(); - system = cluster.connect(); + EmbeddedCassandraServerHelper.getSession(); } @AfterClass public static void cleanup() { - cluster.close(); EmbeddedCassandraServerHelper.cleanEmbeddedCassandra(); } @Test - public void testBasicCassandraInsert() throws Exception { + public void testBasicCassandraInsert() { Book b1 = new Book(); b1.setIsbn("123456-1"); b1.setTitle("Spring Integration Cassandra"); @@ -211,13 +116,13 @@ public class CassandraMessageHandlerTests { Select select = QueryBuilder.select().all().from("book"); List books = this.template.select(select, Book.class); - assertEquals(1, books.size()); + assertThat(books).hasSize(1); this.template.delete(b1); } @Test - public void testCassandraBatchInsertAndSelectStatement() throws Exception { + public void testCassandraBatchInsertAndSelectStatement() { List books = BookSampler.getBookList(5); this.cassandraMessageHandler2.handleMessage(new GenericMessage<>(books)); @@ -225,23 +130,25 @@ public class CassandraMessageHandlerTests { Message message = MessageBuilder.withPayload("Cassandra Guru").setHeader("limit", 2).build(); this.cassandraMessageHandler4.handleMessage(message); - Message receive = this.resultChannel.receive(10000); - assertNotNull(receive); - assertThat(receive.getPayload(), instanceOf(ResultSet.class)); - ResultSet resultSet = (ResultSet) receive.getPayload(); - assertNotNull(resultSet); - List rows = resultSet.all(); - assertEquals(2, rows.size()); + Mono testMono = + Mono.from(this.resultChannel) + .map(Message::getPayload) + .cast(WriteResult.class) + .map(r -> r.getRows().size()); + + StepVerifier.create(testMono) + .expectNext(1) + .expectComplete() + .verify(); this.cassandraMessageHandler1.handleMessage(new GenericMessage<>(QueryBuilder.truncate("book"))); } @Test - public void testCassandraBatchIngest() throws Exception { + public void testCassandraBatchIngest() { List books = BookSampler.getBookList(5); List> ingestBooks = new ArrayList<>(); for (Book b : books) { - List l = new ArrayList<>(); l.add(b.getIsbn()); l.add(b.getTitle()); @@ -257,8 +164,82 @@ public class CassandraMessageHandlerTests { Select select = QueryBuilder.select().all().from("book"); books = this.template.select(select, Book.class); - assertEquals(5, books.size()); + assertThat(books).hasSize(5); - this.template.delete(books); + this.template.batchOps().delete(books); } + + @Configuration + @EnableIntegration + public static class Config extends IntegrationTestConfig { + + @Autowired + public ReactiveCassandraOperations template; + + @Override + public String[] getEntityBasePackages() { + return new String[] { Book.class.getPackage().getName() }; + } + + @Bean + public MessageHandler cassandraMessageHandler1() { + CassandraMessageHandler cassandraMessageHandler = new CassandraMessageHandler(this.template); + cassandraMessageHandler.setAsync(false); + return cassandraMessageHandler; + } + + @Bean + public PollableChannel messageChannel() { + return new NullChannel(); + } + + @Bean + public MessageHandler cassandraMessageHandler2() { + CassandraMessageHandler cassandraMessageHandler = new CassandraMessageHandler(this.template); + + WriteOptions options = + InsertOptions.builder() + .ttl(60) + .consistencyLevel(ConsistencyLevel.ONE) + .build(); + + cassandraMessageHandler.setWriteOptions(options); + cassandraMessageHandler.setOutputChannel(messageChannel()); + cassandraMessageHandler.setAsync(false); + return cassandraMessageHandler; + } + + @Bean + public MessageHandler cassandraMessageHandler3() { + CassandraMessageHandler cassandraMessageHandler = new CassandraMessageHandler(this.template); + String cqlIngest = + "insert into book (isbn, title, author, pages, saleDate, isInStock) values (?, ?, ?, ?, ?, ?)"; + cassandraMessageHandler.setIngestQuery(cqlIngest); + cassandraMessageHandler.setAsync(false); + return cassandraMessageHandler; + } + + @Bean + public FluxMessageChannel resultChannel() { + return new FluxMessageChannel(); + } + + @Bean + public MessageHandler cassandraMessageHandler4() { + CassandraMessageHandler cassandraMessageHandler = new CassandraMessageHandler(this.template); + cassandraMessageHandler.setQuery("SELECT * FROM book WHERE author = :author limit :size"); + + Map params = new HashMap<>(); + params.put("author", PARSER.parseExpression("payload")); + params.put("size", PARSER.parseExpression("headers.limit")); + + cassandraMessageHandler.setParameterExpressions(params); + + cassandraMessageHandler.setOutputChannel(resultChannel()); + cassandraMessageHandler.setProducesReply(true); + return cassandraMessageHandler; + } + + } + } diff --git a/spring-integration-cassandra/src/test/java/org/springframework/integration/cassandra/test/domain/Book.java b/spring-integration-cassandra/src/test/java/org/springframework/integration/cassandra/test/domain/Book.java index ab1e5be..6181ea4 100644 --- a/spring-integration-cassandra/src/test/java/org/springframework/integration/cassandra/test/domain/Book.java +++ b/spring-integration-cassandra/src/test/java/org/springframework/integration/cassandra/test/domain/Book.java @@ -1,11 +1,11 @@ /* - * Copyright 2015 the original author or authors + * Copyright 2015-2019 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, @@ -17,14 +17,18 @@ package org.springframework.integration.cassandra.test.domain; import java.util.Date; +import java.util.Objects; + +import org.springframework.data.cassandra.core.mapping.Indexed; +import org.springframework.data.cassandra.core.mapping.PrimaryKey; +import org.springframework.data.cassandra.core.mapping.Table; -import org.springframework.data.cassandra.mapping.Indexed; -import org.springframework.data.cassandra.mapping.PrimaryKey; -import org.springframework.data.cassandra.mapping.Table; /** * Test POJO + * * @author David Webb + * @author Artem Bilan */ @Table("book") public class Book { @@ -47,14 +51,14 @@ public class Book { * @return Returns the isbn. */ public String getIsbn() { - return isbn; + return this.isbn; } /** * @return Returns the saleDate. */ public Date getSaleDate() { - return saleDate; + return this.saleDate; } /** @@ -68,7 +72,7 @@ public class Book { * @return Returns the isInStock. */ public boolean isInStock() { - return isInStock; + return this.isInStock; } /** @@ -89,7 +93,7 @@ public class Book { * @return Returns the title. */ public String getTitle() { - return title; + return this.title; } /** @@ -103,7 +107,7 @@ public class Book { * @return Returns the author. */ public String getAuthor() { - return author; + return this.author; } /** @@ -117,7 +121,7 @@ public class Book { * @return Returns the pages. */ public int getPages() { - return pages; + return this.pages; } /** @@ -132,8 +136,30 @@ public class Book { */ @Override public String toString() { - return ("isbn -> " + isbn) + "\n" + "tile -> " + title + "\n" + "author -> " + author - + "\n" + "pages -> " + pages + "\n"; + return ("isbn -> " + this.isbn) + "\n" + "tile -> " + this.title + "\n" + "author -> " + this.author + + "\n" + "pages -> " + this.pages + "\n"; + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + Book book = (Book) o; + return this.pages == book.pages && + this.isInStock == book.isInStock && + Objects.equals(this.isbn, book.isbn) && + Objects.equals(this.title, book.title) && + Objects.equals(this.author, book.author) && + Objects.equals(this.saleDate, book.saleDate); + } + + @Override + public int hashCode() { + return Objects.hash(this.isbn, this.title, this.author, this.pages, this.saleDate, this.isInStock); } } diff --git a/spring-integration-cassandra/src/test/java/org/springframework/integration/cassandra/test/domain/BookSampler.java b/spring-integration-cassandra/src/test/java/org/springframework/integration/cassandra/test/domain/BookSampler.java index b7c031a..b98aa88 100644 --- a/spring-integration-cassandra/src/test/java/org/springframework/integration/cassandra/test/domain/BookSampler.java +++ b/spring-integration-cassandra/src/test/java/org/springframework/integration/cassandra/test/domain/BookSampler.java @@ -1,11 +1,11 @@ /* - * Copyright 2016 the original author or authors + * Copyright 2015-2019 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, @@ -23,16 +23,14 @@ import java.util.UUID; /** * @author Filippo Balicchia + * @author Artem Bilan */ -public class BookSampler { +public final class BookSampler { public static List getBookList(int numBooks) { - List books = new ArrayList<>(); - - Book b; - for (int i = 0; i < numBooks; i++) { - b = new Book(); + for (int i = 0; i < numBooks - 1; i++) { + Book b = new Book(); b.setIsbn(UUID.randomUUID().toString()); b.setTitle("Spring Data Cassandra Guide"); b.setAuthor("Cassandra Guru puppy"); @@ -41,6 +39,7 @@ public class BookSampler { b.setSaleDate(new Date()); books.add(b); } + books.add(getBook()); return books; } @@ -55,4 +54,7 @@ public class BookSampler { return b1; } + private BookSampler() { + } + } diff --git a/spring-integration-cassandra/src/test/resources/log4j.properties b/spring-integration-cassandra/src/test/resources/log4j.properties deleted file mode 100644 index 51d07f4..0000000 --- a/spring-integration-cassandra/src/test/resources/log4j.properties +++ /dev/null @@ -1,8 +0,0 @@ -log4j.rootCategory=WARN, stdout - -log4j.appender.stdout=org.apache.log4j.ConsoleAppender -log4j.appender.stdout.layout=org.apache.log4j.PatternLayout -log4j.appender.stdout.layout.ConversionPattern=%d{HH:mm:ss.SSS} %-5p [%t][%c] %m%n - -log4j.category.org.springframework.integration=WARN -log4j.category.org.springframework.integration.cassandra=INFO diff --git a/spring-integration-cassandra/src/test/resources/logback.xml b/spring-integration-cassandra/src/test/resources/logback.xml new file mode 100644 index 0000000..2ed1c28 --- /dev/null +++ b/spring-integration-cassandra/src/test/resources/logback.xml @@ -0,0 +1,32 @@ + + + + + + %d %5p | %t | %-55logger{55} | %m | %n + + + + + + + + + + + + + + + + + + + + + + + + diff --git a/spring-integration-cassandra/src/test/resources/spring-cassandra.yaml b/spring-integration-cassandra/src/test/resources/spring-cassandra.yaml index e2509fd..7422cd0 100644 --- a/spring-integration-cassandra/src/test/resources/spring-cassandra.yaml +++ b/spring-integration-cassandra/src/test/resources/spring-cassandra.yaml @@ -1,6 +1,5 @@ # Cassandra storage config YAML -# NOTE: # See http://wiki.apache.org/cassandra/StorageConfiguration for # full explanations of configuration directives # /NOTE @@ -37,7 +36,8 @@ cluster_name: 'Test Cluster' initial_token: # See http://wiki.apache.org/cassandra/HintedHandoff -hinted_handoff_enabled: true +hinted_handoff_enabled: false + # this defines the maximum amount of time a dead host will have hints # generated. After it has been dead this long, new hints for it will not be # created until it has been seen alive and gone down again. @@ -102,11 +102,37 @@ authorizer: org.apache.cassandra.auth.AllowAllAuthorizer # partitioners and token selection. partitioner: org.apache.cassandra.dht.Murmur3Partitioner +# Directories where Cassandra should store data on disk. Cassandra +# will spread data evenly across them, subject to the granularity of +# the configured compaction strategy. +# If not set, the default directory is $CASSANDRA_HOME/data/data. +# data_file_directories: +# - /var/lib/cassandra/data + +# commit log. when running on magnetic HDD, this should be a +# separate spindle than the data directories. +# If not set, the default directory is $CASSANDRA_HOME/data/commitlog. +# commitlog_directory: /var/lib/cassandra/commitlog + +# Enable / disable CDC functionality on a per-node basis. This modifies the logic used +# for write path allocation rejection (standard: never reject. cdc: reject Mutation +# containing a CDC-enabled table if at space limit in cdc_raw_directory). +cdc_enabled: false + +# CommitLogSegments are moved to this directory on flush if cdc_enabled: true and the +# segment contains mutations for a CDC-enabled table. This should be placed on a +# separate spindle than the data directories. If not set, the default directory is +# $CASSANDRA_HOME/data/cdc_raw. +cdc_raw_directory: build/embeddedCassandra/data/cdc_raw + + # Directories where Cassandra should store data on disk. Cassandra # will spread data evenly across them, subject to the granularity of # the configured compaction strategy. data_file_directories: - - build/embeddedCassandra/data + - build/embeddedCassandra/data + +hints_directory: build/embeddedCassandra/hints # commit log commitlog_directory: build/embeddedCassandra/commitlog @@ -186,7 +212,7 @@ saved_caches_directory: build/embeddedCassandra/saved_caches # and the CommitLog is simply synced every commitlog_sync_period_in_ms # milliseconds. commitlog_sync: periodic -commitlog_sync_period_in_ms: 10000 +commitlog_sync_period_in_ms: 5000 # The size of the individual commitlog file segments. A commitlog # segment may be archived, deleted, or recycled once all the data @@ -197,20 +223,20 @@ commitlog_sync_period_in_ms: 10000 # archiving commitlog segments (see commitlog_archiving.properties), # then you probably want a finer granularity of archiving; 8 or 16 MB # is reasonable. -commitlog_segment_size_in_mb: 32 +commitlog_segment_size_in_mb: 8 # any class that implements the SeedProvider interface and has a # constructor that takes a Map of parameters will do. seed_provider: - # Addresses of hosts that are deemed contact points. - # Cassandra nodes use this list of hosts to find each other and learn - # the topology of the ring. You must change this if you are running - # multiple nodes! - - class_name: org.apache.cassandra.locator.SimpleSeedProvider - parameters: - # seeds is actually a comma-delimited list of addresses. - # Ex: ",," - - seeds: "127.0.0.1" +# Addresses of hosts that are deemed contact points. +# Cassandra nodes use this list of hosts to find each other and learn +# the topology of the ring. You must change this if you are running +# multiple nodes! +- class_name: org.apache.cassandra.locator.SimpleSeedProvider + parameters: + # seeds is actually a comma-delimited list of addresses. + # Ex: ",," + - seeds: "127.0.0.1" # For workloads with more data than can fit in memory, Cassandra's # bottleneck will be reads that need to fetch data from @@ -221,8 +247,8 @@ seed_provider: # On the other hand, since writes are almost never IO bound, the ideal # number of "concurrent_writes" is dependent on the number of cores in # your system; (8 * number_of_cores) is a good rule of thumb. -concurrent_reads: 32 -concurrent_writes: 32 +concurrent_reads: 4 +concurrent_writes: 4 # Total memory to use for memtables. Cassandra will flush the largest # memtable when this much memory is used. @@ -237,7 +263,7 @@ concurrent_writes: 32 # segment multiple), Cassandra will flush every dirty CF in the oldest # segment and remove it. So a small total commitlog space will tend # to cause more flush activity on less-active columnfamilies. -# commitlog_total_space_in_mb: 4096 +commitlog_total_space_in_mb: 4096 # This sets the amount of memtable flush writer threads. These will # be blocked by disk io, and each one will hold a memtable in memory @@ -295,10 +321,10 @@ native_transport_port: 9043 # transport is used. They are similar to rpc_min_threads and rpc_max_threads, # though the defaults differ slightly. # native_transport_min_threads: 16 -# native_transport_max_threads: 128 +#native_transport_max_threads: 48 # Whether to start the thrift rpc server. -start_rpc: true +start_rpc: false # The address to bind the Thrift RPC service to -- clients connect # here. Unlike ListenAddress above, you _can_ specify 0.0.0.0 here if @@ -385,7 +411,7 @@ snapshot_before_compaction: false # or dropping of column families. The STRONGLY advised default of true # should be used to provide data safety. If you set this flag to false, you will # lose data on truncation or drop. -auto_snapshot: true +auto_snapshot: false # Add column indexes to a row after its contents reach this size. # Increase if your column values are large, or if you have a very large @@ -425,17 +451,17 @@ compaction_throughput_mb_per_sec: 16 # stream_throughput_outbound_megabits_per_sec: 200 # How long the coordinator should wait for read operations to complete -read_request_timeout_in_ms: 10000 +read_request_timeout_in_ms: 120000 # How long the coordinator should wait for seq or index scans to complete -range_request_timeout_in_ms: 10000 +range_request_timeout_in_ms: 120000 # How long the coordinator should wait for writes to complete -write_request_timeout_in_ms: 10000 +write_request_timeout_in_ms: 120000 # How long the coordinator should wait for truncates to complete # (This can be much longer, because unless auto_snapshot is disabled # we need to flush first so we can snapshot before removing the data.) -truncate_request_timeout_in_ms: 60000 +truncate_request_timeout_in_ms: 120000 # The default timeout for other, miscellaneous operations -request_timeout_in_ms: 10000 +request_timeout_in_ms: 120000 # Enable operation timeout information exchange between nodes to accurately # measure request timeouts, If disabled cassandra will assuming the request @@ -572,7 +598,7 @@ request_scheduler: org.apache.cassandra.scheduler.NoScheduler # offs. This value is not often changed, however if you have many # very small rows (many to an OS page), then increasing this will # often lower memory usage without a impact on performance. -index_interval: 128 +# index_interval: 128 # Enable or disable inter-node encryption # Default settings are TLS v1, RSA 1024-bit keys (it is imperative that @@ -621,7 +647,7 @@ client_encryption_options: # can be: all - all traffic is compressed # dc - traffic between different datacenters is compressed # none - nothing is compressed. -internode_compression: all +internode_compression: none # Enable or disable tcp_nodelay for inter-dc communication. # Disabling it will result in larger (but fewer) network packets being sent,