GH-199: Upgrade SI-Cassandra to latest deps

Fixes https://github.com/spring-projects/spring-integration-extensions/issues/199

* Upgrade to SI 5.1.x ane SD-Cassandra-2.1.x, Gradle 5.1
* Add Checkstyle to the project
* Make `CassandraMessageHandler` based on the `ReactiveCassandraOperations`
and add reactive capabilities to the handler
* Make return type consistent for all the modes
* Populate `writeOptions` consistently for all the modes
* Migrate tests to AssertJ
This commit is contained in:
Artem Bilan
2019-01-08 12:09:09 -05:00
parent 525cef2b90
commit 6a9af8fc8a
34 changed files with 1298 additions and 867 deletions

View File

@@ -10,3 +10,4 @@ target
/*.ipr
/*.iws
/bin/
.toDelete

View File

@@ -0,0 +1,44 @@
= Contributor Code of Conduct
As contributors and maintainers of this project, and in the interest of fostering an open
and welcoming community, we pledge to respect all people who contribute through reporting
issues, posting feature requests, updating documentation, submitting pull requests or
patches, and other activities.
We are committed to making participation in this project a harassment-free experience for
everyone, regardless of level of experience, gender, gender identity and expression,
sexual orientation, disability, personal appearance, body size, race, ethnicity, age,
religion, or nationality.
Examples of unacceptable behavior by participants include:
* The use of sexualized language or imagery
* Personal attacks
* Trolling or insulting/derogatory comments
* Public or private harassment
* Publishing other's private information, such as physical or electronic addresses,
without explicit permission
* Other unethical or unprofessional conduct
Project maintainers have the right and responsibility to remove, edit, or reject comments,
commits, code, wiki edits, issues, and other contributions that are not aligned to this
Code of Conduct, or to ban temporarily or permanently any contributor for other behaviors
that they deem inappropriate, threatening, offensive, or harmful.
By adopting this Code of Conduct, project maintainers commit themselves to fairly and
consistently applying these principles to every aspect of managing this project. Project
maintainers who do not follow or enforce the Code of Conduct may be permanently removed
from the project team.
This Code of Conduct applies both within project spaces and in public spaces when an
individual is representing the project or its community.
Instances of abusive, harassing, or otherwise unacceptable behavior may be reported by
contacting a project maintainer at spring-code-of-conduct@pivotal.io . All complaints will
be reviewed and investigated and will result in a response that is deemed necessary and
appropriate to the circumstances. Maintainers are obligated to maintain confidentiality
with regard to the reporter of an incident.
This Code of Conduct is adapted from the
http://contributor-covenant.org[Contributor Covenant], version 1.3.0, available at
http://contributor-covenant.org/version/1/3/0/[contributor-covenant.org/version/1/3/0/]

View File

@@ -1,7 +1,7 @@
Spring Integration Cassandra Adapter
=================================================
Welcome to the *Spring Integration Cassandra adapter*.
Welcome to the *Spring Integration Cassandra* extension.
Checking out and building
-----------------------------

View File

@@ -1,261 +1,257 @@
description = 'Spring Integration Cassandra Support'
apply plugin: 'java'
apply from: "${rootProject.projectDir}/publish-maven.gradle"
apply plugin: 'eclipse'
apply plugin: 'idea'
buildscript {
repositories {
maven { url 'http://repo.spring.io/plugins-release' }
}
dependencies {
classpath 'org.springframework.build.gradle:spring-io-plugin:0.0.3.RELEASE'
}
repositories {
maven { url 'https://repo.spring.io/plugins-release' }
}
}
plugins {
id 'java'
id 'eclipse'
id 'idea'
id 'jacoco'
id 'org.sonarqube' version '2.6.2'
id 'checkstyle'
}
apply from: "${rootProject.projectDir}/publish-maven.gradle"
description = 'Spring Integration Cassandra Support'
group = 'org.springframework.integration'
repositories {
if (version.endsWith('BUILD-SNAPSHOT') || project.hasProperty('platformVersion')) {
maven { url 'http://repo.spring.io/libs-snapshot' }
}
maven { url 'http://repo.spring.io/libs-milestone' }
if (version.endsWith('BUILD-SNAPSHOT')) {
maven { url 'http://repo.spring.io/libs-snapshot' }
}
maven { url 'http://repo.spring.io/libs-milestone' }
// maven { url 'http://repo.spring.io/libs-staging-local' }
}
if (project.hasProperty('platformVersion')) {
apply plugin: 'spring-io'
ext {
assertjVersion = '3.11.1'
cassandraUnitVersion = '3.5.0.1'
slf4jVersion = '1.7.25'
reactorVersion = '3.2.4.RELEASE'
springDataCassandraVersion = '2.1.3.RELEASE'
springIntegrationVersion = '5.1.1.RELEASE'
idPrefix = 'cassandra'
linkHomepage = 'https://github.com/spring-projects/spring-integration-extensions'
linkCi = 'https://build.spring.io/browse/INTEXT'
linkIssue = 'https://jira.spring.io/browse/INTEXT'
linkScmUrl = 'https://github.com/spring-projects/spring-integration-extensions'
linkScmConnection = 'https://github.com/spring-projects/spring-integration-extensions.git'
linkScmDevConnection = 'git@github.com:spring-projects/spring-integration-extensions.git'
dependencies {
springIoVersions "io.spring.platform:platform-bom:${platformVersion}@properties"
}
}
sourceCompatibility = targetCompatibility = 1.7
ext {
cassandraVersion = '2.1.5'
cassandraUnitVersion = '2.1.3.1'
jacocoVersion = '0.7.2.201409121644'
slf4jVersion = '1.7.12'
springDataCassandraVersion = '1.3.0.RELEASE'
springIntegrationVersion = '4.2.4.RELEASE'
idPrefix = 'cassandra'
linkHomepage = 'https://github.com/spring-projects/spring-integration-extensions'
linkCi = 'https://build.spring.io/browse/INTEXT'
linkIssue = 'https://jira.spring.io/browse/INTEXT'
linkScmUrl = 'https://github.com/spring-projects/spring-integration-extensions'
linkScmConnection = 'https://github.com/spring-projects/spring-integration-extensions.git'
linkScmDevConnection = 'git@github.com:spring-projects/spring-integration-extensions.git'
compileJava {
sourceCompatibility = 1.8
targetCompatibility = 1.8
}
eclipse.project.natures += 'org.springframework.ide.eclipse.core.springnature'
sourceSets {
test {
resources {
srcDirs = ['src/test/resources', 'src/test/java']
}
}
test {
resources {
srcDirs = ['src/test/resources', 'src/test/java']
}
}
}
// See http://www.gradle.org/docs/current/userguide/dependency_management.html#sub:configurations
// and http://www.gradle.org/docs/current/dsl/org.gradle.api.artifacts.ConfigurationContainer.html
configurations {
jacoco //Configuration Group used by Sonar to provide Code Coverage using JaCoCo
jacoco {
toolVersion = "0.8.2"
}
checkstyle {
configFile = file("${rootDir}/src/checkstyle/checkstyle.xml")
toolVersion = "8.16"
}
dependencies {
compile "org.springframework.integration:spring-integration-core:$springIntegrationVersion"
compile ("org.springframework.data:spring-data-cassandra:$springDataCassandraVersion") {
exclude group: 'org.liquibase', module: 'liquibase-core'
}
compile "org.springframework.integration:spring-integration-core:$springIntegrationVersion"
compile "org.springframework.data:spring-data-cassandra:$springDataCassandraVersion"
testCompile "org.springframework.integration:spring-integration-test:$springIntegrationVersion"
testCompile "org.springframework.integration:spring-integration-test:$springIntegrationVersion"
testCompile("org.cassandraunit:cassandra-unit-spring:$cassandraUnitVersion") {
exclude group: 'org.apache.cassandra', module: 'cassandra-all'
exclude group: 'com.datastax.cassandra', module: 'cassandra-driver-core'
exclude group: 'org.slf4j', module: 'slf4j-log4j12'
}
testCompile ("org.apache.cassandra:cassandra-all:$cassandraVersion") {
exclude group: 'ch.qos.logback', module: 'logback-classic'
exclude group: 'ch.qos.logback', module: 'logback-core'
}
testCompile "org.assertj:assertj-core:$assertjVersion"
testCompile "org.cassandraunit:cassandra-unit-spring:$cassandraUnitVersion"
testCompile "io.projectreactor:reactor-test:$reactorVersion"
testRuntime "org.slf4j:slf4j-log4j12:$slf4jVersion"
jacoco "org.jacoco:org.jacoco.agent:$jacocoVersion:runtime"
testRuntime "org.slf4j:jcl-over-slf4j:$slf4jVersion"
}
// enable all compiler warnings; individual projects may customize further
[compileJava, compileTestJava]*.options*.compilerArgs = ['-Xlint:all,-options']
test {
// suppress all console output during testing unless running `gradle -i`
logging.captureStandardOutput(LogLevel.INFO)
jvmArgs "-javaagent:${configurations.jacoco.asPath}=destfile=${buildDir}/jacoco.exec,includes=*"
// suppress all console output during testing unless running `gradle -i`
logging.captureStandardOutput(LogLevel.INFO)
jacoco {
append = false
destinationFile = file("$buildDir/jacoco.exec")
}
}
jacocoTestReport {
reports {
xml.enabled false
csv.enabled false
html.destination file("${buildDir}/reports/jacoco/html")
}
}
check.dependsOn javadoc
build.dependsOn jacocoTestReport
task sourcesJar(type: Jar) {
classifier = 'sources'
from sourceSets.main.allJava
classifier = 'sources'
from sourceSets.main.allJava
}
task javadocJar(type: Jar) {
classifier = 'javadoc'
from javadoc
classifier = 'javadoc'
from javadoc
}
artifacts {
archives sourcesJar
archives javadocJar
}
apply plugin: 'sonar-runner'
sonarRunner {
sonarProperties {
property "sonar.jacoco.reportPath", "${buildDir.name}/jacoco.exec"
property "sonar.links.homepage", linkHomepage
property "sonar.links.ci", linkCi
property "sonar.links.issue", linkIssue
property "sonar.links.scm", linkScmUrl
property "sonar.links.scm_dev", linkScmDevConnection
property "sonar.java.coveragePlugin", "jacoco"
}
sonarqube {
properties {
property "sonar.jacoco.reportPath", "${buildDir.name}/jacoco.exec"
property "sonar.links.homepage", linkHomepage
property "sonar.links.ci", linkCi
property "sonar.links.issue", linkIssue
property "sonar.links.scm", linkScmUrl
property "sonar.links.scm_dev", linkScmDevConnection
property "sonar.java.coveragePlugin", "jacoco"
}
}
task api(type: Javadoc) {
group = 'Documentation'
description = 'Generates the Javadoc API documentation.'
title = "${rootProject.description} ${version} API"
options.memberLevel = org.gradle.external.javadoc.JavadocMemberLevel.PROTECTED
options.author = true
options.header = rootProject.description
options.overview = 'src/api/overview.html'
group = 'Documentation'
description = 'Generates the Javadoc API documentation.'
title = "${rootProject.description} ${version} API"
options.memberLevel = org.gradle.external.javadoc.JavadocMemberLevel.PROTECTED
options.author = true
options.header = rootProject.description
options.overview = 'src/api/overview.html'
source = sourceSets.main.allJava
classpath = project.sourceSets.main.compileClasspath
destinationDir = new File(buildDir, "api")
source = sourceSets.main.allJava
classpath = project.sourceSets.main.compileClasspath
destinationDir = new File(buildDir, "api")
}
task schemaZip(type: Zip) {
group = 'Distribution'
classifier = 'schema'
description = "Builds -${classifier} archive containing all " +
"XSDs for deployment at static.springframework.org/schema."
group = 'Distribution'
classifier = 'schema'
description = "Builds -${classifier} archive containing all " +
"XSDs for deployment at static.springframework.org/schema."
def Properties schemas = new Properties();
def shortName = idPrefix.replaceFirst("${idPrefix}-", '')
def Properties schemas = new Properties();
def shortName = idPrefix.replaceFirst("${idPrefix}-", '')
project.sourceSets.main.resources.find {
it.path.endsWith("META-INF${File.separator}spring.schemas")
}?.withInputStream { schemas.load(it) }
project.sourceSets.main.resources.find {
it.path.endsWith("META-INF${File.separator}spring.schemas")
}?.withInputStream { schemas.load(it) }
for (def key : schemas.keySet()) {
File xsdFile = project.sourceSets.main.resources.find {
it.path.replaceAll('\\\\', '/').endsWith(schemas.get(key))
}
assert xsdFile != null
into("integration/${shortName}") {
from xsdFile.path
}
}
for (def key : schemas.keySet()) {
File xsdFile = project.sourceSets.main.resources.find {
it.path.replaceAll('\\\\', '/').endsWith(schemas.get(key))
}
assert xsdFile != null
into("integration/${shortName}") {
from xsdFile.path
}
}
}
task docsZip(type: Zip) {
group = 'Distribution'
classifier = 'docs'
description = "Builds -${classifier} archive containing api " +
"for deployment at static.spring.io/spring-integration/docs."
group = 'Distribution'
classifier = 'docs'
description = "Builds -${classifier} archive containing api " +
"for deployment at static.spring.io/spring-integration/docs."
from('src/dist') {
include 'changelog.txt'
}
from('src/dist') {
include 'changelog.txt'
}
from(api) {
into 'api'
}
from(api) {
into 'api'
}
}
task distZip(type: Zip, dependsOn: [docsZip, schemaZip]) {
group = 'Distribution'
classifier = 'dist'
description = "Builds -${classifier} archive, containing all jars and docs, " +
"suitable for community download page."
group = 'Distribution'
classifier = 'dist'
description = "Builds -${classifier} archive, containing all jars and docs, " +
"suitable for community download page."
ext.baseDir = "${project.name}-${project.version}";
ext.baseDir = "${project.name}-${project.version}";
from('src/dist') {
include 'readme.txt'
include 'license.txt'
include 'notice.txt'
into "${baseDir}"
}
from('src/dist') {
include 'readme.txt'
include 'license.txt'
into "${baseDir}"
}
from(zipTree(docsZip.archivePath)) {
into "${baseDir}/docs"
}
from(zipTree(docsZip.archivePath)) {
into "${baseDir}/docs"
}
from(zipTree(schemaZip.archivePath)) {
into "${baseDir}/schema"
}
from(zipTree(schemaZip.archivePath)) {
into "${baseDir}/schema"
}
into("${baseDir}/libs") {
from project.jar
from project.sourcesJar
from project.javadocJar
}
into("${baseDir}/libs") {
from project.jar
from project.sourcesJar
from project.javadocJar
}
}
// Create an optional "with dependencies" distribution.
// Not published by default; only for use when building from source.
task depsZip(type: Zip, dependsOn: distZip) { zipTask ->
group = 'Distribution'
classifier = 'dist-with-deps'
description = "Builds -${classifier} archive, containing everything " +
"in the -${distZip.classifier} archive plus all dependencies."
group = 'Distribution'
classifier = 'dist-with-deps'
description = "Builds -${classifier} archive, containing everything " +
"in the -${distZip.classifier} archive plus all dependencies."
from zipTree(distZip.archivePath)
from zipTree(distZip.archivePath)
gradle.taskGraph.whenReady { taskGraph ->
if (taskGraph.hasTask(":${zipTask.name}")) {
def projectName = rootProject.name
def artifacts = new HashSet()
gradle.taskGraph.whenReady { taskGraph ->
if (taskGraph.hasTask(":${zipTask.name}")) {
def projectName = rootProject.name
def artifacts = new HashSet()
rootProject.configurations.runtime.resolvedConfiguration.resolvedArtifacts.each { artifact ->
def dependency = artifact.moduleVersion.id
if (!projectName.equals(dependency.name)) {
artifacts << artifact.file
}
}
rootProject.configurations.runtime.resolvedConfiguration.resolvedArtifacts.each { artifact ->
def dependency = artifact.moduleVersion.id
if (!projectName.equals(dependency.name)) {
artifacts << artifact.file
}
}
zipTask.from(artifacts) {
into "${distZip.baseDir}/deps"
}
}
}
zipTask.from(artifacts) {
into "${distZip.baseDir}/deps"
}
}
}
}
artifacts {
archives distZip
archives docsZip
archives schemaZip
archives sourcesJar
archives javadocJar
archives distZip
archives docsZip
archives schemaZip
}
task dist(dependsOn: assemble) {
group = 'Distribution'
description = 'Builds -dist, -docs and -schema distribution archives.'
}
task wrapper(type: Wrapper) {
description = 'Generates gradlew[.bat] scripts'
gradleVersion = '2.3'
distributionUrl = "http://services.gradle.org/distributions/gradle-${gradleVersion}-all.zip"
group = 'Distribution'
description = 'Builds -dist, -docs and -schema distribution archives.'
}

View File

@@ -1 +1 @@
version=0.5.1.BUILD-SNAPSHOT
version=0.6.0.BUILD-SNAPSHOT

View File

@@ -1,6 +1,5 @@
#Thu Apr 09 10:04:06 EEST 2015
distributionBase=GRADLE_USER_HOME
distributionPath=wrapper/dists
distributionUrl=https\://services.gradle.org/distributions/gradle-5.1-bin.zip
zipStoreBase=GRADLE_USER_HOME
zipStorePath=wrapper/dists
distributionUrl=http\://services.gradle.org/distributions/gradle-2.3-all.zip

View File

@@ -1,4 +1,4 @@
#!/usr/bin/env bash
#!/usr/bin/env sh
##############################################################################
##
@@ -6,47 +6,6 @@
##
##############################################################################
# Add default JVM options here. You can also use JAVA_OPTS and GRADLE_OPTS to pass JVM options to this script.
DEFAULT_JVM_OPTS=""
APP_NAME="Gradle"
APP_BASE_NAME=`basename "$0"`
# Use the maximum available, or set MAX_FD != -1 to use that value.
MAX_FD="maximum"
warn ( ) {
echo "$*"
}
die ( ) {
echo
echo "$*"
echo
exit 1
}
# OS specific support (must be 'true' or 'false').
cygwin=false
msys=false
darwin=false
case "`uname`" in
CYGWIN* )
cygwin=true
;;
Darwin* )
darwin=true
;;
MINGW* )
msys=true
;;
esac
# For Cygwin, ensure paths are in UNIX format before anything is touched.
if $cygwin ; then
[ -n "$JAVA_HOME" ] && JAVA_HOME=`cygpath --unix "$JAVA_HOME"`
fi
# Attempt to set APP_HOME
# Resolve links: $0 may be a link
PRG="$0"
@@ -61,9 +20,49 @@ while [ -h "$PRG" ] ; do
fi
done
SAVED="`pwd`"
cd "`dirname \"$PRG\"`/" >&-
cd "`dirname \"$PRG\"`/" >/dev/null
APP_HOME="`pwd -P`"
cd "$SAVED" >&-
cd "$SAVED" >/dev/null
APP_NAME="Gradle"
APP_BASE_NAME=`basename "$0"`
# Add default JVM options here. You can also use JAVA_OPTS and GRADLE_OPTS to pass JVM options to this script.
DEFAULT_JVM_OPTS='"-Xmx64m"'
# Use the maximum available, or set MAX_FD != -1 to use that value.
MAX_FD="maximum"
warn () {
echo "$*"
}
die () {
echo
echo "$*"
echo
exit 1
}
# OS specific support (must be 'true' or 'false').
cygwin=false
msys=false
darwin=false
nonstop=false
case "`uname`" in
CYGWIN* )
cygwin=true
;;
Darwin* )
darwin=true
;;
MINGW* )
msys=true
;;
NONSTOP* )
nonstop=true
;;
esac
CLASSPATH=$APP_HOME/gradle/wrapper/gradle-wrapper.jar
@@ -90,7 +89,7 @@ location of your Java installation."
fi
# Increase the maximum file descriptors if we can.
if [ "$cygwin" = "false" -a "$darwin" = "false" ] ; then
if [ "$cygwin" = "false" -a "$darwin" = "false" -a "$nonstop" = "false" ] ; then
MAX_FD_LIMIT=`ulimit -H -n`
if [ $? -eq 0 ] ; then
if [ "$MAX_FD" = "maximum" -o "$MAX_FD" = "max" ] ; then
@@ -114,6 +113,7 @@ fi
if $cygwin ; then
APP_HOME=`cygpath --path --mixed "$APP_HOME"`
CLASSPATH=`cygpath --path --mixed "$CLASSPATH"`
JAVACMD=`cygpath --unix "$JAVACMD"`
# We build the pattern for arguments to be converted via cygpath
ROOTDIRSRAW=`find -L / -maxdepth 1 -mindepth 1 -type d 2>/dev/null`
@@ -154,11 +154,19 @@ if $cygwin ; then
esac
fi
# Split up the JVM_OPTS And GRADLE_OPTS values into an array, following the shell quoting and substitution rules
function splitJvmOpts() {
JVM_OPTS=("$@")
# Escape application args
save () {
for i do printf %s\\n "$i" | sed "s/'/'\\\\''/g;1s/^/'/;\$s/\$/' \\\\/" ; done
echo " "
}
eval splitJvmOpts $DEFAULT_JVM_OPTS $JAVA_OPTS $GRADLE_OPTS
JVM_OPTS[${#JVM_OPTS[*]}]="-Dorg.gradle.appname=$APP_BASE_NAME"
APP_ARGS=$(save "$@")
exec "$JAVACMD" "${JVM_OPTS[@]}" -classpath "$CLASSPATH" org.gradle.wrapper.GradleWrapperMain "$@"
# Collect all arguments for the java command, following the shell quoting and substitution rules
eval set -- $DEFAULT_JVM_OPTS $JAVA_OPTS $GRADLE_OPTS "\"-Dorg.gradle.appname=$APP_BASE_NAME\"" -classpath "\"$CLASSPATH\"" org.gradle.wrapper.GradleWrapperMain "$APP_ARGS"
# by default we should be in the correct project dir, but when run from Finder on Mac, the cwd is wrong
if [ "$(uname)" = "Darwin" ] && [ "$HOME" = "$PWD" ]; then
cd "$(dirname "$0")"
fi
exec "$JAVACMD" "$@"

View File

@@ -8,14 +8,14 @@
@rem Set local scope for the variables with windows NT shell
if "%OS%"=="Windows_NT" setlocal
@rem Add default JVM options here. You can also use JAVA_OPTS and GRADLE_OPTS to pass JVM options to this script.
set DEFAULT_JVM_OPTS=
set DIRNAME=%~dp0
if "%DIRNAME%" == "" set DIRNAME=.
set APP_BASE_NAME=%~n0
set APP_HOME=%DIRNAME%
@rem Add default JVM options here. You can also use JAVA_OPTS and GRADLE_OPTS to pass JVM options to this script.
set DEFAULT_JVM_OPTS="-Xmx64m"
@rem Find java.exe
if defined JAVA_HOME goto findJavaFromJavaHome
@@ -46,10 +46,9 @@ echo location of your Java installation.
goto fail
:init
@rem Get command-line arguments, handling Windowz variants
@rem Get command-line arguments, handling Windows variants
if not "%OS%" == "Windows_NT" goto win9xME_args
if "%@eval[2+2]" == "4" goto 4NT_args
:win9xME_args
@rem Slurp the command line arguments.
@@ -60,11 +59,6 @@ set _SKIP=2
if "x%~1" == "x" goto execute
set CMD_LINE_ARGS=%*
goto execute
:4NT_args
@rem Get arguments from the 4NT Shell from JP Software
set CMD_LINE_ARGS=%$
:execute
@rem Setup the command line

View File

@@ -5,7 +5,7 @@ This document is the API specification for Spring Integration Cassandra Extensio
<div id="overviewBody">
<p>
For further API reference and developer documentation, see the
<a href="http://static.springsource.org/spring-integration/reference" target="_top">Spring
<a href="https://docs.spring.io/spring-integration/reference/html" target="_top">Spring
Integration reference documentation</a>.
That documentation contains more detailed, developer-targeted
descriptions, with conceptual overviews, definitions of terms,
@@ -14,8 +14,8 @@ This document is the API specification for Spring Integration Cassandra Extensio
<p>
If you are interested in commercial training, consultancy, and
support for Spring Integration, please visit <a href="http://www.springsource.com" target="_top">
http://www.springsource.com</a>
support for Spring Integration, please visit
<a href="https://spring.io/" target="_top">https://spring.io/</a>
</p>
</div>
</body>

View File

@@ -0,0 +1,17 @@
^\Q/*\E$
^\Q * Copyright \E20\d\d(\-20\d\d)?\Q the original author or authors.\E$
^\Q *\E$
^\Q * Licensed under the Apache License, Version 2.0 (the "License");\E$
^\Q * you may not use this file except in compliance with the License.\E$
^\Q * You may obtain a copy of the License at\E$
^\Q *\E$
^\Q * http://www.apache.org/licenses/LICENSE-2.0\E$
^\Q *\E$
^\Q * Unless required by applicable law or agreed to in writing, software\E$
^\Q * distributed under the License is distributed on an "AS IS" BASIS,\E$
^\Q * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.\E$
^\Q * See the License for the specific language governing permissions and\E$
^\Q * limitations under the License.\E$
^\Q */\E$
^$
^.*$

View File

@@ -0,0 +1,9 @@
<?xml version="1.0"?>
<!DOCTYPE suppressions PUBLIC
"-//Puppy Crawl//DTD Suppressions 1.1//EN"
"http://www.puppycrawl.com/dtds/suppressions_1_1.dtd">
<suppressions>
<suppress files="[\\/]test[\\/]" checks="AvoidStaticImport" />
<suppress files="[\\/]test[\\/]" checks="InnerTypeLast" />
<suppress files="[\\/]test[\\/]" checks="Javadoc*" />
</suppressions>

View File

@@ -0,0 +1,184 @@
<?xml version="1.0"?>
<!DOCTYPE module PUBLIC "-//Puppy Crawl//DTD Check Configuration 1.2//EN" "http://www.puppycrawl.com/dtds/configuration_1_2.dtd">
<module name="Checker">
<module name="SuppressionFilter">
<property name="file" value="src/checkstyle/checkstyle-suppressions.xml" />
</module>
<!-- Root Checks -->
<module name="RegexpHeader">
<property name="headerFile" value="src/checkstyle/checkstyle-header.txt" />
<property name="fileExtensions" value="java" />
</module>
<module name="NewlineAtEndOfFile">
<property name="lineSeparator" value="lf"/>
</module>
<!-- TreeWalker Checks -->
<module name="TreeWalker">
<!-- Annotations -->
<module name="AnnotationUseStyle">
<property name="elementStyle" value="compact" />
</module>
<module name="MissingOverride" />
<module name="PackageAnnotation" />
<module name="AnnotationLocation">
<property name="allowSamelineSingleParameterlessAnnotation"
value="false" />
</module>
<!-- Block Checks -->
<module name="EmptyBlock">
<property name="option" value="text" />
</module>
<module name="LeftCurly" />
<module name="RightCurly">
<property name="option" value="alone" />
</module>
<module name="NeedBraces" />
<module name="AvoidNestedBlocks" />
<!-- Class Design -->
<module name="FinalClass" />
<module name="InterfaceIsType" />
<module name="HideUtilityClassConstructor" />
<module name="MutableException" />
<module name="InnerTypeLast" />
<module name="OneTopLevelClass" />
<!-- Coding -->
<module name="CovariantEquals" />
<module name="EmptyStatement" />
<module name="EqualsHashCode" />
<module name="InnerAssignment" />
<module name="SimplifyBooleanExpression" />
<module name="SimplifyBooleanReturn" />
<module name="StringLiteralEquality" />
<module name="NestedForDepth">
<property name="max" value="3" />
</module>
<module name="NestedIfDepth">
<property name="max" value="4" />
</module>
<module name="NestedTryDepth">
<property name="max" value="3" />
</module>
<module name="MultipleVariableDeclarations" />
<module name="RequireThis">
<property name="validateOnlyOverlapping" value="false" />
<property name="checkMethods" value="false" />
</module>
<module name="OneStatementPerLine" />
<!-- Imports -->
<module name="AvoidStarImport" />
<module name="AvoidStaticImport">
<property name="excludes"
value="org.assertj.core.api.Assertions.*,
org.junit.Assert.*,
org.junit.Assume.*,
org.junit.internal.matchers.ThrowableMessageMatcher.*,
org.hamcrest.CoreMatchers.*,
org.hamcrest.Matchers.*,
org.mockito.Mockito.*,
org.mockito.BDDMockito.*,
org.mockito.Matchers.*" />
</module>
<module name="IllegalImport" />
<module name="RedundantImport" />
<module name="UnusedImports">
<property name="processJavadoc" value="true" />
</module>
<module name="ImportOrder">
<property name="groups" value="java,/^javax?\./,org,org.springframework,*" />
<property name="ordered" value="true" />
<property name="separated" value="true" />
<property name="option" value="top" />
<property name="sortStaticImportsAlphabetically" value="true" />
</module>
<!-- Javadoc Comments -->
<module name="JavadocType">
<property name="scope" value="package"/>
<property name="authorFormat" value=".+\s.+"/>
</module>
<module name="JavadocMethod">
<property name="allowMissingJavadoc" value="true" />
</module>
<module name="JavadocVariable">
<property name="scope" value="public"/>
</module>
<module name="JavadocStyle">
<property name="checkEmptyJavadoc" value="true"/>
</module>
<module name="NonEmptyAtclauseDescription" />
<module name="JavadocTagContinuationIndentation">
<property name="offset" value="0"/>
</module>
<module name="AtclauseOrder">
<property name="target" value="CLASS_DEF, INTERFACE_DEF, ENUM_DEF"/>
<property name="tagOrder" value="@param, @author, @since, @see, @version, @serial, @deprecated"/>
</module>
<module name="AtclauseOrder">
<property name="target" value="METHOD_DEF, CTOR_DEF, VARIABLE_DEF"/>
<property name="tagOrder" value="@param, @return, @throws, @since, @deprecated, @see"/>
</module>
<!-- Miscellaneous -->
<module name="CommentsIndentation">
<property name="tokens" value="BLOCK_COMMENT_BEGIN" />
</module>
<module name="UpperEll" />
<module name="ArrayTypeStyle" />
<module name="OuterTypeFilename" />
<!-- Modifiers -->
<module name="RedundantModifier" />
<!-- Regexp -->
<module name="RegexpSinglelineJava">
<property name="format" value="^\t* +\t*\S" />
<property name="message"
value="Line has leading space characters; indentation should be performed with tabs only." />
<property name="ignoreComments" value="true" />
</module>
<module name="RegexpSinglelineJava">
<property name="maximum" value="0"/>
<property name="format" value="org\.mockito\..*Mockito\.(when|doThrow|doAnswer)" />
<property name="message"
value="Please use BDDMockito instead of Mockito.(when|doThrow|doAnswer)." />
<property name="ignoreComments" value="true" />
</module>
<module name="RegexpSinglelineJava">
<property name="maximum" value="0"/>
<property name="format" value="org\.junit\.Assert\.assert" />
<property name="message"
value="Please use AssertJ imports." />
<property name="ignoreComments" value="true" />
</module>
<module name="Regexp">
<property name="format" value="[ \t]+$" />
<property name="illegalPattern" value="true" />
<property name="message" value="Trailing whitespace" />
</module>
<module name="Regexp">
<property name="format" value="System.(out|err).print" />
<property name="illegalPattern" value="true" />
<property name="message" value="System.out or .err" />
</module>
<!-- Whitespace -->
<module name="GenericWhitespace" />
<module name="MethodParamPad" />
<module name="NoWhitespaceAfter" >
<property name="tokens" value="BNOT, DEC, DOT, INC, LNOT, UNARY_MINUS, UNARY_PLUS, ARRAY_DECLARATOR"/>
</module>
<module name="NoWhitespaceBefore" />
<module name="ParenPad" />
<module name="TypecastParenPad" />
<module name="WhitespaceAfter" />
<module name="WhitespaceAround" />
</module>
</module>

View File

@@ -1,21 +0,0 @@
========================================================================
== NOTICE file corresponding to section 4 d of the Apache License, ==
== Version 2.0, in this case for the Spring Integration distribution. ==
========================================================================
This product includes software developed by
the Apache Software Foundation (http://www.apache.org).
The end-user documentation included with a redistribution, if any,
must include the following acknowledgement:
"This product includes software developed by the Spring Framework
Project (http://www.springframework.org)."
Alternatively, this acknowledgement may appear in the software itself,
if and wherever such third-party acknowledgements normally appear.
The names "Spring", "Spring Framework", and "Spring Integration" must
not be used to endorse or promote products derived from this software
without prior written permission. For written permission, please contact
enquiries@springsource.com.

View File

@@ -1,17 +1,17 @@
/*
* Copyright 2015-2016 the original author or authors
* Copyright 2015-2019 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.integration.cassandra.config.xml;
@@ -19,6 +19,8 @@ package org.springframework.integration.cassandra.config.xml;
import org.springframework.integration.config.xml.AbstractIntegrationNamespaceHandler;
/**
* The namespace handler for "int-cassandra" namespace.
*
* @author Artem Bilan
* @author Filippo Balicchia
*/

View File

@@ -1,11 +1,11 @@
/*
* Copyright 2016 the original author or authors.
* Copyright 2016-2019 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
@@ -25,7 +25,10 @@ import org.springframework.integration.cassandra.outbound.CassandraMessageHandle
import org.springframework.integration.config.xml.AbstractOutboundChannelAdapterParser;
/**
* The parser for the {@code <int-cassandra:outbound-channel-adapter>}.
*
* @author Filippo Balicchia
* @author Artem Bilan
*/
public class CassandraOutboundChannelAdapterParser extends AbstractOutboundChannelAdapterParser {

View File

@@ -1,17 +1,17 @@
/*
* Copyright 2016 the original author or authors
* Copyright 2016-2019 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.integration.cassandra.config.xml;
@@ -25,7 +25,10 @@ import org.springframework.integration.config.xml.AbstractConsumerEndpointParser
import org.springframework.integration.config.xml.IntegrationNamespaceUtils;
/**
* The parser for the {@code <int-cassandra:outbound-gateway>}.
*
* @author Filippo Balicchia
* @author Artem Bilan
*/
public class CassandraOutboundGatewayParser extends AbstractConsumerEndpointParser {

View File

@@ -1,11 +1,11 @@
/*
* Copyright 2016 the original author or authors.
* Copyright 2016-2019 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
@@ -31,12 +31,15 @@ import org.springframework.util.StringUtils;
import org.springframework.util.xml.DomUtils;
/**
* The {@code int-cassandra} namespace XML parser helper.
*
* @author Filippo Balicchia
* @author Artem Bilan
*/
public class CassandraParserUtils {
public final class CassandraParserUtils {
public static void processOutboundTypeAttributes(Element element, ParserContext parserContext,
BeanDefinitionBuilder builder) {
BeanDefinitionBuilder builder) {
String cassandraTemplate = element.getAttribute("cassandra-template");
String mode = element.getAttribute("mode");
@@ -67,18 +70,19 @@ public class CassandraParserUtils {
IntegrationNamespaceUtils.setReferenceIfAttributeDefined(builder, element, "write-options");
IntegrationNamespaceUtils.setValueIfAttributeDefined(builder, element, "ingest-query");
IntegrationNamespaceUtils.setValueIfAttributeDefined(builder, element, "query");
IntegrationNamespaceUtils.setValueIfAttributeDefined(builder, element, "async");
List<Element> parameterExpressions = DomUtils.getChildElementsByTagName(element, "parameter-expression");
if (!CollectionUtils.isEmpty(parameterExpressions)) {
ManagedMap<String, Object> parameterExpressionsMap = new ManagedMap<String, Object>();
ManagedMap<String, Object> parameterExpressionsMap = new ManagedMap<>();
for (Element parameterExpressionElement : parameterExpressions) {
String name = parameterExpressionElement.getAttribute(AbstractBeanDefinitionParser.NAME_ATTRIBUTE);
BeanDefinition expression = IntegrationNamespaceUtils.createExpressionDefIfAttributeDefined(
IntegrationNamespaceUtils.EXPRESSION_ATTRIBUTE, parameterExpressionElement);
BeanDefinition expression =
IntegrationNamespaceUtils.createExpressionDefIfAttributeDefined(
IntegrationNamespaceUtils.EXPRESSION_ATTRIBUTE, parameterExpressionElement);
if (expression != null) {
parameterExpressionsMap.put(name, expression);
}
}
builder.addPropertyValue("parameterExpressions", parameterExpressionsMap);
}
@@ -86,10 +90,14 @@ public class CassandraParserUtils {
}
public static boolean areMutuallyExclusive(String query, BeanDefinition statementExpressionDef,
String ingestQuery) {
String ingestQuery) {
return StringUtils.isEmpty(query) && statementExpressionDef == null && StringUtils.isEmpty(ingestQuery)
|| !(StringUtils.hasText(query) && statementExpressionDef != null && StringUtils.hasText(ingestQuery))
&& (StringUtils.hasText(query) ^ statementExpressionDef != null) ^ StringUtils.hasText(ingestQuery);
}
private CassandraParserUtils() {
}
}

View File

@@ -1,3 +1,19 @@
/*
* Copyright 2015-2019 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
/**
* Provides classes for Cassandra parsers and namespace handlers.
*/

View File

@@ -1,11 +1,11 @@
/*
* Copyright 2015-2016 the original author or authors.
* Copyright 2015-2019 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
@@ -16,20 +16,27 @@
package org.springframework.integration.cassandra.outbound;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.springframework.cassandra.core.CachedPreparedStatementCreator;
import org.springframework.cassandra.core.PreparedStatementCreator;
import org.springframework.cassandra.core.WriteOptions;
import org.springframework.data.cassandra.core.CassandraOperations;
import org.springframework.beans.DirectFieldAccessor;
import org.springframework.dao.DataAccessException;
import org.springframework.data.cassandra.ReactiveResultSet;
import org.springframework.data.cassandra.ReactiveSession;
import org.springframework.data.cassandra.core.InsertOptions;
import org.springframework.data.cassandra.core.ReactiveCassandraOperations;
import org.springframework.data.cassandra.core.UpdateOptions;
import org.springframework.data.cassandra.core.WriteResult;
import org.springframework.data.cassandra.core.cql.QueryOptionsUtil;
import org.springframework.data.cassandra.core.cql.ReactiveSessionCallback;
import org.springframework.data.cassandra.core.cql.WriteOptions;
import org.springframework.expression.EvaluationContext;
import org.springframework.expression.Expression;
import org.springframework.expression.TypeLocator;
import org.springframework.expression.spel.support.StandardEvaluationContext;
import org.springframework.expression.spel.support.StandardTypeLocator;
import org.springframework.integration.expression.ExpressionEvalMap;
import org.springframework.integration.expression.ExpressionUtils;
import org.springframework.integration.handler.AbstractReplyProducingMessageHandler;
import org.springframework.integration.handler.ExpressionEvaluatingMessageProcessor;
@@ -37,24 +44,28 @@ import org.springframework.integration.handler.MessageProcessor;
import org.springframework.messaging.Message;
import org.springframework.util.Assert;
import com.datastax.driver.core.ColumnDefinitions;
import com.datastax.driver.core.PreparedStatement;
import com.datastax.driver.core.BatchStatement;
import com.datastax.driver.core.ResultSet;
import com.datastax.driver.core.Statement;
import com.datastax.driver.core.exceptions.DriverException;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
/**
* An {@link AbstractReplyProducingMessageHandler} implementation for Cassandra outbound operations.
*
* @author Soby Chacko
* @author Artem Bilan
* @author Filippo Balicchia
*/
@SuppressWarnings("unchecked")
public class CassandraMessageHandler<T> extends AbstractReplyProducingMessageHandler {
public class CassandraMessageHandler extends AbstractReplyProducingMessageHandler {
private final Map<String, Expression> parameterExpressions = new HashMap<>();
private final CassandraOperations cassandraTemplate;
private Type mode;
private final ReactiveCassandraOperations cassandraOperations;
private boolean producesReply;
/**
@@ -65,21 +76,33 @@ public class CassandraMessageHandler<T> extends AbstractReplyProducingMessageHan
/**
* Various options that can be used for Cassandra writes.
*/
private WriteOptions writeOptions;
private WriteOptions writeOptions = WriteOptions.empty();
private MessageProcessor<Statement> statementProcessor;
private ReactiveSessionMessageCallback sessionMessageCallback;
private EvaluationContext evaluationContext;
public CassandraMessageHandler(CassandraOperations cassandraTemplate) {
this(cassandraTemplate, Type.INSERT);
public CassandraMessageHandler(ReactiveCassandraOperations cassandraOperations) {
this(cassandraOperations, Type.INSERT);
}
public CassandraMessageHandler(CassandraOperations cassandraTemplate, CassandraMessageHandler.Type queryType) {
Assert.notNull(cassandraTemplate, "'cassandraTemplate' must not be null.");
public CassandraMessageHandler(ReactiveCassandraOperations cassandraOperations,
CassandraMessageHandler.Type queryType) {
Assert.notNull(cassandraOperations, "'cassandraOperations' must not be null.");
Assert.notNull(queryType, "'queryType' must not be null.");
this.cassandraTemplate = cassandraTemplate;
this.cassandraOperations = cassandraOperations;
this.mode = queryType;
setAsync(true);
switch (this.mode) {
case INSERT:
this.writeOptions = InsertOptions.empty();
break;
case UPDATE:
this.writeOptions = UpdateOptions.empty();
break;
}
}
public void setIngestQuery(String ingestQuery) {
@@ -89,6 +112,7 @@ public class CassandraMessageHandler<T> extends AbstractReplyProducingMessageHan
}
public void setWriteOptions(WriteOptions writeOptions) {
Assert.notNull(writeOptions, "'writeOptions' must not be null");
this.writeOptions = writeOptions;
}
@@ -96,47 +120,32 @@ public class CassandraMessageHandler<T> extends AbstractReplyProducingMessageHan
this.producesReply = producesReply;
}
public void setStatementExpressionString(String statementExpression) {
setStatementExpression(EXPRESSION_PARSER.parseExpression(statementExpression));
}
public void setStatementExpression(Expression statementExpression) {
setStatementProcessor(new ExpressionEvaluatingMessageProcessor<Statement>(statementExpression,
Statement.class) {
setStatementProcessor(
new ExpressionEvaluatingMessageProcessor<Statement>(statementExpression, Statement.class) {
@Override
protected StandardEvaluationContext getEvaluationContext() {
return (StandardEvaluationContext) CassandraMessageHandler.this.evaluationContext;
}
@Override
protected StandardEvaluationContext getEvaluationContext() {
return (StandardEvaluationContext) CassandraMessageHandler.this.evaluationContext;
}
});
});
}
public void setQuery(String query) {
Assert.hasText(query, "'query' must not be empty");
final PreparedStatementCreator statementCreator = new CachedPreparedStatementCreator(query);
setStatementProcessor(new MessageProcessor<Statement>() {
@Override
public Statement processMessage(Message<?> message) {
PreparedStatement preparedStatement =
statementCreator.createPreparedStatement(cassandraTemplate.getSession());
ColumnDefinitions variables = preparedStatement.getVariables();
List<Object> values = new ArrayList<>(variables.size());
Map<String, Object> valueMap = new HashMap<>(variables.size());
for (ColumnDefinitions.Definition definition : variables) {
String name = definition.getName();
Object value = valueMap.get(name);
if (value == null) {
Expression expression = parameterExpressions.get(name);
Assert.state(expression != null, "No expression for parameter: " + name);
value = expression.getValue(evaluationContext, message);
valueMap.put(name, value);
}
values.add(value);
}
return preparedStatement.bind(values.toArray());
}
});
this.sessionMessageCallback =
(session, requestMessage) ->
session.execute(query,
ExpressionEvalMap.from(this.parameterExpressions)
.usingEvaluationContext(this.evaluationContext)
.withRoot(requestMessage)
.build());
this.mode = Type.STATEMENT;
}
public void setParameterExpressions(Map<String, Expression> parameterExpressions) {
@@ -147,7 +156,11 @@ public class CassandraMessageHandler<T> extends AbstractReplyProducingMessageHan
public void setStatementProcessor(MessageProcessor<Statement> statementProcessor) {
Assert.notNull(statementProcessor, "'statementProcessor' must not be null.");
this.statementProcessor = statementProcessor;
this.sessionMessageCallback =
(session, requestMessage) ->
session.execute(
QueryOptionsUtil.addQueryOptions(statementProcessor.processMessage(requestMessage),
this.writeOptions));
this.mode = Type.STATEMENT;
}
@@ -174,83 +187,156 @@ public class CassandraMessageHandler<T> extends AbstractReplyProducingMessageHan
protected Object handleRequestMessage(Message<?> requestMessage) {
Object payload = requestMessage.getPayload();
Object result = payload;
Mono<? extends WriteResult> result = null;
Type mode = this.mode;
Statement statement = null;
if (payload instanceof Statement) {
statement = (Statement) payload;
mode = Type.STATEMENT;
}
switch (mode) {
case INSERT:
if (this.ingestQuery != null) {
Assert.isInstanceOf(List.class, payload,
"to perform 'ingest' the 'payload' must be of 'List<List<?>>' type.");
List<?> list = (List<?>) payload;
for (Object o : list) {
Assert.isInstanceOf(List.class, o,
"to perform 'ingest' the 'payload' must be of 'List<List<?>>' type.");
}
List<List<?>> rows = (List<List<?>>) payload;
this.cassandraTemplate.ingest(this.ingestQuery, rows, this.writeOptions);
}
else {
if (payload instanceof List) {
this.cassandraTemplate.insert((List<T>) payload, this.writeOptions);
}
else {
this.cassandraTemplate.insert(payload, this.writeOptions);
}
}
result = handleInsert(payload);
break;
case UPDATE:
if (payload instanceof List) {
this.cassandraTemplate.update((List<T>) payload, this.writeOptions);
}
else {
this.cassandraTemplate.update(payload, this.writeOptions);
}
result = handleUpdate(payload);
break;
case DELETE:
if (payload instanceof List) {
this.cassandraTemplate.delete((List<T>) payload, this.writeOptions);
}
else {
this.cassandraTemplate.delete(payload, this.writeOptions);
}
result = handleDelete(payload);
break;
case STATEMENT:
if (statement == null) {
statement = this.statementProcessor.processMessage(requestMessage);
}
result = this.cassandraTemplate.executeAsynchronously(statement).getUninterruptibly();
result = handleStatement(requestMessage);
break;
}
return this.producesReply ? result : null;
if (this.producesReply) {
return isAsync() ? result : result.block();
}
else {
if (isAsync()) {
result.subscribe();
}
else {
result.block();
}
return null;
}
}
@SuppressWarnings("unchecked")
private Mono<? extends WriteResult> handleInsert(Object payload) {
if (this.ingestQuery != null) {
Assert.isInstanceOf(List.class, payload,
"to perform 'ingest' the 'payload' must be of 'List<List<?>>' type.");
List<?> list = (List<?>) payload;
for (Object o : list) {
Assert.isInstanceOf(List.class, o,
"to perform 'ingest' the 'payload' must be of 'List<List<?>>' type.");
}
List<List<?>> rows = (List<List<?>>) payload;
return this.cassandraOperations.getReactiveCqlOperations()
.execute((ReactiveSessionCallback<WriteResult>) session ->
session.prepare(this.ingestQuery)
.map(s -> QueryOptionsUtil.addPreparedStatementOptions(s, this.writeOptions))
.flatMapMany(s ->
Flux.fromIterable(rows)
.map(row -> s.bind(row.toArray())))
.collect(BatchStatement::new, BatchStatement::add)
.flatMap(session::execute)
.transform(this::transformToWriteResult))
.next();
}
else {
if (payload instanceof List) {
return this.cassandraOperations.batchOps()
.insert((List<?>) payload, this.writeOptions)
.execute();
}
else {
return this.cassandraOperations.insert(payload, (InsertOptions) this.writeOptions);
}
}
}
private Mono<? extends WriteResult> handleUpdate(Object payload) {
if (payload instanceof List) {
return this.cassandraOperations.batchOps()
.update((List<?>) payload, this.writeOptions)
.execute();
}
else {
return this.cassandraOperations.update(payload, (UpdateOptions) this.writeOptions);
}
}
private Mono<WriteResult> handleDelete(Object payload) {
if (payload instanceof List) {
return this.cassandraOperations.batchOps()
.delete((List<?>) payload)
.execute();
}
else {
return this.cassandraOperations.delete(payload, this.writeOptions);
}
}
private Mono<WriteResult> handleStatement(Message<?> requestMessage) {
Object payload = requestMessage.getPayload();
Mono<ReactiveResultSet> resultSetMono;
if (payload instanceof Statement) {
resultSetMono = this.cassandraOperations.getReactiveCqlOperations().queryForResultSet((Statement) payload);
}
else {
resultSetMono = this.cassandraOperations.getReactiveCqlOperations()
.execute((ReactiveSessionCallback<ReactiveResultSet>) session ->
this.sessionMessageCallback.doInSession(session, requestMessage))
.next();
}
return resultSetMono.transform(this::transformToWriteResult);
}
private Mono<WriteResult> transformToWriteResult(Mono<ReactiveResultSet> resultSetMono) {
return resultSetMono
.map(DirectFieldAccessor::new)
.map(accessor -> accessor.getPropertyValue("resultSet"))
.cast(ResultSet.class)
.map(WriteResult::of);
}
/**
* Always return {@code false} to prevent a {@link com.datastax.driver.core.ResultSet}
* draining on iteration.
*
* @param reply ignored.
* @return {@code false}.
* The mode for the {@link CassandraMessageHandler}.
*/
@Override
protected boolean shouldSplitOutput(Iterable<?> reply) {
return false;
}
public enum Type {
INSERT, UPDATE, DELETE, STATEMENT;
/**
* Set a {@link CassandraMessageHandler} into an {@code insert} mode.
*/
INSERT,
/**
* Set a {@link CassandraMessageHandler} into an {@code update} mode.
*/
UPDATE,
/**
* Set a {@link CassandraMessageHandler} into a {@code delete} mode.
*/
DELETE,
/**
* Set a {@link CassandraMessageHandler} into a {@code statement} mode.
*/
STATEMENT;
}
@FunctionalInterface
private interface ReactiveSessionMessageCallback {
Mono<ReactiveResultSet> doInSession(ReactiveSession session, Message<?> requestMessage)
throws DriverException, DataAccessException;
}

View File

@@ -1,4 +1,20 @@
/*
* Copyright 2015-2019 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
/**
* Provides classes supporting Cassndra outbound endpoints.
* Provides classes supporting Cassandra outbound endpoints.
*/
package org.springframework.integration.cassandra.outbound;

View File

@@ -1,15 +1,15 @@
<?xml version="1.0" encoding="UTF-8"?>
<xsd:schema
xmlns="http://www.springframework.org/schema/integration/cassandra"
xmlns:xsd="http://www.w3.org/2001/XMLSchema" xmlns:beans="http://www.springframework.org/schema/beans"
xmlns:tool="http://www.springframework.org/schema/tool"
xmlns:integration="http://www.springframework.org/schema/integration"
targetNamespace="http://www.springframework.org/schema/integration/cassandra"
elementFormDefault="qualified" attributeFormDefault="unqualified">
<xsd:import namespace="http://www.springframework.org/schema/beans" />
<xsd:import namespace="http://www.springframework.org/schema/tool" />
xmlns="http://www.springframework.org/schema/integration/cassandra"
xmlns:xsd="http://www.w3.org/2001/XMLSchema"
xmlns:tool="http://www.springframework.org/schema/tool"
xmlns:integration="http://www.springframework.org/schema/integration"
targetNamespace="http://www.springframework.org/schema/integration/cassandra"
elementFormDefault="qualified">
<xsd:import namespace="http://www.springframework.org/schema/beans"/>
<xsd:import namespace="http://www.springframework.org/schema/tool"/>
<xsd:import namespace="http://www.springframework.org/schema/integration"
schemaLocation="http://www.springframework.org/schema/integration/spring-integration.xsd" />
schemaLocation="http://www.springframework.org/schema/integration/spring-integration.xsd"/>
<xsd:annotation>
<xsd:documentation><![CDATA[
@@ -29,20 +29,21 @@
<xsd:complexContent>
<xsd:extension base="outboundType">
<xsd:choice minOccurs="0" maxOccurs="2">
<xsd:element name="parameter-expression" type="queryParameterType" minOccurs="0" maxOccurs="unbounded">
<xsd:annotation>
<xsd:documentation>
Specify an expression for parameter variable placeholder in cql statement.
</xsd:documentation>
</xsd:annotation>
</xsd:element>
<xsd:element name="parameter-expression" type="queryParameterType" minOccurs="0"
maxOccurs="unbounded">
<xsd:annotation>
<xsd:documentation>
Specify an expression for parameter variable placeholder in cql statement.
</xsd:documentation>
</xsd:annotation>
</xsd:element>
</xsd:choice>
<xsd:attributeGroup ref="integration:channelAdapterAttributes" />
<xsd:attributeGroup ref="integration:channelAdapterAttributes"/>
</xsd:extension>
</xsd:complexContent>
</xsd:complexType>
</xsd:element>
<xsd:element name="outbound-gateway">
<xsd:annotation>
<xsd:documentation>
@@ -55,23 +56,25 @@
<xsd:complexContent>
<xsd:extension base="outboundType">
<xsd:choice minOccurs="0" maxOccurs="2">
<xsd:element name="parameter-expression" type="queryParameterType" minOccurs="0" maxOccurs="unbounded">
<xsd:annotation>
<xsd:documentation>
Specify an expression for parameter variable placeholder in cql statement.
</xsd:documentation>
</xsd:annotation>
</xsd:element>
<xsd:element name="parameter-expression" type="queryParameterType" minOccurs="0"
maxOccurs="unbounded">
<xsd:annotation>
<xsd:documentation>
Specify an expression for parameter variable placeholder in cql statement.
</xsd:documentation>
</xsd:annotation>
</xsd:element>
</xsd:choice>
<xsd:attribute name="reply-channel" type="xsd:string">
<xsd:annotation>
<xsd:documentation>
Message Channel to which replies should be sent after being received from Cassandra cluster.
Message Channel to which replies should be sent after being received from Cassandra
cluster.
</xsd:documentation>
<xsd:appinfo>
<tool:annotation kind="ref">
<tool:expected-type
type="org.springframework.messaging.MessageChannel"/>
type="org.springframework.messaging.MessageChannel"/>
</tool:annotation>
</xsd:appinfo>
</xsd:annotation>
@@ -79,7 +82,7 @@
<xsd:attribute name="id" type="xsd:string">
<xsd:annotation>
<xsd:documentation>
Unique ID for this gateway.
Unique ID for this gateway.
</xsd:documentation>
</xsd:annotation>
</xsd:attribute>
@@ -91,7 +94,7 @@
<xsd:appinfo>
<tool:annotation kind="ref">
<tool:expected-type
type="org.springframework.messaging.MessageChannel"/>
type="org.springframework.messaging.MessageChannel"/>
</tool:annotation>
</xsd:appinfo>
</xsd:annotation>
@@ -101,7 +104,7 @@
</xsd:complexContent>
</xsd:complexType>
</xsd:element>
<xsd:complexType name="outboundType">
<xsd:annotation>
<xsd:documentation>
@@ -109,44 +112,43 @@
</xsd:documentation>
</xsd:annotation>
<xsd:sequence>
<xsd:element ref="integration:poller" minOccurs="0" maxOccurs="1"/>
<xsd:element name="request-handler-advice-chain" type="integration:handlerAdviceChainType" minOccurs="0" maxOccurs="1" />
<xsd:element ref="integration:poller" minOccurs="0"/>
<xsd:element name="request-handler-advice-chain" type="integration:handlerAdviceChainType" minOccurs="0"/>
</xsd:sequence>
<xsd:attribute name="cassandra-template" type="xsd:string">
<xsd:annotation>
<xsd:documentation>
<![CDATA[
Reference to an instance of
org.springframework.data.cassandra.core.CassandraOperations]]>
</xsd:documentation>
<xsd:documentation>
Reference to an instance of
'org.springframework.data.cassandra.core.ReactiveCassandraOperations'.
</xsd:documentation>
<xsd:appinfo>
<tool:annotation kind="ref">
<tool:expected-type
type="org.springframework.data.cassandra.core.CassandraOperations" />
type="org.springframework.data.cassandra.core.ReactiveCassandraOperations"/>
</tool:annotation>
</xsd:appinfo>
</xsd:annotation>
</xsd:attribute>
<xsd:attribute name="write-options" type="xsd:string">
<xsd:annotation>
<xsd:documentation>
<![CDATA[
Reference to an instance of
org.springframework.cassandra.core.WriteOptions]]>
</xsd:documentation>
<xsd:documentation>
Reference to an instance of
'org.springframework.data.cassandra.core.cql.WriteOptions'
</xsd:documentation>
<xsd:appinfo>
<tool:annotation kind="ref">
<tool:expected-type
type="org.springframework.cassandra.core.WriteOptions" />
type="org.springframework.data.cassandra.core.cql.WriteOptions"/>
</tool:annotation>
</xsd:appinfo>
</xsd:annotation>
</xsd:attribute>
<xsd:attribute name="mode" default="INSERT" use="optional">
<xsd:attribute name="mode" default="INSERT">
<xsd:annotation>
<xsd:documentation>
<![CDATA[
Indicates the `CassandraMessageHandler` behavior. Ignored in case of explicit 'query', 'ingest-query' or `statement-expression`.]]>
<![CDATA[
Indicates the `CassandraMessageHandler` behavior.
Ignored in case of explicit 'query', 'ingest-query' or `statement-expression`.]]>
</xsd:documentation>
</xsd:annotation>
<xsd:simpleType>
@@ -174,8 +176,22 @@
</xsd:documentation>
</xsd:annotation>
</xsd:attribute>
<xsd:attribute name="async">
<xsd:annotation>
<xsd:documentation>
<![CDATA[
Process returned 'Mono<WriteResult>' async, reactive manner on the downstream
'FluxMessageChannel' subscription or via 'Mono.subscribe()' in the handler, if one-way.
Otherwise the 'Mono.block()' is called immediately before returning from the handler.
]]>
</xsd:documentation>
</xsd:annotation>
<xsd:simpleType>
<xsd:union memberTypes="xsd:boolean xsd:string" />
</xsd:simpleType>
</xsd:attribute>
</xsd:complexType>
<xsd:complexType name="queryParameterType">
<xsd:annotation>
<xsd:documentation>
@@ -200,13 +216,13 @@
</xsd:annotation>
</xsd:attribute>
</xsd:complexType>
<xsd:simpleType name="cassandraHandlerType">
<xsd:restriction base="xsd:token">
<xsd:enumeration value="INSERT" />
<xsd:enumeration value="UPDATE" />
<xsd:enumeration value="DELETE" />
<xsd:enumeration value="STATEMENT" />
</xsd:restriction>
<xsd:restriction base="xsd:token">
<xsd:enumeration value="INSERT"/>
<xsd:enumeration value="UPDATE"/>
<xsd:enumeration value="DELETE"/>
<xsd:enumeration value="STATEMENT"/>
</xsd:restriction>
</xsd:simpleType>
</xsd:schema>

View File

@@ -1,87 +1,75 @@
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:int="http://www.springframework.org/schema/integration"
xmlns:util="http://www.springframework.org/schema/util"
xmlns:cassandra="http://www.springframework.org/schema/data/cassandra"
xmlns:context="http://www.springframework.org/schema/context"
xmlns:int-cassandra="http://www.springframework.org/schema/integration/cassandra"
xsi:schemaLocation="http://www.springframework.org/schema/integration http://www.springframework.org/schema/integration/spring-integration.xsd
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:int="http://www.springframework.org/schema/integration"
xmlns:cassandra="http://www.springframework.org/schema/data/cassandra"
xmlns:context="http://www.springframework.org/schema/context"
xmlns:int-cassandra="http://www.springframework.org/schema/integration/cassandra"
xsi:schemaLocation="http://www.springframework.org/schema/integration http://www.springframework.org/schema/integration/spring-integration.xsd
http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
http://www.springframework.org/schema/data/cassandra http://www.springframework.org/schema/data/cassandra/spring-cassandra.xsd
http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context.xsd
http://www.springframework.org/schema/integration/cassandra http://www.springframework.org/schema/integration/cassandra/spring-integration-cassandra.xsd
http://www.springframework.org/schema/util http://www.springframework.org/schema/util/spring-util.xsd">
http://www.springframework.org/schema/integration/cassandra http://www.springframework.org/schema/integration/cassandra/spring-integration-cassandra.xsd">
<context:property-placeholder location="classpath:cassandra.properties" />
<int:poller default="true" fixed-delay="50" />
<context:property-placeholder location="classpath:cassandra.properties"/>
<int:poller default="true" fixed-delay="50"/>
<cassandra:mapping entity-base-packages="org.springframework.integration.cassandra.test.domain">
<cassandra:entity
class="org.springframework.integration.cassandra.test.domain.Book">
<cassandra:table name="book" />
class="org.springframework.integration.cassandra.test.domain.Book">
<cassandra:table name="book"/>
</cassandra:entity>
</cassandra:mapping>
<cassandra:cluster contact-points="${cassandra.contactpoints}" port="${cassandra.port}">
<cassandra:keyspace action="CREATE_DROP" name="${cassandra.keyspace}" />
<cassandra:keyspace action="CREATE_DROP" name="${cassandra.keyspace}"/>
</cassandra:cluster>
<cassandra:session keyspace-name="${cassandra.keyspace}" schema-action="RECREATE"/>
<cassandra:session id="cassandraSession" keyspace-name="${cassandra.keyspace}" schema-action="RECREATE"/>
<cassandra:converter id="cassandraConverter"/>
<cassandra:converter />
<cassandra:template id="cassandraTemplate" />
<int-cassandra:outbound-channel-adapter id="cassandraMessageHandler1"
cassandra-template="cassandraTemplate"
mode="INSERT"
auto-startup="true" />
<bean id="writeOptions" class="org.springframework.cassandra.core.WriteOptions">
<property name="ttl" value="60" />
<property name="consistencyLevel">
<util:constant
static-field="org.springframework.cassandra.core.ConsistencyLevel.ONE" />
</property>
<property name="retryPolicy">
<util:constant
static-field="org.springframework.cassandra.core.RetryPolicy.DOWNGRADING_CONSISTENCY" />
</property>
<bean id="reactiveCassandraTemplate" class="org.springframework.data.cassandra.core.ReactiveCassandraTemplate">
<constructor-arg name="converter" ref="cassandraConverter"/>
<constructor-arg name="session">
<bean class="org.springframework.data.cassandra.core.cql.session.DefaultBridgedReactiveSession">
<constructor-arg ref="cassandraSession"/>
</bean>
</constructor-arg>
</bean>
<bean id="resultChannel" class="org.springframework.integration.channel.QueueChannel"/>
<int-cassandra:outbound-channel-adapter id="cassandraMessageHandler2"
cassandra-template="cassandraTemplate"
write-options="writeOptions"
auto-startup="true" />
<int-cassandra:outbound-channel-adapter id="cassandraMessageHandler3"
cassandra-template="cassandraTemplate"
ingest-query="insert into book (isbn, title, author, pages, saleDate, isInStock) values (?, ?, ?, ?, ?, ?)"
auto-startup="true" />
<int-cassandra:outbound-channel-adapter id="cassandraMessageHandler1"
cassandra-template="reactiveCassandraTemplate"
async="false"/>
<int-cassandra:outbound-channel-adapter id="cassandraMessageHandler4"
cassandra-template="cassandraTemplate"
statement-expression="T(QueryBuilder).truncate('book')"
auto-startup="true" />
<int:channel id="inputChannel" />
<int-cassandra:outbound-channel-adapter id="cassandraMessageHandler2"
cassandra-template="reactiveCassandraTemplate"
async="false"/>
<int-cassandra:outbound-channel-adapter id="cassandraMessageHandler3"
cassandra-template="reactiveCassandraTemplate"
ingest-query="insert into book (isbn, title, author, pages, saleDate, isInStock) values (?, ?, ?, ?, ?, ?)"
async="false"/>
<int-cassandra:outbound-channel-adapter id="cassandraMessageHandler4"
cassandra-template="reactiveCassandraTemplate"
statement-expression="T(QueryBuilder).truncate('book')"
async="false"/>
<int:channel id="inputChannel"/>
<bean id="resultChannel" class="org.springframework.integration.channel.FluxMessageChannel"/>
<int-cassandra:outbound-gateway id="cassandraMessageHandler5"
request-channel="inputChannel"
cassandra-template="cassandraTemplate"
mode="STATEMENT"
query="SELECT * FROM book limit :size"
reply-channel="resultChannel"
auto-startup="true">
<int-cassandra:parameter-expression name="author" expression="payload" />
<int-cassandra:parameter-expression name="size" expression="headers.limit" />
</int-cassandra:outbound-gateway>
request-channel="inputChannel"
cassandra-template="reactiveCassandraTemplate"
mode="STATEMENT"
query="SELECT * FROM book limit :size"
reply-channel="resultChannel">
<int-cassandra:parameter-expression name="author" expression="payload"/>
<int-cassandra:parameter-expression name="size" expression="headers.limit"/>
</int-cassandra:outbound-gateway>
<cassandra:repositories base-package="org.springframework.integration.cassandra.test.domain" />
</beans>

View File

@@ -1,11 +1,11 @@
/*
* Copyright 2016 the original author or authors.
* Copyright 2015-2019 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
@@ -16,10 +16,7 @@
package org.springframework.integration.cassandra.config;
import static org.hamcrest.Matchers.instanceOf;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertThat;
import static org.assertj.core.api.Assertions.assertThat;
import java.io.IOException;
import java.util.ArrayList;
@@ -35,27 +32,27 @@ import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.cassandra.core.CassandraTemplate;
import org.springframework.data.cassandra.core.WriteResult;
import org.springframework.integration.cassandra.test.domain.Book;
import org.springframework.integration.cassandra.test.domain.BookSampler;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.integration.channel.FluxMessageChannel;
import org.springframework.integration.support.MessageBuilder;
import org.springframework.messaging.Message;
import org.springframework.messaging.PollableChannel;
import org.springframework.messaging.support.GenericMessage;
import org.springframework.test.annotation.DirtiesContext;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
import org.springframework.test.context.junit4.SpringRunner;
import com.datastax.driver.core.ResultSet;
import com.datastax.driver.core.Row;
import com.datastax.driver.core.querybuilder.QueryBuilder;
import com.datastax.driver.core.querybuilder.Select;
import reactor.core.publisher.Mono;
import reactor.test.StepVerifier;
/**
* @author Filippo Balicchia
* @author Artem Bilan
*/
@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration
@RunWith(SpringRunner.class)
@DirtiesContext
public class CassandraOutboundAdapterIntegrationTests {
@@ -80,18 +77,12 @@ public class CassandraOutboundAdapterIntegrationTests {
private DirectChannel inputChannel;
@Autowired
private PollableChannel resultChannel;
private FluxMessageChannel resultChannel;
@BeforeClass
public static void init() throws TTransportException, IOException, InterruptedException, ConfigurationException {
startCassandra();
}
private static void startCassandra()
throws TTransportException, IOException, InterruptedException, ConfigurationException {
public static void init() throws TTransportException, IOException, ConfigurationException {
EmbeddedCassandraServerHelper.startEmbeddedCassandra(CASSANDRA_CONFIG, "build/embeddedCassandra");
EmbeddedCassandraServerHelper.getSession();
}
@AfterClass
@@ -100,35 +91,40 @@ public class CassandraOutboundAdapterIntegrationTests {
}
@Test
public void testBasicCassandraInsert() throws Exception {
public void testBasicCassandraInsert() {
Book b1 = BookSampler.getBook();
Message<Book> message = MessageBuilder.withPayload(b1).build();
cassandraMessageHandler1.send(message);
this.cassandraMessageHandler1.send(message);
Select select = QueryBuilder.select().all().from("book");
List<Book> books = cassandraTemplate.select(select, Book.class);
assertEquals(1, books.size());
cassandraTemplate.delete(b1);
List<Book> books = this.cassandraTemplate.select(select, Book.class);
assertThat(books).hasSize(1);
this.cassandraTemplate.delete(b1);
}
@Test
public void testCassandraBatchInsertAndSelectStatement() throws Exception {
public void testCassandraBatchInsertAndSelectStatement() {
List<Book> books = BookSampler.getBookList(5);
cassandraMessageHandler2.send(new GenericMessage<>(books));
this.cassandraMessageHandler2.send(new GenericMessage<>(books));
Message<?> message = MessageBuilder.withPayload("Cassandra Puppy Guru").setHeader("limit", 2).build();
inputChannel.send(message);
Message<?> receive = resultChannel.receive(10000);
assertNotNull(receive);
assertThat(receive.getPayload(), instanceOf(ResultSet.class));
ResultSet resultSet = (ResultSet) receive.getPayload();
assertNotNull(resultSet);
List<Row> rows = resultSet.all();
assertEquals(2, rows.size());
cassandraMessageHandler1.send(new GenericMessage<>(QueryBuilder.truncate("book")));
this.inputChannel.send(message);
Mono<Integer> testMono =
Mono.from(this.resultChannel)
.map(Message::getPayload)
.cast(WriteResult.class)
.map(r -> r.getRows().size());
StepVerifier.create(testMono)
.expectNext(2)
.expectComplete()
.verify();
this.cassandraMessageHandler1.send(new GenericMessage<>(QueryBuilder.truncate("book")));
}
@Test
public void testCassandraBatchIngest() throws Exception {
public void testCassandraBatchIngest() {
List<Book> books = BookSampler.getBookList(5);
List<List<?>> ingestBooks = new ArrayList<>();
for (Book b : books) {
@@ -144,23 +140,23 @@ public class CassandraOutboundAdapterIntegrationTests {
}
Message<List<List<?>>> message = MessageBuilder.withPayload(ingestBooks).build();
cassandraMessageHandler3.send(message);
this.cassandraMessageHandler3.send(message);
Select select = QueryBuilder.select().all().from("book");
books = cassandraTemplate.select(select, Book.class);
assertEquals(5, books.size());
cassandraTemplate.delete(books);
books = this.cassandraTemplate.select(select, Book.class);
assertThat(books).hasSize(5);
this.cassandraTemplate.batchOps().delete(books);
}
@Test
public void testExpressionTrucante() throws Exception {
public void testExpressionTruncate() {
Message<Book> message = MessageBuilder.withPayload(BookSampler.getBook()).build();
cassandraMessageHandler1.send(message);
this.cassandraMessageHandler1.send(message);
Select select = QueryBuilder.select().all().from("book");
List<Book> books = cassandraTemplate.select(select, Book.class);
assertEquals(1, books.size());
cassandraMessageHandler4.send(MessageBuilder.withPayload("Empty").build());
books = cassandraTemplate.select(select, Book.class);
assertEquals(0, books.size());
List<Book> books = this.cassandraTemplate.select(select, Book.class);
assertThat(books).hasSize(1);
this.cassandraMessageHandler4.send(MessageBuilder.withPayload("Empty").build());
books = this.cassandraTemplate.select(select, Book.class);
assertThat(books).hasSize(0);
}
}

View File

@@ -1,18 +1,15 @@
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:int="http://www.springframework.org/schema/integration"
xmlns:util="http://www.springframework.org/schema/util"
xmlns:cassandra="http://www.springframework.org/schema/data/cassandra"
xmlns:int-cassandra="http://www.springframework.org/schema/integration/cassandra"
xsi:schemaLocation="http://www.springframework.org/schema/integration http://www.springframework.org/schema/integration/spring-integration.xsd
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:int="http://www.springframework.org/schema/integration"
xmlns:int-cassandra="http://www.springframework.org/schema/integration/cassandra"
xsi:schemaLocation="http://www.springframework.org/schema/integration http://www.springframework.org/schema/integration/spring-integration.xsd
http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
http://www.springframework.org/schema/data/cassandra http://www.springframework.org/schema/data/cassandra/spring-cassandra.xsd
http://www.springframework.org/schema/integration/cassandra http://www.springframework.org/schema/integration/cassandra/spring-integration-cassandra.xsd
http://www.springframework.org/schema/util http://www.springframework.org/schema/util/spring-util.xsd">
http://www.springframework.org/schema/integration/cassandra http://www.springframework.org/schema/integration/cassandra/spring-integration-cassandra.xsd">
<int:poller default="true" fixed-delay="50"/>
<int:poller default="true" fixed-delay="50"/>
<int:channel id="input">
<int:queue/>
</int:channel>
@@ -20,45 +17,43 @@
<int:channel id="resultChannel">
<int:queue/>
</int:channel>
<bean id="cassandraTemplate" class="org.mockito.Mockito" factory-method="mock">
<constructor-arg value="org.springframework.data.cassandra.core.CassandraOperations" />
</bean>
<bean id="writeOptions" class="org.mockito.Mockito" factory-method="mock">
<constructor-arg value="org.springframework.cassandra.core.WriteOptions" />
<constructor-arg value="org.springframework.data.cassandra.core.ReactiveCassandraOperations"/>
</bean>
<int-cassandra:outbound-channel-adapter id="outbound1"
cassandra-template="cassandraTemplate"
write-options="writeOptions"
auto-startup="false"
/>
<int-cassandra:outbound-channel-adapter id="outbound2"
channel="input"
cassandra-template="cassandraTemplate"
ingest-query="insert into book (isbn, title, author, pages, saleDate, isInStock) values (?, ?, ?, ?, ?, ?)"
/>
<int-cassandra:outbound-gateway id="outgateway"
request-channel="input"
cassandra-template="cassandraTemplate"
mode="STATEMENT"
write-options="writeOptions"
query="SELECT * FROM book limit :size"
reply-channel ="resultChannel"
auto-startup="true">
<int-cassandra:parameter-expression name="author" expression="payload"/>
<int-cassandra:parameter-expression name="size" expression="headers.limit"/>
</int-cassandra:outbound-gateway>
<int-cassandra:outbound-channel-adapter id="outbound4"
cassandra-template="cassandraTemplate"
write-options="writeOptions"
statement-expression="T(QueryBuilder).truncate('book')"
auto-startup="false"
/>
<bean id="writeOptions" class="org.mockito.Mockito" factory-method="mock">
<constructor-arg value="org.springframework.data.cassandra.core.InsertOptions"/>
</bean>
<int-cassandra:outbound-channel-adapter id="outbound1"
cassandra-template="cassandraTemplate"
write-options="writeOptions"
auto-startup="false"
async="false"/>
<int-cassandra:outbound-channel-adapter id="outbound2"
channel="input"
cassandra-template="cassandraTemplate"
ingest-query="insert into book (isbn, title, author, pages, saleDate, isInStock) values (?, ?, ?, ?, ?, ?)"/>
<int-cassandra:outbound-gateway id="outgateway"
request-channel="input"
cassandra-template="cassandraTemplate"
mode="STATEMENT"
write-options="writeOptions"
query="SELECT * FROM book limit :size"
reply-channel="resultChannel"
auto-startup="true">
<int-cassandra:parameter-expression name="author" expression="payload"/>
<int-cassandra:parameter-expression name="size" expression="headers.limit"/>
</int-cassandra:outbound-gateway>
<int-cassandra:outbound-channel-adapter id="outbound4"
cassandra-template="cassandraTemplate"
write-options="writeOptions"
statement-expression="T(QueryBuilder).truncate('book')"
auto-startup="false"/>
</beans>

View File

@@ -1,11 +1,11 @@
/*
* Copyright 2016 the original author or authors.
* Copyright 2015-2019 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
@@ -16,7 +16,7 @@
package org.springframework.integration.cassandra.config;
import static org.junit.Assert.assertEquals;
import static org.assertj.core.api.Assertions.assertThat;
import org.junit.Test;
import org.junit.runner.RunWith;
@@ -25,15 +25,14 @@ import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.ApplicationContext;
import org.springframework.integration.cassandra.outbound.CassandraMessageHandler;
import org.springframework.integration.test.util.TestUtils;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
import org.springframework.test.context.junit4.SpringRunner;
/**
* @author Filippo Balicchia
* @author Artem Bilan
*/
@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration
@RunWith(SpringRunner.class)
public class CassandraOutboundAdapterParserTests {
@Autowired
@@ -41,46 +40,52 @@ public class CassandraOutboundAdapterParserTests {
@Test
public void minimalConfig() {
CassandraMessageHandler handler =
TestUtils.getPropertyValue(this.context.getBean("outbound1.adapter"), "handler",
CassandraMessageHandler.class);
CassandraMessageHandler<?> handler = TestUtils.getPropertyValue(context.getBean("outbound1.adapter"), "handler",
CassandraMessageHandler.class);
assertEquals("outbound1.adapter", TestUtils.getPropertyValue(handler, "componentName"));
assertEquals(CassandraMessageHandler.Type.INSERT, TestUtils.getPropertyValue(handler, "mode"));
assertEquals(context.getBean("cassandraTemplate"), TestUtils.getPropertyValue(handler, "cassandraTemplate"));
assertEquals(context.getBean("writeOptions"), TestUtils.getPropertyValue(handler, "writeOptions"));
assertThat(TestUtils.getPropertyValue(handler, "componentName")).isEqualTo("outbound1.adapter");
assertThat(TestUtils.getPropertyValue(handler, "mode")).isEqualTo(CassandraMessageHandler.Type.INSERT);
assertThat(TestUtils.getPropertyValue(handler, "cassandraOperations"))
.isSameAs(this.context.getBean("cassandraTemplate"));
assertThat(TestUtils.getPropertyValue(handler, "writeOptions")).isSameAs(this.context.getBean("writeOptions"));
assertThat(TestUtils.getPropertyValue(handler, "async", Boolean.class)).isFalse();
}
@Test
public void ingestConfig() {
CassandraMessageHandler<?> handler = TestUtils.getPropertyValue(context.getBean("outbound2"), "handler",
CassandraMessageHandler.class);
CassandraMessageHandler handler =
TestUtils.getPropertyValue(this.context.getBean("outbound2"), "handler",
CassandraMessageHandler.class);
assertEquals("insert into book (isbn, title, author, pages, saleDate, isInStock) values (?, ?, ?, ?, ?, ?)",
TestUtils.getPropertyValue(handler, "ingestQuery"));
assertEquals(Boolean.FALSE, TestUtils.getPropertyValue(handler, "producesReply"));
assertThat(TestUtils.getPropertyValue(handler, "ingestQuery"))
.isEqualTo("insert into book (isbn, title, author, pages, saleDate, isInStock) values (?, ?, ?, ?, ?, " +
"?)");
assertThat(TestUtils.getPropertyValue(handler, "producesReply", Boolean.class)).isFalse();
}
@Test
public void fullConfig() {
CassandraMessageHandler<?> handler = TestUtils.getPropertyValue(context.getBean("outgateway"), "handler",
CassandraMessageHandler.class);
CassandraMessageHandler handler =
TestUtils.getPropertyValue(this.context.getBean("outgateway"), "handler",
CassandraMessageHandler.class);
assertEquals(Boolean.TRUE, TestUtils.getPropertyValue(handler, "producesReply"));
assertEquals(CassandraMessageHandler.Type.STATEMENT, TestUtils.getPropertyValue(handler, "mode"));
assertEquals(context.getBean("writeOptions"), TestUtils.getPropertyValue(handler, "writeOptions"));
assertThat(TestUtils.getPropertyValue(handler, "producesReply", Boolean.class)).isTrue();
assertThat(TestUtils.getPropertyValue(handler, "mode")).isEqualTo(CassandraMessageHandler.Type.STATEMENT);
assertThat(TestUtils.getPropertyValue(handler, "writeOptions")).isSameAs(this.context.getBean("writeOptions"));
}
@Test
public void statementConfig() {
CassandraMessageHandler handler =
TestUtils.getPropertyValue(this.context.getBean("outbound4.adapter"), "handler",
CassandraMessageHandler.class);
CassandraMessageHandler<?> handler = TestUtils.getPropertyValue(context.getBean("outbound4.adapter"), "handler",
CassandraMessageHandler.class);
assertEquals("outbound4.adapter", TestUtils.getPropertyValue(handler, "componentName"));
assertEquals(CassandraMessageHandler.Type.STATEMENT, TestUtils.getPropertyValue(handler, "mode"));
assertEquals(context.getBean("cassandraTemplate"), TestUtils.getPropertyValue(handler, "cassandraTemplate"));
assertEquals(context.getBean("writeOptions"), TestUtils.getPropertyValue(handler, "writeOptions"));
assertThat(TestUtils.getPropertyValue(handler, "componentName")).isEqualTo("outbound4.adapter");
assertThat(TestUtils.getPropertyValue(handler, "mode")).isEqualTo(CassandraMessageHandler.Type.STATEMENT);
assertThat(TestUtils.getPropertyValue(handler, "cassandraOperations"))
.isSameAs(this.context.getBean("cassandraTemplate"));
assertThat(TestUtils.getPropertyValue(handler, "writeOptions")).isSameAs(this.context.getBean("writeOptions"));
}
}

View File

@@ -1,11 +1,11 @@
/*
* Copyright 2016 the original author or authors.
* Copyright 2015-2019 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
@@ -16,7 +16,8 @@
package org.springframework.integration.cassandra.config;
import org.junit.Assert;
import static org.assertj.core.api.Assertions.assertThat;
import org.junit.Test;
import org.springframework.beans.factory.config.BeanDefinition;
@@ -25,8 +26,8 @@ import org.springframework.integration.cassandra.config.xml.CassandraParserUtils
/**
* @author Filippo Balicchia
* @author Artem Bilan
*/
public class CassandraParserUtilsTests {
@Test
@@ -34,15 +35,16 @@ public class CassandraParserUtilsTests {
String query = "";
BeanDefinition statementExpressionDef = null;
String ingestQuery = "";
Assert.assertTrue(CassandraParserUtils.areMutuallyExclusive(query, statementExpressionDef, ingestQuery));
assertThat(CassandraParserUtils.areMutuallyExclusive(query, statementExpressionDef, ingestQuery)).isTrue();
}
@Test
public void mutuallyExclusiveCase2() {
String query = "";
BeanDefinition statementExpressionDef = null;
String ingestQuery = "insert into book (isbn, title, author, pages, saleDate, isInStock) values (?, ?, ?, ?, ?, ?)";
Assert.assertTrue(CassandraParserUtils.areMutuallyExclusive(query, statementExpressionDef, ingestQuery));
String ingestQuery =
"insert into book (isbn, title, author, pages, saleDate, isInStock) values (?, ?, ?, ?, ?, ?)";
assertThat(CassandraParserUtils.areMutuallyExclusive(query, statementExpressionDef, ingestQuery)).isTrue();
}
@Test
@@ -50,15 +52,16 @@ public class CassandraParserUtilsTests {
String query = "";
BeanDefinition statementExpressionDef = new RootBeanDefinition();
String ingestQuery = "";
Assert.assertTrue(CassandraParserUtils.areMutuallyExclusive(query, statementExpressionDef, ingestQuery));
assertThat(CassandraParserUtils.areMutuallyExclusive(query, statementExpressionDef, ingestQuery)).isTrue();
}
@Test
public void mutuallyExclusiveCase4() {
String query = "";
BeanDefinition statementExpressionDef = new RootBeanDefinition();
String ingestQuery = "insert into book (isbn, title, author, pages, saleDate, isInStock) values (?, ?, ?, ?, ?, ?)";
Assert.assertFalse(CassandraParserUtils.areMutuallyExclusive(query, statementExpressionDef, ingestQuery));
String ingestQuery =
"insert into book (isbn, title, author, pages, saleDate, isInStock) values (?, ?, ?, ?, ?, ?)";
assertThat(CassandraParserUtils.areMutuallyExclusive(query, statementExpressionDef, ingestQuery)).isFalse();
}
@Test
@@ -66,15 +69,16 @@ public class CassandraParserUtilsTests {
String query = "SELECT * FROM book limit :size";
BeanDefinition statementExpressionDef = new RootBeanDefinition();
String ingestQuery = "";
Assert.assertFalse(CassandraParserUtils.areMutuallyExclusive(query, statementExpressionDef, ingestQuery));
assertThat(CassandraParserUtils.areMutuallyExclusive(query, statementExpressionDef, ingestQuery)).isFalse();
}
@Test
public void mutuallyExclusiveCase6() {
String query = "SELECT * FROM book limit :size";
BeanDefinition statementExpressionDef = new RootBeanDefinition();
String ingestQuery = "insert into book (isbn, title, author, pages, saleDate, isInStock) values (?, ?, ?, ?, ?, ?)";
Assert.assertFalse(CassandraParserUtils.areMutuallyExclusive(query, statementExpressionDef, ingestQuery));
String ingestQuery =
"insert into book (isbn, title, author, pages, saleDate, isInStock) values (?, ?, ?, ?, ?, ?)";
assertThat(CassandraParserUtils.areMutuallyExclusive(query, statementExpressionDef, ingestQuery)).isFalse();
}
@Test
@@ -82,15 +86,16 @@ public class CassandraParserUtilsTests {
String query = "SELECT * FROM book limit :size";
BeanDefinition statementExpressionDef = new RootBeanDefinition();
String ingestQuery = "";
Assert.assertFalse(CassandraParserUtils.areMutuallyExclusive(query, statementExpressionDef, ingestQuery));
assertThat(CassandraParserUtils.areMutuallyExclusive(query, statementExpressionDef, ingestQuery)).isFalse();
}
@Test
public void mutuallyExclusiveCase8() {
String query = "SELECT * FROM book limit :size";
BeanDefinition statementExpressionDef = new RootBeanDefinition();
String ingestQuery = "insert into book (isbn, title, author, pages, saleDate, isInStock) values (?, ?, ?, ?, ?, ?)";
Assert.assertFalse(CassandraParserUtils.areMutuallyExclusive(query, statementExpressionDef, ingestQuery));
String ingestQuery =
"insert into book (isbn, title, author, pages, saleDate, isInStock) values (?, ?, ?, ?, ?, ?)";
assertThat(CassandraParserUtils.areMutuallyExclusive(query, statementExpressionDef, ingestQuery)).isFalse();
}
}

View File

@@ -1,11 +1,11 @@
/*
* Copyright 2015 the original author or authors
* Copyright 2015-2019 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
@@ -16,30 +16,30 @@
package org.springframework.integration.cassandra.config;
import static org.springframework.cassandra.core.keyspace.CreateKeyspaceSpecification.createKeyspace;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.UUID;
import org.springframework.cassandra.core.keyspace.CreateKeyspaceSpecification;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.cassandra.config.AbstractReactiveCassandraConfiguration;
import org.springframework.data.cassandra.config.SchemaAction;
import org.springframework.data.cassandra.config.java.AbstractCassandraConfiguration;
import org.springframework.data.cassandra.core.cql.keyspace.CreateKeyspaceSpecification;
/**
* Setup any spring configuration for unit tests
*
* @author David Webb
* @author Matthew T. Adams
* @author Artem Bilan
*/
@Configuration
public class IntegrationTestConfig extends AbstractCassandraConfiguration {
public class IntegrationTestConfig extends AbstractReactiveCassandraConfiguration {
public static final String HOST = "localhost";
//public static final SpringCassandraBuildProperties PROPS = new SpringCassandraBuildProperties();
public static final int PORT = 9043;//PROPS.getCassandraPort();
// public static final SpringCassandraBuildProperties PROPS = new SpringCassandraBuildProperties();
public static final int PORT = 9043; //PROPS.getCassandraPort();
// public static final int RPC_PORT = PROPS.getCassandraRpcPort();
@@ -61,12 +61,14 @@ public class IntegrationTestConfig extends AbstractCassandraConfiguration {
@Override
protected String getKeyspaceName() {
return keyspaceName;
return this.keyspaceName;
}
@Override
protected List<CreateKeyspaceSpecification> getKeyspaceCreations() {
return Arrays.asList(createKeyspace().name(getKeyspaceName()).withSimpleReplication());
return Collections.singletonList(
CreateKeyspaceSpecification.createKeyspace(getKeyspaceName())
.withSimpleReplication());
}
}

View File

@@ -1,11 +1,11 @@
/*
* Copyright 2015 the original author or authors.
* Copyright 2015-2019 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
@@ -16,39 +16,35 @@
package org.springframework.integration.cassandra.outbound;
import static org.hamcrest.Matchers.instanceOf;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertThat;
import static org.assertj.core.api.Assertions.assertThat;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.cassandra.exceptions.ConfigurationException;
import org.apache.thrift.transport.TTransportException;
import org.cassandraunit.utils.EmbeddedCassandraServerHelper;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.cassandra.core.ConsistencyLevel;
import org.springframework.cassandra.core.RetryPolicy;
import org.springframework.cassandra.core.WriteOptions;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.cassandra.core.CassandraOperations;
import org.springframework.data.cassandra.core.InsertOptions;
import org.springframework.data.cassandra.core.ReactiveCassandraOperations;
import org.springframework.data.cassandra.core.WriteResult;
import org.springframework.data.cassandra.core.cql.WriteOptions;
import org.springframework.expression.Expression;
import org.springframework.expression.spel.standard.SpelExpressionParser;
import org.springframework.integration.cassandra.config.IntegrationTestConfig;
import org.springframework.integration.cassandra.test.domain.Book;
import org.springframework.integration.cassandra.test.domain.BookSampler;
import org.springframework.integration.channel.FluxMessageChannel;
import org.springframework.integration.channel.NullChannel;
import org.springframework.integration.channel.QueueChannel;
import org.springframework.integration.config.EnableIntegration;
import org.springframework.integration.support.MessageBuilder;
import org.springframework.messaging.Message;
@@ -56,100 +52,25 @@ import org.springframework.messaging.MessageHandler;
import org.springframework.messaging.PollableChannel;
import org.springframework.messaging.support.GenericMessage;
import org.springframework.test.annotation.DirtiesContext;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
import org.springframework.test.context.junit4.SpringRunner;
import com.datastax.driver.core.Cluster;
import com.datastax.driver.core.ResultSet;
import com.datastax.driver.core.Row;
import com.datastax.driver.core.Session;
import com.datastax.driver.core.ConsistencyLevel;
import com.datastax.driver.core.querybuilder.QueryBuilder;
import com.datastax.driver.core.querybuilder.Select;
import reactor.core.publisher.Mono;
import reactor.test.StepVerifier;
/**
* @author Soby Chacko
* @author Artem Bilan
*/
@ContextConfiguration
@RunWith(SpringJUnit4ClassRunner.class)
@RunWith(SpringRunner.class)
@DirtiesContext
public class CassandraMessageHandlerTests {
private static final SpelExpressionParser PARSER = new SpelExpressionParser();
@Configuration
@EnableIntegration
public static class Config extends IntegrationTestConfig {
@Autowired
public CassandraOperations template;
@Override
public String[] getEntityBasePackages() {
return new String[] { Book.class.getPackage().getName() };
}
@Bean
public MessageHandler cassandraMessageHandler1() {
CassandraMessageHandler<Book> cassandraMessageHandler = new CassandraMessageHandler<>(this.template);
cassandraMessageHandler.setProducesReply(false);
return cassandraMessageHandler;
}
@Bean
public PollableChannel messageChannel() {
return new NullChannel();
}
@Bean
public MessageHandler cassandraMessageHandler2() {
CassandraMessageHandler<Book> cassandraMessageHandler = new CassandraMessageHandler<>(this.template);
WriteOptions options = new WriteOptions();
options.setTtl(60);
options.setConsistencyLevel(ConsistencyLevel.ONE);
options.setRetryPolicy(RetryPolicy.DOWNGRADING_CONSISTENCY);
cassandraMessageHandler.setWriteOptions(options);
cassandraMessageHandler.setOutputChannel(messageChannel());
return cassandraMessageHandler;
}
@Bean
public MessageHandler cassandraMessageHandler3() {
CassandraMessageHandler<Book> cassandraMessageHandler = new CassandraMessageHandler<>(this.template);
String cqlIngest = "insert into book (isbn, title, author, pages, saleDate, isInStock) values (?, ?, ?, ?, ?, ?)";
cassandraMessageHandler.setIngestQuery(cqlIngest);
return cassandraMessageHandler;
}
@Bean
public PollableChannel resultChannel() {
return new QueueChannel();
}
@Bean
public MessageHandler cassandraMessageHandler4() {
CassandraMessageHandler<Book> cassandraMessageHandler = new CassandraMessageHandler<>(this.template);
// TODO https://jira.spring.io/browse/DATACASS-213
// cassandraMessageHandler.setQuery("SELECT * FROM book WHERE author
// = :author limit :size");
cassandraMessageHandler.setQuery("SELECT * FROM book limit :size");
Map<String, Expression> params = new HashMap<>();
params.put("author", PARSER.parseExpression("payload"));
params.put("size", PARSER.parseExpression("headers.limit"));
cassandraMessageHandler.setParameterExpressions(params);
cassandraMessageHandler.setOutputChannel(resultChannel());
cassandraMessageHandler.setProducesReply(true);
return cassandraMessageHandler;
}
}
private static final String CASSANDRA_CONFIG = "spring-cassandra.yaml";
@Autowired
public MessageHandler cassandraMessageHandler1;
@@ -167,37 +88,21 @@ public class CassandraMessageHandlerTests {
public CassandraOperations template;
@Autowired
public PollableChannel resultChannel;
protected static final String CASSANDRA_CONFIG = "spring-cassandra.yaml";
/**
* The {@link Cluster} that's connected to Cassandra.
*/
protected static Cluster cluster;
/**
* The session connected to the system keyspace.
*/
protected static Session system;
public FluxMessageChannel resultChannel;
@BeforeClass
public static void startCassandra()
throws TTransportException, IOException, InterruptedException, ConfigurationException {
public static void startCassandra() throws Exception {
EmbeddedCassandraServerHelper.startEmbeddedCassandra(CASSANDRA_CONFIG, "build/embeddedCassandra");
cluster = Cluster.builder().addContactPoint(IntegrationTestConfig.HOST).withPort(IntegrationTestConfig.PORT)
.build();
system = cluster.connect();
EmbeddedCassandraServerHelper.getSession();
}
@AfterClass
public static void cleanup() {
cluster.close();
EmbeddedCassandraServerHelper.cleanEmbeddedCassandra();
}
@Test
public void testBasicCassandraInsert() throws Exception {
public void testBasicCassandraInsert() {
Book b1 = new Book();
b1.setIsbn("123456-1");
b1.setTitle("Spring Integration Cassandra");
@@ -211,13 +116,13 @@ public class CassandraMessageHandlerTests {
Select select = QueryBuilder.select().all().from("book");
List<Book> books = this.template.select(select, Book.class);
assertEquals(1, books.size());
assertThat(books).hasSize(1);
this.template.delete(b1);
}
@Test
public void testCassandraBatchInsertAndSelectStatement() throws Exception {
public void testCassandraBatchInsertAndSelectStatement() {
List<Book> books = BookSampler.getBookList(5);
this.cassandraMessageHandler2.handleMessage(new GenericMessage<>(books));
@@ -225,23 +130,25 @@ public class CassandraMessageHandlerTests {
Message<?> message = MessageBuilder.withPayload("Cassandra Guru").setHeader("limit", 2).build();
this.cassandraMessageHandler4.handleMessage(message);
Message<?> receive = this.resultChannel.receive(10000);
assertNotNull(receive);
assertThat(receive.getPayload(), instanceOf(ResultSet.class));
ResultSet resultSet = (ResultSet) receive.getPayload();
assertNotNull(resultSet);
List<Row> rows = resultSet.all();
assertEquals(2, rows.size());
Mono<Integer> testMono =
Mono.from(this.resultChannel)
.map(Message::getPayload)
.cast(WriteResult.class)
.map(r -> r.getRows().size());
StepVerifier.create(testMono)
.expectNext(1)
.expectComplete()
.verify();
this.cassandraMessageHandler1.handleMessage(new GenericMessage<>(QueryBuilder.truncate("book")));
}
@Test
public void testCassandraBatchIngest() throws Exception {
public void testCassandraBatchIngest() {
List<Book> books = BookSampler.getBookList(5);
List<List<?>> ingestBooks = new ArrayList<>();
for (Book b : books) {
List<Object> l = new ArrayList<>();
l.add(b.getIsbn());
l.add(b.getTitle());
@@ -257,8 +164,82 @@ public class CassandraMessageHandlerTests {
Select select = QueryBuilder.select().all().from("book");
books = this.template.select(select, Book.class);
assertEquals(5, books.size());
assertThat(books).hasSize(5);
this.template.delete(books);
this.template.batchOps().delete(books);
}
@Configuration
@EnableIntegration
public static class Config extends IntegrationTestConfig {
@Autowired
public ReactiveCassandraOperations template;
@Override
public String[] getEntityBasePackages() {
return new String[] { Book.class.getPackage().getName() };
}
@Bean
public MessageHandler cassandraMessageHandler1() {
CassandraMessageHandler cassandraMessageHandler = new CassandraMessageHandler(this.template);
cassandraMessageHandler.setAsync(false);
return cassandraMessageHandler;
}
@Bean
public PollableChannel messageChannel() {
return new NullChannel();
}
@Bean
public MessageHandler cassandraMessageHandler2() {
CassandraMessageHandler cassandraMessageHandler = new CassandraMessageHandler(this.template);
WriteOptions options =
InsertOptions.builder()
.ttl(60)
.consistencyLevel(ConsistencyLevel.ONE)
.build();
cassandraMessageHandler.setWriteOptions(options);
cassandraMessageHandler.setOutputChannel(messageChannel());
cassandraMessageHandler.setAsync(false);
return cassandraMessageHandler;
}
@Bean
public MessageHandler cassandraMessageHandler3() {
CassandraMessageHandler cassandraMessageHandler = new CassandraMessageHandler(this.template);
String cqlIngest =
"insert into book (isbn, title, author, pages, saleDate, isInStock) values (?, ?, ?, ?, ?, ?)";
cassandraMessageHandler.setIngestQuery(cqlIngest);
cassandraMessageHandler.setAsync(false);
return cassandraMessageHandler;
}
@Bean
public FluxMessageChannel resultChannel() {
return new FluxMessageChannel();
}
@Bean
public MessageHandler cassandraMessageHandler4() {
CassandraMessageHandler cassandraMessageHandler = new CassandraMessageHandler(this.template);
cassandraMessageHandler.setQuery("SELECT * FROM book WHERE author = :author limit :size");
Map<String, Expression> params = new HashMap<>();
params.put("author", PARSER.parseExpression("payload"));
params.put("size", PARSER.parseExpression("headers.limit"));
cassandraMessageHandler.setParameterExpressions(params);
cassandraMessageHandler.setOutputChannel(resultChannel());
cassandraMessageHandler.setProducesReply(true);
return cassandraMessageHandler;
}
}
}

View File

@@ -1,11 +1,11 @@
/*
* Copyright 2015 the original author or authors
* Copyright 2015-2019 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
@@ -17,14 +17,18 @@
package org.springframework.integration.cassandra.test.domain;
import java.util.Date;
import java.util.Objects;
import org.springframework.data.cassandra.core.mapping.Indexed;
import org.springframework.data.cassandra.core.mapping.PrimaryKey;
import org.springframework.data.cassandra.core.mapping.Table;
import org.springframework.data.cassandra.mapping.Indexed;
import org.springframework.data.cassandra.mapping.PrimaryKey;
import org.springframework.data.cassandra.mapping.Table;
/**
* Test POJO
*
* @author David Webb
* @author Artem Bilan
*/
@Table("book")
public class Book {
@@ -47,14 +51,14 @@ public class Book {
* @return Returns the isbn.
*/
public String getIsbn() {
return isbn;
return this.isbn;
}
/**
* @return Returns the saleDate.
*/
public Date getSaleDate() {
return saleDate;
return this.saleDate;
}
/**
@@ -68,7 +72,7 @@ public class Book {
* @return Returns the isInStock.
*/
public boolean isInStock() {
return isInStock;
return this.isInStock;
}
/**
@@ -89,7 +93,7 @@ public class Book {
* @return Returns the title.
*/
public String getTitle() {
return title;
return this.title;
}
/**
@@ -103,7 +107,7 @@ public class Book {
* @return Returns the author.
*/
public String getAuthor() {
return author;
return this.author;
}
/**
@@ -117,7 +121,7 @@ public class Book {
* @return Returns the pages.
*/
public int getPages() {
return pages;
return this.pages;
}
/**
@@ -132,8 +136,30 @@ public class Book {
*/
@Override
public String toString() {
return ("isbn -> " + isbn) + "\n" + "tile -> " + title + "\n" + "author -> " + author
+ "\n" + "pages -> " + pages + "\n";
return ("isbn -> " + this.isbn) + "\n" + "tile -> " + this.title + "\n" + "author -> " + this.author
+ "\n" + "pages -> " + this.pages + "\n";
}
@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
Book book = (Book) o;
return this.pages == book.pages &&
this.isInStock == book.isInStock &&
Objects.equals(this.isbn, book.isbn) &&
Objects.equals(this.title, book.title) &&
Objects.equals(this.author, book.author) &&
Objects.equals(this.saleDate, book.saleDate);
}
@Override
public int hashCode() {
return Objects.hash(this.isbn, this.title, this.author, this.pages, this.saleDate, this.isInStock);
}
}

View File

@@ -1,11 +1,11 @@
/*
* Copyright 2016 the original author or authors
* Copyright 2015-2019 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
@@ -23,16 +23,14 @@ import java.util.UUID;
/**
* @author Filippo Balicchia
* @author Artem Bilan
*/
public class BookSampler {
public final class BookSampler {
public static List<Book> getBookList(int numBooks) {
List<Book> books = new ArrayList<>();
Book b;
for (int i = 0; i < numBooks; i++) {
b = new Book();
for (int i = 0; i < numBooks - 1; i++) {
Book b = new Book();
b.setIsbn(UUID.randomUUID().toString());
b.setTitle("Spring Data Cassandra Guide");
b.setAuthor("Cassandra Guru puppy");
@@ -41,6 +39,7 @@ public class BookSampler {
b.setSaleDate(new Date());
books.add(b);
}
books.add(getBook());
return books;
}
@@ -55,4 +54,7 @@ public class BookSampler {
return b1;
}
private BookSampler() {
}
}

View File

@@ -1,8 +0,0 @@
log4j.rootCategory=WARN, stdout
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%d{HH:mm:ss.SSS} %-5p [%t][%c] %m%n
log4j.category.org.springframework.integration=WARN
log4j.category.org.springframework.integration.cassandra=INFO

View File

@@ -0,0 +1,32 @@
<?xml version="1.0" encoding="UTF-8"?>
<configuration>
<appender name="console" class="ch.qos.logback.core.ConsoleAppender">
<encoder>
<pattern>%d %5p | %t | %-55logger{55} | %m | %n</pattern>
</encoder>
</appender>
<logger name="org.springframework" level="ERROR"/>
<logger name="org.springframework.integration" level="ERROR"/>
<logger name="org.springframework.integration.cassandra" level="ERROR"/>
<logger name="com.datastax" level="ERROR"/>
<!-- See https://issues.apache.org/jira/browse/CASSANDRA-8220 -->
<logger name="org.apache.cassandra.service.CassandraDaemon" level="OFF"/>
<!-- Suppress "Cannot connect to any host" messages caused by open sessions and
and an already shut down embedded Cassandra instance because of concurrent shutdown hooks -->
<logger name="com.datastax.driver.core.ControlConnection" level="OFF"/>
<logger name="com.datastax.driver.core.Session" level="OFF"/>
<!-- This one is noisy and subject to be refactored -->
<logger name="org.springframework.data.cassandra.core.mapping.BasicCassandraPersistentEntityMetadataVerifier"
level="OFF"/>
<root level="ERROR">
<appender-ref ref="console"/>
</root>
</configuration>

View File

@@ -1,6 +1,5 @@
# Cassandra storage config YAML
# NOTE:
# See http://wiki.apache.org/cassandra/StorageConfiguration for
# full explanations of configuration directives
# /NOTE
@@ -37,7 +36,8 @@ cluster_name: 'Test Cluster'
initial_token:
# See http://wiki.apache.org/cassandra/HintedHandoff
hinted_handoff_enabled: true
hinted_handoff_enabled: false
# this defines the maximum amount of time a dead host will have hints
# generated. After it has been dead this long, new hints for it will not be
# created until it has been seen alive and gone down again.
@@ -102,11 +102,37 @@ authorizer: org.apache.cassandra.auth.AllowAllAuthorizer
# partitioners and token selection.
partitioner: org.apache.cassandra.dht.Murmur3Partitioner
# Directories where Cassandra should store data on disk. Cassandra
# will spread data evenly across them, subject to the granularity of
# the configured compaction strategy.
# If not set, the default directory is $CASSANDRA_HOME/data/data.
# data_file_directories:
# - /var/lib/cassandra/data
# commit log. when running on magnetic HDD, this should be a
# separate spindle than the data directories.
# If not set, the default directory is $CASSANDRA_HOME/data/commitlog.
# commitlog_directory: /var/lib/cassandra/commitlog
# Enable / disable CDC functionality on a per-node basis. This modifies the logic used
# for write path allocation rejection (standard: never reject. cdc: reject Mutation
# containing a CDC-enabled table if at space limit in cdc_raw_directory).
cdc_enabled: false
# CommitLogSegments are moved to this directory on flush if cdc_enabled: true and the
# segment contains mutations for a CDC-enabled table. This should be placed on a
# separate spindle than the data directories. If not set, the default directory is
# $CASSANDRA_HOME/data/cdc_raw.
cdc_raw_directory: build/embeddedCassandra/data/cdc_raw
# Directories where Cassandra should store data on disk. Cassandra
# will spread data evenly across them, subject to the granularity of
# the configured compaction strategy.
data_file_directories:
- build/embeddedCassandra/data
- build/embeddedCassandra/data
hints_directory: build/embeddedCassandra/hints
# commit log
commitlog_directory: build/embeddedCassandra/commitlog
@@ -186,7 +212,7 @@ saved_caches_directory: build/embeddedCassandra/saved_caches
# and the CommitLog is simply synced every commitlog_sync_period_in_ms
# milliseconds.
commitlog_sync: periodic
commitlog_sync_period_in_ms: 10000
commitlog_sync_period_in_ms: 5000
# The size of the individual commitlog file segments. A commitlog
# segment may be archived, deleted, or recycled once all the data
@@ -197,20 +223,20 @@ commitlog_sync_period_in_ms: 10000
# archiving commitlog segments (see commitlog_archiving.properties),
# then you probably want a finer granularity of archiving; 8 or 16 MB
# is reasonable.
commitlog_segment_size_in_mb: 32
commitlog_segment_size_in_mb: 8
# any class that implements the SeedProvider interface and has a
# constructor that takes a Map<String, String> of parameters will do.
seed_provider:
# Addresses of hosts that are deemed contact points.
# Cassandra nodes use this list of hosts to find each other and learn
# the topology of the ring. You must change this if you are running
# multiple nodes!
- class_name: org.apache.cassandra.locator.SimpleSeedProvider
parameters:
# seeds is actually a comma-delimited list of addresses.
# Ex: "<ip1>,<ip2>,<ip3>"
- seeds: "127.0.0.1"
# Addresses of hosts that are deemed contact points.
# Cassandra nodes use this list of hosts to find each other and learn
# the topology of the ring. You must change this if you are running
# multiple nodes!
- class_name: org.apache.cassandra.locator.SimpleSeedProvider
parameters:
# seeds is actually a comma-delimited list of addresses.
# Ex: "<ip1>,<ip2>,<ip3>"
- seeds: "127.0.0.1"
# For workloads with more data than can fit in memory, Cassandra's
# bottleneck will be reads that need to fetch data from
@@ -221,8 +247,8 @@ seed_provider:
# On the other hand, since writes are almost never IO bound, the ideal
# number of "concurrent_writes" is dependent on the number of cores in
# your system; (8 * number_of_cores) is a good rule of thumb.
concurrent_reads: 32
concurrent_writes: 32
concurrent_reads: 4
concurrent_writes: 4
# Total memory to use for memtables. Cassandra will flush the largest
# memtable when this much memory is used.
@@ -237,7 +263,7 @@ concurrent_writes: 32
# segment multiple), Cassandra will flush every dirty CF in the oldest
# segment and remove it. So a small total commitlog space will tend
# to cause more flush activity on less-active columnfamilies.
# commitlog_total_space_in_mb: 4096
commitlog_total_space_in_mb: 4096
# This sets the amount of memtable flush writer threads. These will
# be blocked by disk io, and each one will hold a memtable in memory
@@ -295,10 +321,10 @@ native_transport_port: 9043
# transport is used. They are similar to rpc_min_threads and rpc_max_threads,
# though the defaults differ slightly.
# native_transport_min_threads: 16
# native_transport_max_threads: 128
#native_transport_max_threads: 48
# Whether to start the thrift rpc server.
start_rpc: true
start_rpc: false
# The address to bind the Thrift RPC service to -- clients connect
# here. Unlike ListenAddress above, you _can_ specify 0.0.0.0 here if
@@ -385,7 +411,7 @@ snapshot_before_compaction: false
# or dropping of column families. The STRONGLY advised default of true
# should be used to provide data safety. If you set this flag to false, you will
# lose data on truncation or drop.
auto_snapshot: true
auto_snapshot: false
# Add column indexes to a row after its contents reach this size.
# Increase if your column values are large, or if you have a very large
@@ -425,17 +451,17 @@ compaction_throughput_mb_per_sec: 16
# stream_throughput_outbound_megabits_per_sec: 200
# How long the coordinator should wait for read operations to complete
read_request_timeout_in_ms: 10000
read_request_timeout_in_ms: 120000
# How long the coordinator should wait for seq or index scans to complete
range_request_timeout_in_ms: 10000
range_request_timeout_in_ms: 120000
# How long the coordinator should wait for writes to complete
write_request_timeout_in_ms: 10000
write_request_timeout_in_ms: 120000
# How long the coordinator should wait for truncates to complete
# (This can be much longer, because unless auto_snapshot is disabled
# we need to flush first so we can snapshot before removing the data.)
truncate_request_timeout_in_ms: 60000
truncate_request_timeout_in_ms: 120000
# The default timeout for other, miscellaneous operations
request_timeout_in_ms: 10000
request_timeout_in_ms: 120000
# Enable operation timeout information exchange between nodes to accurately
# measure request timeouts, If disabled cassandra will assuming the request
@@ -572,7 +598,7 @@ request_scheduler: org.apache.cassandra.scheduler.NoScheduler
# offs. This value is not often changed, however if you have many
# very small rows (many to an OS page), then increasing this will
# often lower memory usage without a impact on performance.
index_interval: 128
# index_interval: 128
# Enable or disable inter-node encryption
# Default settings are TLS v1, RSA 1024-bit keys (it is imperative that
@@ -621,7 +647,7 @@ client_encryption_options:
# can be: all - all traffic is compressed
# dc - traffic between different datacenters is compressed
# none - nothing is compressed.
internode_compression: all
internode_compression: none
# Enable or disable tcp_nodelay for inter-dc communication.
# Disabling it will result in larger (but fewer) network packets being sent,