diff --git a/spring-integration-groovy-dsl/.gitignore b/spring-integration-groovy-dsl/.gitignore new file mode 100644 index 0000000..b412f22 --- /dev/null +++ b/spring-integration-groovy-dsl/.gitignore @@ -0,0 +1,12 @@ +target +.settings +.springBeans +.classpath +.project +/.idea +/.gradle +/build +/*.iml +/*.ipr +/*.iws +/bin/ diff --git a/spring-integration-groovy-dsl/README.adoc b/spring-integration-groovy-dsl/README.adoc new file mode 100644 index 0000000..2aa8130 --- /dev/null +++ b/spring-integration-groovy-dsl/README.adoc @@ -0,0 +1,110 @@ +//## **_This project has been absorbed by Spring Integration Core starting with version 6.0. +//Please consult its https://docs.spring.io/spring-integration/docs/current/reference/html[Reference Manual] for the actual documentation. +//This project is only in a maintenance, bug fixing state._** + += Spring Integration Groovy DSL + +This project is a Groovy extension for https://docs.spring.io/spring-integration/docs/current/reference/html/dsl.html#java-dsl[Spring Integration Java DSL]. + +NOTE: The project should be treated as experimental and API is a subject to changes. + +NOTE: The current DSL solution is very similar to existing https://docs.spring.io/spring-integration/docs/current/reference/html/kotlin-dsl.html#kotlin-dsl[Spring Integration Kotlin DSL]. + +The main goal we pursue here is to make Spring Integration development on Groovy as smooth and straightforward as is it possible with interoperability with existing Java DSL and some Groovy extensions or language-specific structures. + +All you need to get started is just an import for `import static org.springframework.integration.dsl.IntegrationGroovyDsl.integrationFlow` - an overloaded factory methods for Groovy DSL. + +For `IntegrationFlow` definitions as lambdas we typically don't need anything else from Groovy and just declare a bean like this: + +==== +[source, groovy] +---- +@Bean +IntegrationFlow oddFlow() { + { IntegrationFlowDefinition flow -> + flow.handle(Object, { p, h -> 'odd' }) + } +} +---- +==== + +In this case Groovy understands that the closure should be translated into an `IntegrationFlow` anonymous instance and target Java DSL processor parses this construction properly into Java objects. + +As an alternative to the construction above and for consistency with use-cases explained below, this project suggest a Groovy-specific DSL for declaring integration flows in the *builder* pattern style: + +==== +[source, groovy] +---- +@Bean +flowLambda() { + integrationFlow { + filter String, { it == 'test' }, { id 'filterEndpoint' } + wireTap integrationFlow { + channel { queue 'wireTapChannel' } + } + delay 'delayGroup', { defaultDelay 100 } + transform String, { it.toUpperCase() } + } +} +---- +==== + +Such a global `integrationFlow()` function expects a closure in builder style for a `GroovyIntegrationFlowDefinition` (a Groovy wrapper for the `IntegrationFlowDefinition`) and produces a regular `IntegrationFlow` lambda implementation. +See more overloaded `integrationFlow()` variants below. + +Many other scenarios require an `IntegrationFlow` to be started from source of data (e.g. `JdbcPollingChannelAdapter`, `JmsInboundGateway` or just an existing `MessageChannel`). +For this purpose Spring Integration Java DSL provides an `IntegrationFlows` factory with its bunch of overloaded `from()` methods. +This factory can be used in groovy as well: + +==== +[source, groovy] +---- +@Bean +flowFromSupplier() { + IntegrationFlows.fromSupplier({ 'bar' }) { e -> e.poller { p -> p.fixedDelay(10).maxMessagesPerPoll(1) } } + .channel({ c -> c.queue('fromSupplierQueue') } as Function) + .get() +} +---- +==== + +But unfortunately not all `from()` methods are compatible with Groovy structures. +To fix a gap, this project provides a Groovy DSL around an `IntegrationFlows` factory. +It is done as a set of overloaded `integrationFlow()` functions. +With a consumer for a `GroovyIntegrationFlowDefinition` to declare the rest of the flow as an `IntegrationFlow` closure to reuse the mentioned above experience and also avoid `get()` call in the end. +For example: + +==== +[source, groovy] +---- +@Bean +functionFlow() { + integrationFlow Function, + { beanName 'functionGateway' }, + { + transform Transformers.objectToString(), { id 'objectToStringTransformer' } + transform String, { it.toUpperCase() } + split Message, { it.payload } + split Object, { it }, { id 'splitterEndpoint' } + resequence() + aggregate { + id 'aggregator' + outputProcessor { it.one } + } + } +} + +@Bean +someFlow() { + integrationFlow ({ 'test' }, + { + poller { it.trigger new OnlyOnceTrigger() } + id 'pollingSource' + }) + { + log LoggingHandler.Level.WARN, 'test.category' + channel { queue 'pollerResultChannel' } + } +} +---- +==== diff --git a/spring-integration-groovy-dsl/build.gradle b/spring-integration-groovy-dsl/build.gradle new file mode 100644 index 0000000..3abb029 --- /dev/null +++ b/spring-integration-groovy-dsl/build.gradle @@ -0,0 +1,173 @@ +plugins { + id 'groovy' + id 'java-library' + id 'eclipse' + id 'idea' + id 'jacoco' + id 'org.sonarqube' version '2.8' + id 'io.spring.dependency-management' version '1.0.11.RELEASE' + id 'com.jfrog.artifactory' version '4.24.20' +} + +description = 'Spring Integration Groovy DSL' + +group = 'org.springframework.integration' + +repositories { + mavenCentral() + maven { url 'https://repo.spring.io/release' } + maven { url 'https://repo.spring.io/milestone' } + if (version.endsWith('SNAPSHOT')) { + maven { url 'https://repo.spring.io/snapshot' } + } +// maven { url 'https://repo.spring.io/libs-staging-local' } +} + +ext { + groovyVersion = '3.0.9' + junitVersion = '5.7.2' + log4jVersion = '2.17.0' + reactorVersion = '2020.0.13' + spockVersion = '2.0-groovy-3.0' + springIntegrationVersion = '5.5.7' + + idPrefix = 'groovy-dsl' + + linkHomepage = 'https://github.com/spring-projects/spring-integration-extensions' + linkCi = 'https://build.spring.io/browse/INTEXT' + linkIssue = 'https://github.com/spring-projects/spring-integration-extensions/issues' + linkScmUrl = 'https://github.com/spring-projects/spring-integration-extensions' + linkScmConnection = 'https://github.com/spring-projects/spring-integration-extensions.git' + linkScmDevConnection = 'git@github.com:spring-projects/spring-integration-extensions.git' + +} + +dependencyManagement { + resolutionStrategy { + cacheChangingModulesFor 0, 'seconds' + } + applyMavenExclusions = false + generatedPomCustomization { + enabled = false + } + + imports { + mavenBom "org.apache.logging.log4j:log4j-bom:$log4jVersion" + mavenBom "org.codehaus.groovy:groovy-bom:$groovyVersion" + mavenBom "org.junit:junit-bom:$junitVersion" + mavenBom "io.projectreactor:reactor-bom:$reactorVersion" + mavenBom "org.springframework.integration:spring-integration-bom:$springIntegrationVersion" + } + +} + +eclipse.project.natures += 'org.springframework.ide.eclipse.core.springnature' + +jacoco.toolVersion = '0.8.7' + +dependencies { + api 'org.codehaus.groovy:groovy' + api 'org.springframework.integration:spring-integration-core' + + testImplementation 'org.springframework.integration:spring-integration-test' + testImplementation 'org.junit.jupiter:junit-jupiter-api' + testImplementation 'io.projectreactor:reactor-test' + testImplementation "org.spockframework:spock-spring:$spockVersion" + + testRuntimeOnly 'org.junit.jupiter:junit-jupiter-engine' + testRuntimeOnly 'org.junit.platform:junit-platform-launcher' + testRuntimeOnly 'org.apache.logging.log4j:log4j-jcl' + testRuntimeOnly 'org.apache.logging.log4j:log4j-core' +} + +jacocoTestReport { + reports { + xml.enabled true + csv.enabled false + html.enabled false + xml.destination file("${buildDir}/reports/jacoco/test/jacocoTestReport.xml") + } +} + +test { + // suppress all console output during testing unless running `gradle -i` + logging.captureStandardOutput(LogLevel.INFO) + useJUnitPlatform() + jacoco.destinationFile = file("$buildDir/jacoco.exec") + + if (System.properties['sonar.host.url']) { + finalizedBy jacocoTestReport + } +} + +groovydoc { + link 'https://docs.oracle.com/en/java/javase/17/docs/api', 'java.' + link 'https://docs.groovy-lang.org/latest/html/gapi/', 'groovy.', 'org.codehaus.groovy.' + link 'https://docs.spring.io/spring-integration/docs/current/api/', 'org.springframework.integration.' + link 'https://docs.spring.io/spring-framework/docs/current/javadoc-api/', 'org.springframework.messaging.' + link 'https://projectreactor.io/docs/core/release/api/', 'reactor.' + link 'https://www.reactive-streams.org/reactive-streams-1.0.3-javadoc/', 'org.reactivestreams.' +} + +task javadocJar(type: Jar, dependsOn: groovydoc) { + archiveClassifier = 'groovydoc' + from groovydoc +} + +check.dependsOn groovydoc +build.dependsOn jacocoTestReport + +java { + withSourcesJar() + withJavadocJar() +} + +sonarqube { + properties { + property 'sonar.links.homepage', linkHomepage + property 'sonar.links.ci', linkCi + property 'sonar.links.issue', linkIssue + property 'sonar.links.scm', linkScmUrl + property 'sonar.links.scm_dev', linkScmDevConnection + } +} + +task docsZip(type: Zip) { + group = 'Distribution' + archiveClassifier = 'docs' + from(groovydoc) { + into 'api' + } +} + +task distZip(type: Zip, dependsOn: docsZip) { + group = 'Distribution' + archiveClassifier = 'dist' + description = "Builds -${archiveClassifier} archive, containing all jars and docs, " + + "suitable for community download page." + + ext.baseDir = "${project.name}-${project.version}"; + + from('src/dist') { + include 'readme.txt' + include 'license.txt' + into "${baseDir}" + } + + into("${baseDir}/libs") { + from project.jar + from project.sourcesJar + from project.javadocJar + } + + from(zipTree(docsZip.archiveFile)) { + into "${baseDir}/docs" + } +} + +task dist(dependsOn: assemble) { + group = 'Distribution' + description = 'Builds -dist and -docs distribution archives.' +} + +apply from: "${rootProject.projectDir}/publish-maven.gradle" \ No newline at end of file diff --git a/spring-integration-groovy-dsl/gradle.properties b/spring-integration-groovy-dsl/gradle.properties new file mode 100644 index 0000000..3bc2994 --- /dev/null +++ b/spring-integration-groovy-dsl/gradle.properties @@ -0,0 +1,2 @@ +version=0.0.1-SNAPSHOT +org.gradle.caching=true diff --git a/spring-integration-groovy-dsl/gradle/wrapper/gradle-wrapper.jar b/spring-integration-groovy-dsl/gradle/wrapper/gradle-wrapper.jar new file mode 100644 index 0000000..7454180 Binary files /dev/null and b/spring-integration-groovy-dsl/gradle/wrapper/gradle-wrapper.jar differ diff --git a/spring-integration-groovy-dsl/gradle/wrapper/gradle-wrapper.properties b/spring-integration-groovy-dsl/gradle/wrapper/gradle-wrapper.properties new file mode 100644 index 0000000..3e000b9 --- /dev/null +++ b/spring-integration-groovy-dsl/gradle/wrapper/gradle-wrapper.properties @@ -0,0 +1,6 @@ +distributionBase=GRADLE_USER_HOME +distributionPath=wrapper/dists +distributionUrl=https\://services.gradle.org/distributions/gradle-7.3.3-bin.zip +zipStoreBase=GRADLE_USER_HOME +zipStorePath=wrapper/dists +distributionSha256Sum=b586e04868a22fd817c8971330fec37e298f3242eb85c374181b12d637f80302 \ No newline at end of file diff --git a/spring-integration-groovy-dsl/gradlew b/spring-integration-groovy-dsl/gradlew new file mode 100755 index 0000000..c53aefa --- /dev/null +++ b/spring-integration-groovy-dsl/gradlew @@ -0,0 +1,234 @@ +#!/bin/sh + +# +# Copyright © 2015-2021 the original authors. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# https://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +############################################################################## +# +# Gradle start up script for POSIX generated by Gradle. +# +# Important for running: +# +# (1) You need a POSIX-compliant shell to run this script. If your /bin/sh is +# noncompliant, but you have some other compliant shell such as ksh or +# bash, then to run this script, type that shell name before the whole +# command line, like: +# +# ksh Gradle +# +# Busybox and similar reduced shells will NOT work, because this script +# requires all of these POSIX shell features: +# * functions; +# * expansions «$var», «${var}», «${var:-default}», «${var+SET}», +# «${var#prefix}», «${var%suffix}», and «$( cmd )»; +# * compound commands having a testable exit status, especially «case»; +# * various built-in commands including «command», «set», and «ulimit». +# +# Important for patching: +# +# (2) This script targets any POSIX shell, so it avoids extensions provided +# by Bash, Ksh, etc; in particular arrays are avoided. +# +# The "traditional" practice of packing multiple parameters into a +# space-separated string is a well documented source of bugs and security +# problems, so this is (mostly) avoided, by progressively accumulating +# options in "$@", and eventually passing that to Java. +# +# Where the inherited environment variables (DEFAULT_JVM_OPTS, JAVA_OPTS, +# and GRADLE_OPTS) rely on word-splitting, this is performed explicitly; +# see the in-line comments for details. +# +# There are tweaks for specific operating systems such as AIX, CygWin, +# Darwin, MinGW, and NonStop. +# +# (3) This script is generated from the Groovy template +# https://github.com/gradle/gradle/blob/master/subprojects/plugins/src/main/resources/org/gradle/api/internal/plugins/unixStartScript.txt +# within the Gradle project. +# +# You can find Gradle at https://github.com/gradle/gradle/. +# +############################################################################## + +# Attempt to set APP_HOME + +# Resolve links: $0 may be a link +app_path=$0 + +# Need this for daisy-chained symlinks. +while + APP_HOME=${app_path%"${app_path##*/}"} # leaves a trailing /; empty if no leading path + [ -h "$app_path" ] +do + ls=$( ls -ld "$app_path" ) + link=${ls#*' -> '} + case $link in #( + /*) app_path=$link ;; #( + *) app_path=$APP_HOME$link ;; + esac +done + +APP_HOME=$( cd "${APP_HOME:-./}" && pwd -P ) || exit + +APP_NAME="Gradle" +APP_BASE_NAME=${0##*/} + +# Add default JVM options here. You can also use JAVA_OPTS and GRADLE_OPTS to pass JVM options to this script. +DEFAULT_JVM_OPTS='"-Xmx64m" "-Xms64m"' + +# Use the maximum available, or set MAX_FD != -1 to use that value. +MAX_FD=maximum + +warn () { + echo "$*" +} >&2 + +die () { + echo + echo "$*" + echo + exit 1 +} >&2 + +# OS specific support (must be 'true' or 'false'). +cygwin=false +msys=false +darwin=false +nonstop=false +case "$( uname )" in #( + CYGWIN* ) cygwin=true ;; #( + Darwin* ) darwin=true ;; #( + MSYS* | MINGW* ) msys=true ;; #( + NONSTOP* ) nonstop=true ;; +esac + +CLASSPATH=$APP_HOME/gradle/wrapper/gradle-wrapper.jar + + +# Determine the Java command to use to start the JVM. +if [ -n "$JAVA_HOME" ] ; then + if [ -x "$JAVA_HOME/jre/sh/java" ] ; then + # IBM's JDK on AIX uses strange locations for the executables + JAVACMD=$JAVA_HOME/jre/sh/java + else + JAVACMD=$JAVA_HOME/bin/java + fi + if [ ! -x "$JAVACMD" ] ; then + die "ERROR: JAVA_HOME is set to an invalid directory: $JAVA_HOME + +Please set the JAVA_HOME variable in your environment to match the +location of your Java installation." + fi +else + JAVACMD=java + which java >/dev/null 2>&1 || die "ERROR: JAVA_HOME is not set and no 'java' command could be found in your PATH. + +Please set the JAVA_HOME variable in your environment to match the +location of your Java installation." +fi + +# Increase the maximum file descriptors if we can. +if ! "$cygwin" && ! "$darwin" && ! "$nonstop" ; then + case $MAX_FD in #( + max*) + MAX_FD=$( ulimit -H -n ) || + warn "Could not query maximum file descriptor limit" + esac + case $MAX_FD in #( + '' | soft) :;; #( + *) + ulimit -n "$MAX_FD" || + warn "Could not set maximum file descriptor limit to $MAX_FD" + esac +fi + +# Collect all arguments for the java command, stacking in reverse order: +# * args from the command line +# * the main class name +# * -classpath +# * -D...appname settings +# * --module-path (only if needed) +# * DEFAULT_JVM_OPTS, JAVA_OPTS, and GRADLE_OPTS environment variables. + +# For Cygwin or MSYS, switch paths to Windows format before running java +if "$cygwin" || "$msys" ; then + APP_HOME=$( cygpath --path --mixed "$APP_HOME" ) + CLASSPATH=$( cygpath --path --mixed "$CLASSPATH" ) + + JAVACMD=$( cygpath --unix "$JAVACMD" ) + + # Now convert the arguments - kludge to limit ourselves to /bin/sh + for arg do + if + case $arg in #( + -*) false ;; # don't mess with options #( + /?*) t=${arg#/} t=/${t%%/*} # looks like a POSIX filepath + [ -e "$t" ] ;; #( + *) false ;; + esac + then + arg=$( cygpath --path --ignore --mixed "$arg" ) + fi + # Roll the args list around exactly as many times as the number of + # args, so each arg winds up back in the position where it started, but + # possibly modified. + # + # NB: a `for` loop captures its iteration list before it begins, so + # changing the positional parameters here affects neither the number of + # iterations, nor the values presented in `arg`. + shift # remove old arg + set -- "$@" "$arg" # push replacement arg + done +fi + +# Collect all arguments for the java command; +# * $DEFAULT_JVM_OPTS, $JAVA_OPTS, and $GRADLE_OPTS can contain fragments of +# shell script including quotes and variable substitutions, so put them in +# double quotes to make sure that they get re-expanded; and +# * put everything else in single quotes, so that it's not re-expanded. + +set -- \ + "-Dorg.gradle.appname=$APP_BASE_NAME" \ + -classpath "$CLASSPATH" \ + org.gradle.wrapper.GradleWrapperMain \ + "$@" + +# Use "xargs" to parse quoted args. +# +# With -n1 it outputs one arg per line, with the quotes and backslashes removed. +# +# In Bash we could simply go: +# +# readarray ARGS < <( xargs -n1 <<<"$var" ) && +# set -- "${ARGS[@]}" "$@" +# +# but POSIX shell has neither arrays nor command substitution, so instead we +# post-process each arg (as a line of input to sed) to backslash-escape any +# character that might be a shell metacharacter, then use eval to reverse +# that process (while maintaining the separation between arguments), and wrap +# the whole thing up as a single "set" statement. +# +# This will of course break if any of these variables contains a newline or +# an unmatched quote. +# + +eval "set -- $( + printf '%s\n' "$DEFAULT_JVM_OPTS $JAVA_OPTS $GRADLE_OPTS" | + xargs -n1 | + sed ' s~[^-[:alnum:]+,./:=@_]~\\&~g; ' | + tr '\n' ' ' + )" '"$@"' + +exec "$JAVACMD" "$@" diff --git a/spring-integration-groovy-dsl/gradlew.bat b/spring-integration-groovy-dsl/gradlew.bat new file mode 100644 index 0000000..107acd3 --- /dev/null +++ b/spring-integration-groovy-dsl/gradlew.bat @@ -0,0 +1,89 @@ +@rem +@rem Copyright 2015 the original author or authors. +@rem +@rem Licensed under the Apache License, Version 2.0 (the "License"); +@rem you may not use this file except in compliance with the License. +@rem You may obtain a copy of the License at +@rem +@rem https://www.apache.org/licenses/LICENSE-2.0 +@rem +@rem Unless required by applicable law or agreed to in writing, software +@rem distributed under the License is distributed on an "AS IS" BASIS, +@rem WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +@rem See the License for the specific language governing permissions and +@rem limitations under the License. +@rem + +@if "%DEBUG%" == "" @echo off +@rem ########################################################################## +@rem +@rem Gradle startup script for Windows +@rem +@rem ########################################################################## + +@rem Set local scope for the variables with windows NT shell +if "%OS%"=="Windows_NT" setlocal + +set DIRNAME=%~dp0 +if "%DIRNAME%" == "" set DIRNAME=. +set APP_BASE_NAME=%~n0 +set APP_HOME=%DIRNAME% + +@rem Resolve any "." and ".." in APP_HOME to make it shorter. +for %%i in ("%APP_HOME%") do set APP_HOME=%%~fi + +@rem Add default JVM options here. You can also use JAVA_OPTS and GRADLE_OPTS to pass JVM options to this script. +set DEFAULT_JVM_OPTS="-Xmx64m" "-Xms64m" + +@rem Find java.exe +if defined JAVA_HOME goto findJavaFromJavaHome + +set JAVA_EXE=java.exe +%JAVA_EXE% -version >NUL 2>&1 +if "%ERRORLEVEL%" == "0" goto execute + +echo. +echo ERROR: JAVA_HOME is not set and no 'java' command could be found in your PATH. +echo. +echo Please set the JAVA_HOME variable in your environment to match the +echo location of your Java installation. + +goto fail + +:findJavaFromJavaHome +set JAVA_HOME=%JAVA_HOME:"=% +set JAVA_EXE=%JAVA_HOME%/bin/java.exe + +if exist "%JAVA_EXE%" goto execute + +echo. +echo ERROR: JAVA_HOME is set to an invalid directory: %JAVA_HOME% +echo. +echo Please set the JAVA_HOME variable in your environment to match the +echo location of your Java installation. + +goto fail + +:execute +@rem Setup the command line + +set CLASSPATH=%APP_HOME%\gradle\wrapper\gradle-wrapper.jar + + +@rem Execute Gradle +"%JAVA_EXE%" %DEFAULT_JVM_OPTS% %JAVA_OPTS% %GRADLE_OPTS% "-Dorg.gradle.appname=%APP_BASE_NAME%" -classpath "%CLASSPATH%" org.gradle.wrapper.GradleWrapperMain %* + +:end +@rem End local scope for the variables with windows NT shell +if "%ERRORLEVEL%"=="0" goto mainEnd + +:fail +rem Set variable GRADLE_EXIT_CONSOLE if you need the _script_ return code instead of +rem the _cmd.exe /c_ return code! +if not "" == "%GRADLE_EXIT_CONSOLE%" exit 1 +exit /b 1 + +:mainEnd +if "%OS%"=="Windows_NT" endlocal + +:omega diff --git a/spring-integration-groovy-dsl/publish-maven.gradle b/spring-integration-groovy-dsl/publish-maven.gradle new file mode 100644 index 0000000..a83b650 --- /dev/null +++ b/spring-integration-groovy-dsl/publish-maven.gradle @@ -0,0 +1,63 @@ +apply plugin: 'maven-publish' + +tasks.withType(GenerateModuleMetadata) { + enabled = false +} + +publishing { + publications { + mavenJava(MavenPublication) { + suppressAllPomMetadataWarnings() + from components.java + artifact docsZip + artifact distZip + pom { + afterEvaluate { + name = project.description + description = project.description + } + url = linkScmUrl + organization { + name = 'Spring IO' + url = 'https://spring.io/projects/spring-integration' + } + licenses { + license { + name = 'Apache License, Version 2.0' + url = 'https://www.apache.org/licenses/LICENSE-2.0.txt' + distribution = 'repo' + } + } + scm { + url = linkScmUrl + connection = linkScmConnection + developerConnection = linkScmDevConnection + } + developers { + developer { + id = 'artembilan' + name = 'Artem Bilan' + email = 'abilan@vmware.com' + roles = ['project lead'] + } + } + issueManagement { + system = 'GitHub' + url = linkIssue + } + } + versionMapping { + usage('java-api') { + fromResolutionResult() + } + usage('java-runtime') { + fromResolutionResult() + } + } + } + } +} + +artifactoryPublish { + publications(publishing.publications.mavenJava) +} diff --git a/spring-integration-groovy-dsl/settings.gradle b/spring-integration-groovy-dsl/settings.gradle new file mode 100644 index 0000000..de2d0ea --- /dev/null +++ b/spring-integration-groovy-dsl/settings.gradle @@ -0,0 +1 @@ +rootProject.name = 'spring-integration-groovy-dsl' diff --git a/spring-integration-groovy-dsl/src/dist/license.txt b/spring-integration-groovy-dsl/src/dist/license.txt new file mode 100644 index 0000000..7c50aab --- /dev/null +++ b/spring-integration-groovy-dsl/src/dist/license.txt @@ -0,0 +1,201 @@ + Apache License + Version 2.0, January 2004 + https://www.apache.org/licenses/ + + TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION + + 1. Definitions. + + "License" shall mean the terms and conditions for use, reproduction, + and distribution as defined by Sections 1 through 9 of this document. + + "Licensor" shall mean the copyright owner or entity authorized by + the copyright owner that is granting the License. + + "Legal Entity" shall mean the union of the acting entity and all + other entities that control, are controlled by, or are under common + control with that entity. For the purposes of this definition, + "control" means (i) the power, direct or indirect, to cause the + direction or management of such entity, whether by contract or + otherwise, or (ii) ownership of fifty percent (50%) or more of the + outstanding shares, or (iii) beneficial ownership of such entity. + + "You" (or "Your") shall mean an individual or Legal Entity + exercising permissions granted by this License. + + "Source" form shall mean the preferred form for making modifications, + including but not limited to software source code, documentation + source, and configuration files. + + "Object" form shall mean any form resulting from mechanical + transformation or translation of a Source form, including but + not limited to compiled object code, generated documentation, + and conversions to other media types. + + "Work" shall mean the work of authorship, whether in Source or + Object form, made available under the License, as indicated by a + copyright notice that is included in or attached to the work + (an example is provided in the Appendix below). + + "Derivative Works" shall mean any work, whether in Source or Object + form, that is based on (or derived from) the Work and for which the + editorial revisions, annotations, elaborations, or other modifications + represent, as a whole, an original work of authorship. For the purposes + of this License, Derivative Works shall not include works that remain + separable from, or merely link (or bind by testData1) to the interfaces of, + the Work and Derivative Works thereof. + + "Contribution" shall mean any work of authorship, including + the original version of the Work and any modifications or additions + to that Work or Derivative Works thereof, that is intentionally + submitted to Licensor for inclusion in the Work by the copyright owner + or by an individual or Legal Entity authorized to submit on behalf of + the copyright owner. For the purposes of this definition, "submitted" + means any form of electronic, verbal, or written communication sent + to the Licensor or its representatives, including but not limited to + communication on electronic mailing lists, source code control systems, + and issue tracking systems that are managed by, or on behalf of, the + Licensor for the purpose of discussing and improving the Work, but + excluding communication that is conspicuously marked or otherwise + designated in writing by the copyright owner as "Not a Contribution." + + "Contributor" shall mean Licensor and any individual or Legal Entity + on behalf of whom a Contribution has been received by Licensor and + subsequently incorporated within the Work. + + 2. Grant of Copyright License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + copyright license to reproduce, prepare Derivative Works of, + publicly display, publicly perform, sublicense, and distribute the + Work and such Derivative Works in Source or Object form. + + 3. Grant of Patent License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + (except as stated in this section) patent license to make, have made, + use, offer to sell, sell, import, and otherwise transfer the Work, + where such license applies only to those patent claims licensable + by such Contributor that are necessarily infringed by their + Contribution(s) alone or by combination of their Contribution(s) + with the Work to which such Contribution(s) was submitted. If You + institute patent litigation against any entity (including a + cross-claim or counterclaim in a lawsuit) alleging that the Work + or a Contribution incorporated within the Work constitutes direct + or contributory patent infringement, then any patent licenses + granted to You under this License for that Work shall terminate + as of the date such litigation is filed. + + 4. Redistribution. You may reproduce and distribute copies of the + Work or Derivative Works thereof in any medium, with or without + modifications, and in Source or Object form, provided that You + meet the following conditions: + + (a) You must give any other recipients of the Work or + Derivative Works a copy of this License; and + + (b) You must cause any modified files to carry prominent notices + stating that You changed the files; and + + (c) You must retain, in the Source form of any Derivative Works + that You distribute, all copyright, patent, trademark, and + attribution notices from the Source form of the Work, + excluding those notices that do not pertain to any part of + the Derivative Works; and + + (d) If the Work includes a "NOTICE" text file as part of its + distribution, then any Derivative Works that You distribute must + include a readable copy of the attribution notices contained + within such NOTICE file, excluding those notices that do not + pertain to any part of the Derivative Works, in at least one + of the following places: within a NOTICE text file distributed + as part of the Derivative Works; within the Source form or + documentation, if provided along with the Derivative Works; or, + within a display generated by the Derivative Works, if and + wherever such third-party notices normally appear. The contents + of the NOTICE file are for informational purposes only and + do not modify the License. You may add Your own attribution + notices within Derivative Works that You distribute, alongside + or as an addendum to the NOTICE text from the Work, provided + that such additional attribution notices cannot be construed + as modifying the License. + + You may add Your own copyright statement to Your modifications and + may provide additional or different license terms and conditions + for use, reproduction, or distribution of Your modifications, or + for any such Derivative Works as a whole, provided Your use, + reproduction, and distribution of the Work otherwise complies with + the conditions stated in this License. + + 5. Submission of Contributions. Unless You explicitly state otherwise, + any Contribution intentionally submitted for inclusion in the Work + by You to the Licensor shall be under the terms and conditions of + this License, without any additional terms or conditions. + Notwithstanding the above, nothing herein shall supersede or modify + the terms of any separate license agreement you may have executed + with Licensor regarding such Contributions. + + 6. Trademarks. This License does not grant permission to use the trade + names, trademarks, service marks, or product names of the Licensor, + except as required for reasonable and customary use in describing the + origin of the Work and reproducing the content of the NOTICE file. + + 7. Disclaimer of Warranty. Unless required by applicable law or + agreed to in writing, Licensor provides the Work (and each + Contributor provides its Contributions) on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + implied, including, without limitation, any warranties or conditions + of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A + PARTICULAR PURPOSE. You are solely responsible for determining the + appropriateness of using or redistributing the Work and assume any + risks associated with Your exercise of permissions under this License. + + 8. Limitation of Liability. In no event and under no legal theory, + whether in tort (including negligence), contract, or otherwise, + unless required by applicable law (such as deliberate and grossly + negligent acts) or agreed to in writing, shall any Contributor be + liable to You for damages, including any direct, indirect, special, + incidental, or consequential damages of any character arising as a + result of this License or out of the use or inability to use the + Work (including but not limited to damages for loss of goodwill, + work stoppage, computer failure or malfunction, or any and all + other commercial damages or losses), even if such Contributor + has been advised of the possibility of such damages. + + 9. Accepting Warranty or Additional Liability. While redistributing + the Work or Derivative Works thereof, You may choose to offer, + and charge a fee for, acceptance of support, warranty, indemnity, + or other liability obligations and/or rights consistent with this + License. However, in accepting such obligations, You may act only + on Your own behalf and on Your sole responsibility, not on behalf + of any other Contributor, and only if You agree to indemnify, + defend, and hold each Contributor harmless for any liability + incurred by, or claims asserted against, such Contributor by reason + of your accepting any such warranty or additional liability. + + END OF TERMS AND CONDITIONS + + APPENDIX: How to apply the Apache License to your work. + + To apply the Apache License to your work, attach the following + boilerplate notice, with the fields enclosed by brackets "[]" + replaced with your own identifying information. (Don't include + the brackets!) The text should be enclosed in the appropriate + comment syntax for the file format. We also recommend that a + file or class testData1 and description of purpose be included on the + same "printed page" as the copyright notice for easier + identification within third-party archives. + + Copyright [yyyy] [testData1 of copyright owner] + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + https://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. diff --git a/spring-integration-groovy-dsl/src/main/groovy/org/springframework/integration/dsl/GroovyIntegrationFlowDefinition.groovy b/spring-integration-groovy-dsl/src/main/groovy/org/springframework/integration/dsl/GroovyIntegrationFlowDefinition.groovy new file mode 100644 index 0000000..5c900f1 --- /dev/null +++ b/spring-integration-groovy-dsl/src/main/groovy/org/springframework/integration/dsl/GroovyIntegrationFlowDefinition.groovy @@ -0,0 +1,1221 @@ +/* + * Copyright 2021 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.integration.dsl + +import groovy.transform.CompileStatic +import groovy.transform.stc.ClosureParams +import groovy.transform.stc.SimpleType +import org.reactivestreams.Publisher +import org.springframework.integration.channel.BroadcastCapableChannel +import org.springframework.integration.channel.interceptor.WireTap +import org.springframework.integration.core.GenericSelector +import org.springframework.integration.filter.MethodInvokingSelector +import org.springframework.integration.handler.BridgeHandler +import org.springframework.integration.handler.GenericHandler +import org.springframework.integration.handler.LoggingHandler +import org.springframework.integration.handler.MessageProcessor +import org.springframework.integration.handler.MessageTriggerAction +import org.springframework.integration.handler.ServiceActivatingHandler +import org.springframework.integration.router.AbstractMessageRouter +import org.springframework.integration.router.ErrorMessageExceptionTypeRouter +import org.springframework.integration.router.ExpressionEvaluatingRouter +import org.springframework.integration.router.MethodInvokingRouter +import org.springframework.integration.scattergather.ScatterGatherHandler +import org.springframework.integration.splitter.AbstractMessageSplitter +import org.springframework.integration.splitter.DefaultMessageSplitter +import org.springframework.integration.splitter.ExpressionEvaluatingSplitter +import org.springframework.integration.splitter.MethodInvokingSplitter +import org.springframework.integration.store.MessageStore +import org.springframework.integration.transformer.ExpressionEvaluatingTransformer +import org.springframework.integration.transformer.GenericTransformer +import org.springframework.integration.transformer.HeaderFilter +import org.springframework.integration.transformer.MessageTransformingHandler +import org.springframework.integration.transformer.MethodInvokingTransformer +import org.springframework.messaging.Message +import org.springframework.messaging.MessageChannel +import org.springframework.messaging.MessageHandler +import org.springframework.messaging.support.ChannelInterceptor +import org.springframework.util.StringUtils +import reactor.core.publisher.Flux + +import java.util.concurrent.Executor +import java.util.function.Consumer +import java.util.function.Function + +/** + * The Groovy-specific {@link IntegrationFlowDefinition} wrapper. + * + * @author Artem Bilan + * + * @see IntegrationFlowDefinition + */ +@CompileStatic +class GroovyIntegrationFlowDefinition { + + private final IntegrationFlowDefinition delegate + + protected GroovyIntegrationFlowDefinition(IntegrationFlowDefinition delegate) { + this.delegate = delegate + } + + /** + * Populate an {@link org.springframework.integration.channel.FixedSubscriberChannel} instance + * at the current {@link IntegrationFlow} chain position. + * The provided {@code messageChannelName} is used for the bean registration. + * @param messageChannelName the bean name to use. + */ + GroovyIntegrationFlowDefinition fixedSubscriberChannel(String messageChannelName = null) { + this.delegate.fixedSubscriberChannel messageChannelName + this + } + + /** + * Populate a {@link org.springframework.integration.dsl.support.MessageChannelReference} instance + * at the current {@link IntegrationFlow} chain position. + * The provided {@code messageChannelName} is used for the bean registration + * ({@link org.springframework.integration.channel.DirectChannel}), if there is no such a bean + * in the application context. Otherwise the existing {@link MessageChannel} bean is used + * to wire integration endpoints. + * @param messageChannelName the bean name to use. + */ + GroovyIntegrationFlowDefinition channel(String messageChannelName) { + this.delegate.channel messageChannelName + this + } + + /** + * Populate a {@link MessageChannel} instance + * at the current {@link IntegrationFlow} chain position using the {@link MessageChannelSpec} + * fluent API. + * @param messageChannelSpec the {@link MessageChannelSpec} to use. + * @see MessageChannels + */ + GroovyIntegrationFlowDefinition channel(MessageChannelSpec messageChannelSpec) { + this.delegate.channel messageChannelSpec + this + } + + /** + * Populate the provided {@link MessageChannel} instance + * at the current {@link IntegrationFlow} chain position. + * The {@code messageChannel} can be an existing bean, or fresh instance, in which case + * the {@link org.springframework.integration.dsl.context.IntegrationFlowBeanPostProcessor} + * will populate it as a bean with a generated name. + * @param messageChannel the {@link MessageChannel} to populate. + */ + GroovyIntegrationFlowDefinition channel(MessageChannel messageChannel) { + this.delegate.channel messageChannel + this + } + + /** + * Populate a {@link MessageChannel} instance + * at the current {@link IntegrationFlow} chain position using the {@link Channels} + * factory fluent API. + * @param channels the {@link Function} to use. + */ + GroovyIntegrationFlowDefinition channel( + @DelegatesTo(value = Channels, strategy = Closure.DELEGATE_FIRST) + @ClosureParams(value = SimpleType.class, options = 'org.springframework.integration.dsl.Channels') + Closure channels) { + + Function function = + { channelFactory -> + channels.delegate = channelFactory + channels.resolveStrategy = Closure.DELEGATE_FIRST + channels() + } + + this.delegate.channel function + this + } + + /** + * The {@link org.springframework.integration.channel.PublishSubscribeChannel} {@link #channel} + * method specific implementation to allow the use of the 'subflow' subscriber capability. + * @param executor the {@link Executor} to use. + * @param publishSubscribeChannelConfigurer the {@link java.util.function.Consumer} to specify + * {@link PublishSubscribeSpec} options including 'subflow' definition. + */ + GroovyIntegrationFlowDefinition publishSubscribeChannel( + Executor executor = null, + @DelegatesTo(value = PublishSubscribeSpec, strategy = Closure.DELEGATE_FIRST) + @ClosureParams(value = SimpleType.class, + options = 'org.springframework.integration.dsl.PublishSubscribeSpec') + Closure publishSubscribeChannelConfigurer) { + + this.delegate.publishSubscribeChannel executor, createConfigurerIfAny(publishSubscribeChannelConfigurer) + this + } + + /** + * The {@link BroadcastCapableChannel} {@link #channel} + * method specific implementation to allow the use of the 'subflow' subscriber capability. + * @param broadcastCapableChannel the {@link BroadcastCapableChannel} to subscriber sub-flows to. + * @param publishSubscribeChannelConfigurer the {@link Consumer} to specify + * {@link BroadcastPublishSubscribeSpec} 'subflow' definitions. + */ + GroovyIntegrationFlowDefinition publishSubscribeChannel( + BroadcastCapableChannel broadcastCapableChannel, + @DelegatesTo(value = BroadcastPublishSubscribeSpec, strategy = Closure.DELEGATE_FIRST) + @ClosureParams(value = SimpleType.class, + options = 'org.springframework.integration.dsl.BroadcastPublishSubscribeSpec') + Closure publishSubscribeChannelConfigurer) { + + this.delegate.publishSubscribeChannel broadcastCapableChannel, + createConfigurerIfAny(publishSubscribeChannelConfigurer) + this + } + + /** + * Populate the {@code Wire Tap} EI Pattern specific + * {@link ChannelInterceptor} implementation + * to the current message channel. + * It is useful when an implicit {@link MessageChannel} is used between endpoints. + * This method can be used after any {@link #channel} for explicit {@link MessageChannel}, + * but with the caution do not impact existing {@link ChannelInterceptor}s. + * @param flow the {@link IntegrationFlow} for wire-tap subflow as an alternative to the {@code wireTapChannel}. + * @param wireTapConfigurer the {@link Consumer} to accept options for the + * {@link org.springframework.integration.channel.interceptor.WireTap}. + */ + GroovyIntegrationFlowDefinition wireTap( + IntegrationFlow flow, + @DelegatesTo(value = WireTapSpec, strategy = Closure.DELEGATE_FIRST) + Closure wireTapConfigurer = null) { + + this.delegate.wireTap flow, createConfigurerIfAny(wireTapConfigurer) + this + } + + /** + * Populate the {@code Wire Tap} EI Pattern specific + * {@link ChannelInterceptor} implementation + * to the current message channel. + * It is useful when an implicit {@link MessageChannel} is used between endpoints. + * This method can be used after any {@link #channel} for explicit {@link MessageChannel}, + * but with the caution do not impact existing {@link ChannelInterceptor}s. + * @param wireTapChannel the {@link MessageChannel} bean name to wire-tap. + * @param wireTapConfigurer the {@link Consumer} to accept options for the + * {@link org.springframework.integration.channel.interceptor.WireTap}. + */ + GroovyIntegrationFlowDefinition wireTap(String wireTapChannel, + @DelegatesTo(value = WireTapSpec, strategy = Closure.DELEGATE_FIRST) + Closure wireTapConfigurer = null) { + + this.delegate.wireTap wireTapChannel, createConfigurerIfAny(wireTapConfigurer) + this + } + + /** + * Populate the {@code Wire Tap} EI Pattern specific + * {@link ChannelInterceptor} implementation + * to the current current MessageChannel. + * It is useful when an implicit {@link MessageChannel} is used between endpoints. + * This method can be used after any {@link #channel} for explicit {@link MessageChannel}, + * but with the caution do not impact existing {@link ChannelInterceptor}s. + * @param wireTapChannel the {@link MessageChannel} to wire-tap. + * @param wireTapConfigurer the {@link Consumer} to accept options for the + * {@link org.springframework.integration.channel.interceptor.WireTap}. + */ + GroovyIntegrationFlowDefinition wireTap(MessageChannel wireTapChannel, + @DelegatesTo(value = WireTapSpec, strategy = Closure.DELEGATE_FIRST) + Closure wireTapConfigurer = null) { + + this.delegate.wireTap wireTapChannel, createConfigurerIfAny(wireTapConfigurer) + this + } + + /** + * Populate the {@code Control Bus} EI Pattern specific {@link MessageHandler} implementation + * at the current {@link IntegrationFlow} chain position. + * @param endpointConfigurer the {@link Consumer} to accept integration endpoint options. + * @see org.springframework.integration.handler.ExpressionCommandMessageProcessor* @see GenericEndpointSpec + */ + GroovyIntegrationFlowDefinition controlBus( + @DelegatesTo(value = GenericEndpointSpec, strategy = Closure.DELEGATE_FIRST) + Closure endpointConfigurer = null) { + + this.delegate.controlBus createConfigurerIfAny(endpointConfigurer) + this + } + + /** + * Populate the {@code Transformer} EI Pattern specific {@link org.springframework.messaging.MessageHandler} implementation + * for the SpEL {@link org.springframework.expression.Expression}. + * @param expression the {@code Transformer} {@link org.springframework.expression.Expression}. + * @param endpointConfigurer the {@link Consumer} to provide integration endpoint options. + * @see org.springframework.integration.transformer.ExpressionEvaluatingTransformer + */ + GroovyIntegrationFlowDefinition transform( + String expression, + @DelegatesTo(value = GenericEndpointSpec, strategy = Closure.DELEGATE_FIRST) + Closure endpointConfigurer = null) { + + this.delegate.transform expression, createConfigurerIfAny(endpointConfigurer) + this + } + + /** + * Populate the {@code MessageTransformingHandler} for the + * {@link org.springframework.integration.transformer.MethodInvokingTransformer} + * to invoke the service method at runtime. + * @param service the service to use. + * @param methodName the method to invoke. + * @param endpointConfigurer the {@link Consumer} to provide integration endpoint options. + * @see ExpressionEvaluatingTransformer + */ + GroovyIntegrationFlowDefinition transform( + Object service, String methodName = null, + @DelegatesTo(value = GenericEndpointSpec, strategy = Closure.DELEGATE_FIRST) + Closure endpointConfigurer = null) { + + this.delegate.transform service, methodName, createConfigurerIfAny(endpointConfigurer) + this + } + + /** + * Populate the {@link MessageTransformingHandler} instance for the + * {@link org.springframework.integration.handler.MessageProcessor} from provided {@link MessageProcessorSpec}. + * In addition accept options for the integration endpoint using {@link GenericEndpointSpec}. + * @param messageProcessorSpec the {@link MessageProcessorSpec} to use. + * @param endpointConfigurer the {@link Consumer} to provide integration endpoint options. + * @see MethodInvokingTransformer + */ + GroovyIntegrationFlowDefinition transform( + MessageProcessorSpec messageProcessorSpec, + @DelegatesTo(value = GenericEndpointSpec, strategy = Closure.DELEGATE_FIRST) + Closure endpointConfigurer = null) { + + this.delegate.transform messageProcessorSpec, createConfigurerIfAny(endpointConfigurer) + this + } + + /** + * Populate the {@link MessageTransformingHandler} instance + * for the provided {@code payloadType} to convert at runtime. + * In addition, accept options for the integration endpoint using {@link GenericEndpointSpec}. + * @param payloadType the {@link Class} for expected payload type. + * @param endpointConfigurer the {@link Consumer} to provide integration endpoint options. + * @param < P > the payload type - 'transform to'. + */ + public

GroovyIntegrationFlowDefinition convert( + Class

payloadType, + @DelegatesTo(value = GenericEndpointSpec, strategy = Closure.DELEGATE_FIRST) + Closure endpointConfigurer = null) { + + this.delegate.convert payloadType, createConfigurerIfAny(endpointConfigurer) + this + } + + /** + * Populate the {@link MessageTransformingHandler} instance for the provided {@link GenericTransformer} + * for the specific {@code expectedType} to convert at runtime. + * In addition accept options for the integration endpoint using {@link GenericEndpointSpec}. + * @param expectedType the {@link Class} for expected payload type. It can also be + * {@code Message.class} if you wish to access the entire message in the transformer. + * Conversion to this type will be attempted, if necessary. + * @param genericTransformer the {@link GenericTransformer} to populate. + * @param endpointConfigurer the {@link Consumer} to provide integration endpoint options. + * @param < P > the payload type - 'transform from', or {@code Message.class}. + * @param < T > the target type - 'transform to'. + */ + public GroovyIntegrationFlowDefinition transform( + GenericTransformer genericTransformer, + @DelegatesTo(value = GenericEndpointSpec, strategy = Closure.DELEGATE_FIRST) + Closure endpointConfigurer = null) { + + this.delegate.transform genericTransformer, createConfigurerIfAny(endpointConfigurer) + this + } + + /** + * Populate the {@link MessageTransformingHandler} instance for the provided {@link GenericTransformer} + * for the specific {@code expectedType} to convert at runtime. + * In addition accept options for the integration endpoint using {@link GenericEndpointSpec}. + * @param expectedType the {@link Class} for expected payload type. It can also be + * {@code Message.class} if you wish to access the entire message in the transformer. + * Conversion to this type will be attempted, if necessary. + * @param genericTransformer the {@link GenericTransformer} to populate. + * @param endpointConfigurer the {@link Consumer} to provide integration endpoint options. + * @param < P > the payload type - 'transform from', or {@code Message.class}. + * @param < T > the target type - 'transform to'. + */ + public GroovyIntegrationFlowDefinition transform( + Class

expectedType, + GenericTransformer genericTransformer, + @DelegatesTo(value = GenericEndpointSpec, strategy = Closure.DELEGATE_FIRST) + Closure endpointConfigurer = null) { + + GenericTransformer lambdaWrapper = payload -> genericTransformer(payload) + + this.delegate.transform expectedType, lambdaWrapper, createConfigurerIfAny(endpointConfigurer) + this + } + + /** + * Populate a {@link org.springframework.integration.filter.MessageFilter} with + * {@link org.springframework.integration.core.MessageSelector} for the provided SpEL expression. + * In addition accept options for the integration endpoint using {@link FilterEndpointSpec} + * @param expression the SpEL expression. + * @param endpointConfigurer the {@link Consumer} to provide integration endpoint options. + * @see FilterEndpointSpec + */ + public GroovyIntegrationFlowDefinition filter( + String expression, + @DelegatesTo(value = FilterEndpointSpec, strategy = Closure.DELEGATE_FIRST) + Closure endpointConfigurer = null) { + + this.delegate.filter expression, createConfigurerIfAny(endpointConfigurer) + this + } + + /** + * Populate a {@link org.springframework.integration.filter.MessageFilter} + * with {@link org.springframework.integration.filter.MethodInvokingSelector} for the + * method of the provided service. + * @param service the service to use. + * @param methodName the method to invoke + * @param endpointConfigurer the {@link Consumer} to provide integration endpoint options. + */ + GroovyIntegrationFlowDefinition filter(Object service, String methodName = null, + @DelegatesTo(value = FilterEndpointSpec, strategy = Closure.DELEGATE_FIRST) + Closure endpointConfigurer = null) { + + this.delegate.filter service, methodName, createConfigurerIfAny(endpointConfigurer) + this + } + + /** + * Populate a {@link org.springframework.integration.filter.MessageFilter} with {@link MethodInvokingSelector} + * for the {@link MessageProcessor} from + * the provided {@link MessageProcessorSpec}. + * @param messageProcessorSpec the {@link MessageProcessorSpec} to use. + * @param endpointConfigurer the {@link Consumer} to provide integration endpoint options. + */ + GroovyIntegrationFlowDefinition filter(MessageProcessorSpec messageProcessorSpec, + @DelegatesTo(value = FilterEndpointSpec, strategy = Closure.DELEGATE_FIRST) + Closure endpointConfigurer = null) { + + this.delegate.filter messageProcessorSpec, createConfigurerIfAny(endpointConfigurer) + this + } + + /** + * Populate a {@link org.springframework.integration.filter.MessageFilter} with {@link MethodInvokingSelector} + * for the provided {@link GenericSelector}. + * In addition, accept options for the integration endpoint using {@link FilterEndpointSpec}. + * @param expectedType the {@link Class} for expected payload type. It can also be + * {@code Message.class} if you wish to access the entire message in the selector. + * Conversion to this type will be attempted, if necessary. + * @param genericSelector the {@link GenericSelector} to use. + * @param endpointConfigurer the {@link Consumer} to provide integration endpoint options. + * @param < P > the source payload type or {@code Message.class}. + */ + public

GroovyIntegrationFlowDefinition filter( + @DelegatesTo.Target Class

expectedType, + @DelegatesTo(genericTypeIndex = 0, strategy = Closure.DELEGATE_FIRST) + Closure genericSelector, + @DelegatesTo(value = FilterEndpointSpec, strategy = Closure.DELEGATE_FIRST) + Closure endpointConfigurer = null) { + + GenericSelector

lambdaWrapper = payload -> genericSelector(payload) + + this.delegate.filter expectedType, lambdaWrapper, createConfigurerIfAny(endpointConfigurer) + this + } + + /** + * Populate a {@link ServiceActivatingHandler} for the + * {@link org.springframework.integration.handler.MethodInvokingMessageProcessor} + * to invoke the {@code method} for provided {@code bean} at runtime. + * In addition accept options for the integration endpoint using {@link GenericEndpointSpec}. + * @param beanName the bean name to use. + * @param methodName the method to invoke. + * @param endpointConfigurer the {@link Consumer} to provide integration endpoint options. + */ + GroovyIntegrationFlowDefinition handle( + String beanName, String methodName, + @DelegatesTo(value = GenericEndpointSpec, strategy = Closure.DELEGATE_FIRST) + Closure endpointConfigurer = null) { + + this.delegate.handle beanName, methodName, createConfigurerIfAny(endpointConfigurer) + this + } + + + /** + * Populate a {@link ServiceActivatingHandler} for the + * {@link org.springframework.integration.handler.MethodInvokingMessageProcessor} + * to invoke the {@code method} for provided {@code bean} at runtime. + * In addition accept options for the integration endpoint using {@link GenericEndpointSpec}. + * @param service the service object to use. + * @param methodName the method to invoke. + * @param endpointConfigurer the {@link Consumer} to provide integration endpoint options. + */ + GroovyIntegrationFlowDefinition handle( + Object service, String methodName = null, + @DelegatesTo(value = GenericEndpointSpec, strategy = Closure.DELEGATE_FIRST) + Closure endpointConfigurer = null) { + + this.delegate.handle service, methodName, createConfigurerIfAny(endpointConfigurer) + this + } + + /** + * Populate a {@link ServiceActivatingHandler} for the + * {@link org.springframework.integration.handler.MethodInvokingMessageProcessor} + * to invoke the provided {@link GenericHandler} at runtime. + * In addition accept options for the integration endpoint using {@link GenericEndpointSpec}. + * @param expectedType the {@link Class} for expected payload type. It can also be + * {@code Message.class} if you wish to access the entire message in the handler. + * Conversion to this type will be attempted, if necessary. + * @param handler the handler to invoke. + * @param endpointConfigurer the {@link Consumer} to provide integration endpoint options. + * @param < P > the payload type to expect or {@code Message.class}. + */ + public

GroovyIntegrationFlowDefinition handle( + Class

expectedType, GenericHandler

handler, + @DelegatesTo(value = GenericEndpointSpec, strategy = Closure.DELEGATE_FIRST) + Closure endpointConfigurer = null) { + + GenericHandler

lambdaWrapper = (payload, headers) -> handler(payload, headers) + + this.delegate.handle expectedType, lambdaWrapper, createConfigurerIfAny(endpointConfigurer) + this + } + + /** + * Populate a {@link ServiceActivatingHandler} for the + * {@link MessageProcessor} from the provided {@link MessageProcessorSpec}. + * @param messageProcessorSpec the {@link MessageProcessorSpec} to use. + * @param endpointConfigurer the {@link Consumer} to provide integration endpoint options. + */ + GroovyIntegrationFlowDefinition handle( + MessageProcessorSpec messageProcessorSpec, + @DelegatesTo(value = GenericEndpointSpec, strategy = Closure.DELEGATE_FIRST) + Closure endpointConfigurer = null) { + + this.delegate.handle messageProcessorSpec, createConfigurerIfAny(endpointConfigurer) + this + } + + /** + * Populate a {@link ServiceActivatingHandler} for the selected protocol specific + * {@link MessageHandler} implementation from {@code Namespace Factory}: + * In addition accept options for the integration endpoint using {@link GenericEndpointSpec}. + * @param messageHandlerSpec the {@link MessageHandlerSpec} to configure protocol specific + * {@link MessageHandler}. + * @param endpointConfigurer the {@link Consumer} to provide integration endpoint options. + * @param < H > the {@link MessageHandler} type. + */ + public GroovyIntegrationFlowDefinition handle( + MessageHandlerSpec messageHandlerSpec, + @DelegatesTo(value = GenericEndpointSpec, strategy = Closure.DELEGATE_FIRST) + Closure endpointConfigurer = null) { + + this.delegate.handle messageHandlerSpec, createConfigurerIfAny(endpointConfigurer) + this + } + + /** + * Populate a {@link ServiceActivatingHandler} for the provided + * {@link MessageHandler} implementation. + * In addition accept options for the integration endpoint using {@link GenericEndpointSpec}. + * @param messageHandler the {@link MessageHandler} to use. + * @param endpointConfigurer the {@link Consumer} to provide integration endpoint options. + * @param < H > the {@link MessageHandler} type. + */ + public GroovyIntegrationFlowDefinition handle( + H messageHandler, + @DelegatesTo(value = GenericEndpointSpec, strategy = Closure.DELEGATE_FIRST) + Closure endpointConfigurer = null) { + + this.delegate.handle messageHandler, createConfigurerIfAny(endpointConfigurer) + this + } + + /** + * Populate a {@link BridgeHandler} to the current integration flow position. + * sed with a Closure expression. + * @param endpointConfigurer the {@link Consumer} to provide integration endpoint options. + * @see GenericEndpointSpec + */ + GroovyIntegrationFlowDefinition bridge( + @DelegatesTo(value = GenericEndpointSpec, strategy = Closure.DELEGATE_FIRST) + @ClosureParams(value = SimpleType, options = 'org.springframework.integration.dsl.GenericEndpointSpec') + Closure endpointConfigurer = null) { + + this.delegate.bridge createConfigurerIfAny(endpointConfigurer) + this + } + + /** + * Populate a {@link org.springframework.integration.handler.DelayHandler} to the current integration flow position. + * @param groupId the {@code groupId} for delayed messages in the + * {@link org.springframework.integration.store.MessageGroupStore}. + * @param endpointConfigurer the {@link Consumer} to provide integration endpoint options. + * @see DelayerEndpointSpec + */ + GroovyIntegrationFlowDefinition delay( + String groupId, + @DelegatesTo(value = DelayerEndpointSpec, strategy = Closure.DELEGATE_FIRST) + Closure endpointConfigurer = null) { + + this.delegate.delay groupId, createConfigurerIfAny(endpointConfigurer) + this + } + + /** + * Populate a {@link org.springframework.integration.transformer.ContentEnricher} + * to the current integration flow position + * with provided options. + * Used with a Closure expression. + * @param enricherConfigurer the {@link Closure} to provide + * {@link org.springframework.integration.transformer.ContentEnricher} options. + * @see EnricherSpec + */ + GroovyIntegrationFlowDefinition enrich( + @DelegatesTo(value = EnricherSpec, strategy = Closure.DELEGATE_FIRST) + Closure enricherConfigurer) { + + this.delegate.enrich createConfigurerIfAny(enricherConfigurer) + this + } + + /** + * Populate a {@link MessageTransformingHandler} for + * a {@link org.springframework.integration.transformer.HeaderEnricher} + * as the result of provided {@link Consumer}. + * @param headerEnricherConfigurer the {@link Consumer} to use. + * @see HeaderEnricherSpec + */ + GroovyIntegrationFlowDefinition enrichHeaders( + @DelegatesTo(value = HeaderEnricherSpec, strategy = Closure.DELEGATE_FIRST) + Closure enricherConfigurer = null) { + + this.delegate.enrichHeaders createConfigurerIfAny(enricherConfigurer) + this + } + + /** + * Populate the {@link DefaultMessageSplitter} with provided options + * to the current integration flow position. + * Used with a Closure expression (optional). + * @param endpointConfigurer the {@link Consumer} to provide integration endpoint options + * and for {@link DefaultMessageSplitter}. + * @see SplitterEndpointSpec + */ + GroovyIntegrationFlowDefinition split( + @DelegatesTo(value = SplitterEndpointSpec, strategy = Closure.DELEGATE_FIRST) + Closure endpointConfigurer = null) { + + this.delegate.split createConfigurerIfAny(endpointConfigurer) + this + } + + /** + * Populate the {@link ExpressionEvaluatingSplitter} with provided + * SpEL expression. + * @param expression the splitter SpEL expression. + * @param endpointConfigurer the {@link Consumer} to provide integration endpoint options + * and for {@link ExpressionEvaluatingSplitter}. + */ + GroovyIntegrationFlowDefinition split( + String expression, + @DelegatesTo(value = SplitterEndpointSpec, strategy = Closure.DELEGATE_FIRST) + Closure endpointConfigurer = null) { + + this.delegate.split expression, createConfigurerIfAny(endpointConfigurer) + this + } + + /** + * Populate the {@link MethodInvokingSplitter} to evaluate the provided + * {@code method} of the {@code bean} at runtime. + * In addition accept options for the integration endpoint using {@link GenericEndpointSpec}. + * @param service the service to use. + * @param methodName the method to invoke. + * @param endpointConfigurer the {@link Consumer} to provide integration endpoint options + * and for {@link MethodInvokingSplitter}. + */ + GroovyIntegrationFlowDefinition split( + Object service, String methodName = null, + @DelegatesTo(value = SplitterEndpointSpec, strategy = Closure.DELEGATE_FIRST) + Closure endpointConfigurer = null) { + + this.delegate.split service, methodName, createConfigurerIfAny(endpointConfigurer) + this + } + + /** + * Populate the {@link MethodInvokingSplitter} to evaluate the provided + * {@code method} of the {@code bean} at runtime. + * In addition accept options for the integration endpoint using {@link GenericEndpointSpec}. + * @param beanName the bean name to use. + * @param methodName the method to invoke at runtime. + * @param endpointConfigurer the {@link Consumer} to provide integration endpoint options + * and for {@link MethodInvokingSplitter}. + * @see SplitterEndpointSpec + */ + GroovyIntegrationFlowDefinition split( + String beanName, String methodName, + @DelegatesTo(value = SplitterEndpointSpec, strategy = Closure.DELEGATE_FIRST) + Closure endpointConfigurer = null) { + + this.delegate.split beanName, methodName, createConfigurerIfAny(endpointConfigurer) + this + } + + /** + * Populate the {@link MethodInvokingSplitter} to evaluate the + * {@link MessageProcessor} at runtime + * from provided {@link MessageProcessorSpec}. + * In addition accept options for the integration endpoint using {@link GenericEndpointSpec}. + * @param messageProcessorSpec the splitter {@link MessageProcessorSpec}. + * @param endpointConfigurer the {@link Consumer} to provide integration endpoint options + * and for {@link MethodInvokingSplitter}. + */ + GroovyIntegrationFlowDefinition split( + MessageProcessorSpec messageProcessorSpec, + @DelegatesTo(value = SplitterEndpointSpec, strategy = Closure.DELEGATE_FIRST) + Closure endpointConfigurer = null) { + + this.delegate.split messageProcessorSpec, createConfigurerIfAny(endpointConfigurer) + this + } + + /** + * Populate the {@link MethodInvokingSplitter} to evaluate the provided + * {@link Function} at runtime. + * In addition, accept options for the integration endpoint using {@link GenericEndpointSpec}. + * @param expectedType the {@link Class} for expected payload type. It can also be + * {@code Message.class} if you wish to access the entire message in the splitter. + * Conversion to this type will be attempted, if necessary. + * @param splitter the splitter {@link Function}. + * @param endpointConfigurer the {@link Consumer} to provide integration endpoint options. + * @param < P > the payload type or {@code Message.class}. + */ + public

GroovyIntegrationFlowDefinition split( + Class

expectedType, Function splitter, + @DelegatesTo(value = SplitterEndpointSpec, strategy = Closure.DELEGATE_FIRST) + Closure endpointConfigurer = null) { + + Function lambdaWrapper = payload -> splitter(payload) + + this.delegate.split expectedType, lambdaWrapper, createConfigurerIfAny(endpointConfigurer) + this + } + + /** + * Populate the provided {@link AbstractMessageSplitter} to the current integration + * flow position. + * @param splitterMessageHandlerSpec the {@link MessageHandlerSpec} to populate. + * @param endpointConfigurer the {@link Consumer} to provide integration endpoint options. + * @param < S > the {@link AbstractMessageSplitter} + * @see SplitterEndpointSpec + */ + public GroovyIntegrationFlowDefinition split( + MessageHandlerSpec splitterMessageHandlerSpec, + @DelegatesTo(value = SplitterEndpointSpec, strategy = Closure.DELEGATE_FIRST) + Closure endpointConfigurer = null) { + + this.delegate.split splitterMessageHandlerSpec, createConfigurerIfAny(endpointConfigurer) + this + } + + /** + * Populate the provided {@link AbstractMessageSplitter} to the current integration + * flow position. + * @param splitter the {@link AbstractMessageSplitter} to populate. + * @param endpointConfigurer the {@link Consumer} to provide integration endpoint options. + * @param < S > the {@link AbstractMessageSplitter} + * @see SplitterEndpointSpec + */ + public GroovyIntegrationFlowDefinition split( + S splitter, + @DelegatesTo(value = SplitterEndpointSpec, strategy = Closure.DELEGATE_FIRST) + Closure endpointConfigurer = null) { + + this.delegate.split splitter, createConfigurerIfAny(endpointConfigurer) + this + } + + /** + * Populate the provided {@link MessageTransformingHandler} for the provided + * {@link HeaderFilter}. + * @param headerFilter the {@link HeaderFilter} to use. + * @param endpointConfigurer the {@link Consumer} to provide integration endpoint options. + * @see GenericEndpointSpec + */ + GroovyIntegrationFlowDefinition headerFilter( + String headersToRemove, + boolean patternMatch = true, + @DelegatesTo(value = GenericEndpointSpec, strategy = Closure.DELEGATE_FIRST) + Closure endpointConfigurer = null) { + + def headerFilter = new HeaderFilter(StringUtils.delimitedListToStringArray(headersToRemove, ',', ' ')) + headerFilter.patternMatch = patternMatch + + this.delegate.headerFilter headerFilter, createConfigurerIfAny(endpointConfigurer) + this + } + + /** + * Populate the {@link MessageTransformingHandler} for the + * {@link org.springframework.integration.transformer.ClaimCheckInTransformer} with provided {@link MessageStore}. + * In addition accept options for the integration endpoint using {@link GenericEndpointSpec}. + * @param messageStore the {@link MessageStore} to use. + * @param endpointConfigurer the {@link Consumer} to provide integration endpoint options. + * @see GenericEndpointSpec + */ + GroovyIntegrationFlowDefinition claimCheckIn( + MessageStore messageStore, + @DelegatesTo(value = GenericEndpointSpec, strategy = Closure.DELEGATE_FIRST) + Closure endpointConfigurer = null) { + + this.delegate.claimCheckIn messageStore, createConfigurerIfAny(endpointConfigurer) + this + } + + /** + * Populate the {@link MessageTransformingHandler} for the + * {@link org.springframework.integration.transformer.ClaimCheckOutTransformer} + * with provided {@link MessageStore} and {@code removeMessage} flag. + * In addition accept options for the integration endpoint using {@link GenericEndpointSpec}. + * @param messageStore the {@link MessageStore} to use. + * @param removeMessage the removeMessage boolean flag. + * @param endpointConfigurer the {@link Consumer} to provide integration endpoint options. + * @see GenericEndpointSpec* @see org.springframework.integration.transformer.ClaimCheckOutTransformer#setRemoveMessage(boolean) + */ + GroovyIntegrationFlowDefinition claimCheckOut( + MessageStore messageStore, boolean removeMessage = false, + @DelegatesTo(value = GenericEndpointSpec, strategy = Closure.DELEGATE_FIRST) + Closure endpointConfigurer = null) { + + this.delegate.claimCheckOut messageStore, removeMessage, createConfigurerIfAny(endpointConfigurer) + this + } + + /** + * Populate the + * {@link org.springframework.integration.aggregator.ResequencingMessageHandler} with + * provided options from {@link ResequencerSpec}. + * In addition accept options for the integration endpoint using {@link GenericEndpointSpec}. + * Used with a Closure expression (optional). + * @param resequencer the {@link Consumer} to provide + * {@link org.springframework.integration.aggregator.ResequencingMessageHandler} options. + * @see ResequencerSpec + */ + GroovyIntegrationFlowDefinition resequence( + @DelegatesTo(value = ResequencerSpec, strategy = Closure.DELEGATE_FIRST) + Closure resequencer = null) { + + this.delegate.resequence createConfigurerIfAny(resequencer) + this + } + + /** + * A short-cut for the {@code aggregate((aggregator) -> aggregator.processor(aggregatorProcessor))}. + * @param aggregatorProcessor the POJO representing aggregation strategies. + * @see AggregatorSpec + */ + GroovyIntegrationFlowDefinition aggregate(Object aggregatorProcessor) { + this.delegate.aggregate aggregatorProcessor + this + } + + /** + * Populate the {@link org.springframework.integration.aggregator.AggregatingMessageHandler} + * with provided options from {@link AggregatorSpec}. + * In addition, accept options for the integration endpoint using {@link GenericEndpointSpec}. + * Used with a Closure expression (optional). + * @param aggregator the {@link Consumer} to provide + * {@link org.springframework.integration.aggregator.AggregatingMessageHandler} options. + */ + GroovyIntegrationFlowDefinition aggregate( + @DelegatesTo(value = AggregatorSpec, strategy = Closure.DELEGATE_FIRST) + Closure aggregator = null) { + + this.delegate.aggregate createConfigurerIfAny(aggregator) + this + } + + /** + * Populate the {@link MethodInvokingRouter} for provided bean and its method + * with provided options from {@link RouterSpec}. + * @param beanName the bean to use. + * @param method the method to invoke at runtime. + * @param routerConfigurer the {@link Consumer} to provide {@link MethodInvokingRouter} options. + */ + GroovyIntegrationFlowDefinition route( + String beanName, String method, + @DelegatesTo(value = RouterSpec, strategy = Closure.DELEGATE_FIRST) + Closure routerConfigurer = null) { + + this.delegate.route beanName, method, createConfigurerIfAny(routerConfigurer) + this + } + + /** + * Populate the {@link MethodInvokingRouter} for the method + * of the provided service and its method with provided options from {@link RouterSpec}. + * @param service the service to use. + * @param methodName the method to invoke. + * @param routerConfigurer the {@link Consumer} to provide {@link MethodInvokingRouter} options. + */ + public GroovyIntegrationFlowDefinition route( + Object service, String methodName = null, + @DelegatesTo(value = RouterSpec, strategy = Closure.DELEGATE_FIRST) + Closure routerConfigurer = null) { + + this.delegate.route service, methodName, createConfigurerIfAny(routerConfigurer) + this + } + + /** + * Populate the {@link ExpressionEvaluatingRouter} for provided SpEL expression + * with provided options from {@link RouterSpec}. + * @param expression the expression to use. + * @param routerConfigurer the {@link Consumer} to provide {@link ExpressionEvaluatingRouter} options. + * @param < T > the target result type. + */ + public GroovyIntegrationFlowDefinition route( + String expression, + @DelegatesTo(value = RouterSpec, strategy = Closure.DELEGATE_FIRST) + Closure routerConfigurer = null) { + + this.delegate.route expression, createConfigurerIfAny(routerConfigurer) + this + } + + /** + * Populate the {@link MethodInvokingRouter} for provided {@link Function} + * and payload type and options from {@link RouterSpec}. + * In addition, accept options for the integration endpoint using {@link GenericEndpointSpec}. + * @param expectedType the {@link Class} for expected payload type. It can also be + * {@code Message.class} if you wish to access the entire message in the router. + * Conversion to this type will be attempted, if necessary. + * @param router the {@link Function} to use. + * @param routerConfigurer the {@link Consumer} to provide {@link MethodInvokingRouter} options. + * @param < P > the source payload type or {@code Message.class}. + * @param < T > the target result type. + */ + public GroovyIntegrationFlowDefinition route( + Class

expectedType, Function router, + @DelegatesTo(value = RouterSpec, strategy = Closure.DELEGATE_FIRST) + Closure routerConfigurer = null) { + + Function lambdaWrapper = payload -> router(payload) + + this.delegate.route expectedType, lambdaWrapper, createConfigurerIfAny(routerConfigurer) + this + } + + /** + * Populate the {@link MethodInvokingRouter} for the + * {@link MessageProcessor} + * from the provided {@link MessageProcessorSpec} with default options. + * @param messageProcessorSpec the {@link MessageProcessorSpec} to use. + * @param routerConfigurer the {@link Consumer} to provide {@link MethodInvokingRouter} options. + */ + GroovyIntegrationFlowDefinition route( + MessageProcessorSpec messageProcessorSpec, + @DelegatesTo(value = RouterSpec, strategy = Closure.DELEGATE_FIRST) + Closure routerConfigurer = null) { + + this.delegate.route messageProcessorSpec, createConfigurerIfAny(routerConfigurer) + this + } + + /** + * Populate the {@link org.springframework.integration.router.RecipientListRouter} + * with options from the {@link RecipientListRouterSpec}. + * Typically used with a Closure expression. + * @param routerConfigurer the {@link Consumer} to provide + * {@link org.springframework.integration.router.RecipientListRouter} options. + */ + GroovyIntegrationFlowDefinition routeToRecipients( + @DelegatesTo(value = RecipientListRouterSpec, strategy = Closure.DELEGATE_FIRST) + Closure routerConfigurer = null) { + + this.delegate.routeToRecipients createConfigurerIfAny(routerConfigurer) + this + } + + /** + * Populate the {@link ErrorMessageExceptionTypeRouter} with options from the {@link RouterSpec}. + * Typically used with a Closure expression. + * @param routerConfigurer the {@link Consumer} to provide {@link ErrorMessageExceptionTypeRouter} options. + * @see ErrorMessageExceptionTypeRouter + */ + GroovyIntegrationFlowDefinition routeByException( + @DelegatesTo(value = RouterSpec, ErrorMessageExceptionTypeRouter>, + strategy = Closure.DELEGATE_FIRST) + Closure routerConfigurer = null) { + + this.delegate.routeByException createConfigurerIfAny(routerConfigurer) + this + } + + /** + * Populate the provided {@link AbstractMessageRouter} implementation to the + * current integration flow position. + * In addition accept options for the integration endpoint using {@link GenericEndpointSpec}. + * @param router the {@link AbstractMessageRouter} to populate. + * @param endpointConfigurer the {@link Consumer} to provide integration endpoint options. + * @param the {@link AbstractMessageRouter} type. + */ + public GroovyIntegrationFlowDefinition route( + R router, + @DelegatesTo(value = GenericEndpointSpec, strategy = Closure.DELEGATE_FIRST) + Closure endpointConfigurer = null) { + + this.delegate.route router, createConfigurerIfAny(endpointConfigurer) + this + } + + /** + * Populate the "artificial" + * {@link org.springframework.integration.gateway.GatewayMessageHandler} for the + * provided {@code requestChannel} to send a request with options from + * {@link GatewayEndpointSpec}. Uses + * {@link org.springframework.integration.gateway.RequestReplyExchanger} Proxy on the + * background. + * @param requestChannel the {@link MessageChannel} bean name. + * @param endpointConfigurer the {@link Consumer} to provide integration endpoint + * options. + */ + GroovyIntegrationFlowDefinition gateway( + String requestChannel, + @DelegatesTo(value = GatewayEndpointSpec, strategy = Closure.DELEGATE_FIRST) + Closure endpointConfigurer = null) { + + this.delegate.gateway requestChannel, createConfigurerIfAny(endpointConfigurer) + this + } + + /** + * Populate the "artificial" + * {@link org.springframework.integration.gateway.GatewayMessageHandler} for the + * provided {@code requestChannel} to send a request with options from + * {@link GatewayEndpointSpec}. Uses + * {@link org.springframework.integration.gateway.RequestReplyExchanger} Proxy on the + * background. + * @param requestChannel the {@link MessageChannel} to use. + * @param endpointConfigurer the {@link Consumer} to provide integration endpoint + * options. + */ + GroovyIntegrationFlowDefinition gateway( + MessageChannel requestChannel, + @DelegatesTo(value = GatewayEndpointSpec, strategy = Closure.DELEGATE_FIRST) + Closure endpointConfigurer = null) { + + this.delegate.gateway requestChannel, createConfigurerIfAny(endpointConfigurer) + this + } + + /** + * Populate the "artificial" + * {@link org.springframework.integration.gateway.GatewayMessageHandler} for the + * provided {@code subflow} with options from {@link GatewayEndpointSpec}. + * Typically used with a Closure expression. + * @param flow the {@link IntegrationFlow} to to send a request message and wait for reply. + * @param endpointConfigurer the {@link Consumer} to provide integration endpoint options. + */ + GroovyIntegrationFlowDefinition gateway( + IntegrationFlow flow, + @DelegatesTo(value = GatewayEndpointSpec, strategy = Closure.DELEGATE_FIRST) + Closure endpointConfigurer = null) { + + this.delegate.gateway flow, createConfigurerIfAny(endpointConfigurer) + this + } + + /** + * Populate a {@link WireTap} for the current message channel + * with the {@link LoggingHandler} subscriber for provided {@link LoggingHandler.Level} + * logging level and {@code org.springframework.integration.handler.LoggingHandler} + * as a default logging category. + *

The full request {@link Message} will be logged. + *

When this operator is used in the end of flow, it is treated + * as one-way handler without any replies to continue. + * @param level the {@link LoggingHandler.Level}. + */ + GroovyIntegrationFlowDefinition log(LoggingHandler.Level level = LoggingHandler.Level.INFO, + String category, String logExpression = null) { + + if (logExpression) { + this.delegate.log level, category, logExpression + } else { + this.delegate.log level, category + } + this + } + + /** + * Populate a {@link WireTap} for the current message channel + * with the {@link LoggingHandler} subscriber for the provided + * {@link LoggingHandler.Level} logging level, logging category + * and {@link Function} for the log message. + *

When this operator is used in the end of flow, it is treated + * as one-way handler without any replies to continue. + * @param level the {@link LoggingHandler.Level}. + * @param category the logging category. + * @param function the function to evaluate logger message at runtime + * @param < P > the expected payload type against the request {@link Message}. + */ + public

GroovyIntegrationFlowDefinition log( + LoggingHandler.Level level = LoggingHandler.Level.INFO, + String category = null, + Function, Object> function) { + + this.delegate.log level, category, function + this + } + + /** + * Populate a {@link org.springframework.integration.scattergather.ScatterGatherHandler} + * to the current integration flow position + * based on the provided {@link MessageChannel} for scattering function + * and {@link AggregatorSpec} for gathering function. + * @param scatterChannel the {@link MessageChannel} for scatting requests. + * @param gatherer the {@link Consumer} for {@link AggregatorSpec} to configure gatherer. + * Can be {@code null}. + * @param scatterGather the {@link Consumer} for {@link ScatterGatherSpec} to configure + * {@link org.springframework.integration.scattergather.ScatterGatherHandler} and its endpoint. Can be {@code null}. + */ + GroovyIntegrationFlowDefinition scatterGather( + MessageChannel scatterChannel, + @DelegatesTo(value = AggregatorSpec, strategy = Closure.DELEGATE_FIRST) + Closure gatherer = null, + @DelegatesTo(value = ScatterGatherSpec, strategy = Closure.DELEGATE_FIRST) + Closure scatterGather = null) { + + this.delegate.scatterGather scatterChannel, createConfigurerIfAny(gatherer), + createConfigurerIfAny(scatterGather) + this + } + + /** + * Populate a {@link ScatterGatherHandler} to the current integration flow position + * based on the provided {@link RecipientListRouterSpec} for scattering function + * and {@link AggregatorSpec} for gathering function. + * @param scatterer the {@link Consumer} for {@link RecipientListRouterSpec} to configure scatterer. + * @param gatherer the {@link Consumer} for {@link AggregatorSpec} to configure gatherer. + * @param scatterGather the {@link Consumer} for {@link ScatterGatherSpec} to configure + * {@link ScatterGatherHandler} and its endpoint. Can be {@code null}. + */ + GroovyIntegrationFlowDefinition scatterGather( + @DelegatesTo(value = RecipientListRouterSpec, strategy = Closure.DELEGATE_FIRST) + Closure scatterer, + @DelegatesTo(value = AggregatorSpec, strategy = Closure.DELEGATE_FIRST) + Closure gatherer = null, + @DelegatesTo(value = ScatterGatherSpec, strategy = Closure.DELEGATE_FIRST) + Closure scatterGather = null) { + + this.delegate.scatterGather createConfigurerIfAny(scatterer), createConfigurerIfAny(gatherer), + createConfigurerIfAny(scatterGather) + this + } + + /** + * Populate a {@link org.springframework.integration.aggregator.BarrierMessageHandler} + * instance for provided timeout and options from {@link BarrierSpec} and endpoint + * options from {@link GenericEndpointSpec}. + * @param timeout the timeout in milliseconds. + * @param barrierConfigurer the {@link Consumer} to provide + * {@link org.springframework.integration.aggregator.BarrierMessageHandler} options. + */ + GroovyIntegrationFlowDefinition barrier(long timeout, + @DelegatesTo(value = BarrierSpec, strategy = Closure.DELEGATE_FIRST) + Closure barrierConfigurer = null) { + + this.delegate.barrier timeout, createConfigurerIfAny(barrierConfigurer) + this + } + + /** + * Populate a {@link ServiceActivatingHandler} instance to perform {@link MessageTriggerAction} + * and endpoint options from {@link GenericEndpointSpec}. + * @param triggerActionId the {@link MessageTriggerAction} bean id. + * @param endpointConfigurer the {@link Consumer} to provide integration endpoint options. + */ + GroovyIntegrationFlowDefinition trigger( + String triggerActionId, + @DelegatesTo(value = GenericEndpointSpec, strategy = Closure.DELEGATE_FIRST) + Closure endpointConfigurer = null) { + + this.delegate.trigger triggerActionId, createConfigurerIfAny(endpointConfigurer) + this + } + + /** + * Populate a {@link ServiceActivatingHandler} instance to perform {@link MessageTriggerAction} + * and endpoint options from {@link GenericEndpointSpec}. + * @param triggerAction the {@link MessageTriggerAction}. + * @param endpointConfigurer the {@link Consumer} to provide integration endpoint options. + */ + GroovyIntegrationFlowDefinition trigger( + MessageTriggerAction triggerAction, + @DelegatesTo(value = GenericEndpointSpec, strategy = Closure.DELEGATE_FIRST) + Closure endpointConfigurer = null) { + + this.delegate.trigger triggerAction, createConfigurerIfAny(endpointConfigurer) + this + } + + /** + * Add one or more {@link ChannelInterceptor} implementations + * to the current {@link MessageChannel}, in the given order, after any interceptors already registered. + * @param interceptorArray one or more {@link ChannelInterceptor}s. + */ + GroovyIntegrationFlowDefinition intercept(ChannelInterceptor... interceptorArray) { + this.delegate.intercept interceptorArray + this + } + + /** + * Populate a {@link org.springframework.integration.channel.FluxMessageChannel} + * to start a reactive processing for upstream data, + * wrap it to a {@link Flux}, apply provided {@link Function} via {@link Flux#transform(Function)} + * and emit the result to one more + * {@link org.springframework.integration.channel.FluxMessageChannel}, subscribed in the downstream flow. + * @param fluxFunction the {@link Function} to process data reactive manner. + * @param < I > the input payload type. + * @param < O > the output type. + */ + public GroovyIntegrationFlowDefinition fluxTransform( + Function>, ? extends Publisher> fluxFunction) { + + this.delegate. fluxTransform fluxFunction + this + } + + protected static Consumer createConfigurerIfAny( + @DelegatesTo(strategy = Closure.DELEGATE_FIRST) Closure closure) { + + if (closure) { + return { + closure.delegate = it + closure.resolveStrategy = Closure.DELEGATE_FIRST + closure() + } as Consumer + } + null + } + +} diff --git a/spring-integration-groovy-dsl/src/main/groovy/org/springframework/integration/dsl/IntegrationGroovyDsl.groovy b/spring-integration-groovy-dsl/src/main/groovy/org/springframework/integration/dsl/IntegrationGroovyDsl.groovy new file mode 100644 index 0000000..0181e04 --- /dev/null +++ b/spring-integration-groovy-dsl/src/main/groovy/org/springframework/integration/dsl/IntegrationGroovyDsl.groovy @@ -0,0 +1,256 @@ +/* + * Copyright 2021 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.integration.dsl + +import groovy.transform.CompileStatic +import groovy.transform.stc.ClosureParams +import groovy.transform.stc.SimpleType +import org.reactivestreams.Publisher +import org.springframework.integration.core.MessageSource +import org.springframework.integration.endpoint.MessageProducerSupport +import org.springframework.integration.gateway.MessagingGatewaySupport +import org.springframework.messaging.Message +import org.springframework.messaging.MessageChannel + +import java.util.function.Consumer +import java.util.function.Supplier + + +/** + * The factory class for Spring Integration Groovy DSL closures. + * + * @author Artem Bilan + */ +@CompileStatic +class IntegrationGroovyDsl { + + /** + * Functional {@link IntegrationFlow} definition in Groovy DSL for {@link IntegrationFlow} lambdas. + * @param flow the {@link Closure} for {@link IntegrationFlowDefinition} + */ + static IntegrationFlow integrationFlow( + @DelegatesTo(value = GroovyIntegrationFlowDefinition, strategy = Closure.DELEGATE_FIRST) + @ClosureParams(value = SimpleType.class, + options = 'org.springframework.integration.dsl.GroovyIntegrationFlowDefinition') + Closure flow) { + + { IntegrationFlowDefinition flowDefinition -> + flow.delegate = new GroovyIntegrationFlowDefinition(flowDefinition) + flow.resolveStrategy = Closure.DELEGATE_FIRST + flow() + } as IntegrationFlow + } + + /** + * Functional {@link IntegrationFlow} definition in Groovy DSL for + * {@link IntegrationFlows#from(Class, Consumer)} factory method. + */ + static IntegrationFlow integrationFlow( + Class serviceInterface, + @DelegatesTo(value = GatewayProxySpec, strategy = Closure.DELEGATE_FIRST) + @ClosureParams(value = SimpleType.class, + options = 'org.springframework.integration.dsl.GatewayProxySpec') + Closure gatewaySpec = null, + @DelegatesTo(value = GroovyIntegrationFlowDefinition, strategy = Closure.DELEGATE_FIRST) + @ClosureParams(value = SimpleType.class, + options = 'org.springframework.integration.dsl.GroovyIntegrationFlowDefinition') + Closure flow) { + + Consumer configurer = GroovyIntegrationFlowDefinition.createConfigurerIfAny(gatewaySpec) + buildIntegrationFlow(IntegrationFlows.from(serviceInterface, configurer), flow) + } + + /** + * Functional {@link IntegrationFlow} definition in Groovy DSL for + * {@link IntegrationFlows#from(String, boolean)} factory method. + */ + static IntegrationFlow integrationFlow( + String channelName, + Boolean fixedSubscriber = false, + @DelegatesTo(value = GroovyIntegrationFlowDefinition, strategy = Closure.DELEGATE_FIRST) + @ClosureParams(value = SimpleType.class, + options = 'org.springframework.integration.dsl.GroovyIntegrationFlowDefinition') + Closure flow) { + + buildIntegrationFlow(IntegrationFlows.from(channelName, fixedSubscriber), flow) + } + + /** + * Functional {@link IntegrationFlow} definition in Groovy DSL for + * {@link IntegrationFlows#from(MessageChannel)} factory method. + */ + static IntegrationFlow integrationFlow( + MessageChannel channel, + @DelegatesTo(value = GroovyIntegrationFlowDefinition, strategy = Closure.DELEGATE_FIRST) + @ClosureParams(value = SimpleType.class, + options = 'org.springframework.integration.dsl.GroovyIntegrationFlowDefinition') + Closure flow) { + + buildIntegrationFlow(IntegrationFlows.from(channel), flow) + } + + /** + * Functional {@link IntegrationFlow} definition in Groovy DSL for + * {@link IntegrationFlows#from(MessageSource, Consumer)} factory method. + */ + static IntegrationFlow integrationFlow( + MessageSource messageSource, + @DelegatesTo(value = SourcePollingChannelAdapterSpec, strategy = Closure.DELEGATE_FIRST) + Closure adapterSpec = null, + @DelegatesTo(value = GroovyIntegrationFlowDefinition, strategy = Closure.DELEGATE_FIRST) + @ClosureParams(value = SimpleType.class, + options = 'org.springframework.integration.dsl.GroovyIntegrationFlowDefinition') + Closure flow) { + + Consumer configurer = + GroovyIntegrationFlowDefinition.createConfigurerIfAny(adapterSpec) + buildIntegrationFlow(IntegrationFlows.from(messageSource, configurer), flow) + } + + /** + * Functional {@link IntegrationFlow} definition in Groovy DSL for + * {@link IntegrationFlows#from(MessageSourceSpec, Consumer)} factory method. + */ + static IntegrationFlow integrationFlow( + MessageSourceSpec messageSourceSpec, + @DelegatesTo(value = SourcePollingChannelAdapterSpec, strategy = Closure.DELEGATE_FIRST) + Closure adapterSpec = null, + @DelegatesTo(value = GroovyIntegrationFlowDefinition, strategy = Closure.DELEGATE_FIRST) + @ClosureParams(value = SimpleType.class, + options = 'org.springframework.integration.dsl.GroovyIntegrationFlowDefinition') + Closure flow) { + + Consumer configurer = + GroovyIntegrationFlowDefinition.createConfigurerIfAny(adapterSpec) + buildIntegrationFlow(IntegrationFlows.from(messageSourceSpec, configurer), flow) + } + + /** + * Functional {@link IntegrationFlow} definition in Groovy DSL for + * {@link IntegrationFlows#fromSupplier(Supplier, Consumer)} factory method. + */ + static IntegrationFlow integrationFlow( + Closure source, + @DelegatesTo(value = SourcePollingChannelAdapterSpec, strategy = Closure.DELEGATE_FIRST) + @ClosureParams(value = SimpleType.class, + options = 'org.springframework.integration.dsl.SourcePollingChannelAdapterSpec') + Closure adapterSpec = null, + @DelegatesTo(value = GroovyIntegrationFlowDefinition, strategy = Closure.DELEGATE_FIRST) + @ClosureParams(value = SimpleType.class, + options = 'org.springframework.integration.dsl.GroovyIntegrationFlowDefinition') + Closure flow) { + + Consumer configurer = + GroovyIntegrationFlowDefinition.createConfigurerIfAny(adapterSpec) + buildIntegrationFlow(IntegrationFlows.fromSupplier(source, configurer), flow) + } + + /** + * Functional {@link IntegrationFlow} definition in Groovy DSL for + * {@link IntegrationFlows#from(Publisher)} factory method. + */ + static IntegrationFlow integrationFlow( + Publisher> publisher, + @DelegatesTo(value = GroovyIntegrationFlowDefinition, strategy = Closure.DELEGATE_FIRST) + @ClosureParams(value = SimpleType.class, + options = 'org.springframework.integration.dsl.GroovyIntegrationFlowDefinition') + Closure flow) { + + buildIntegrationFlow(IntegrationFlows.from(publisher), flow) + } + + /** + * Functional {@link IntegrationFlow} definition in Groovy DSL for + * {@link IntegrationFlows#from(MessagingGatewaySupport)} factory method. + */ + static IntegrationFlow integrationFlow( + MessagingGatewaySupport gateway, + @DelegatesTo(value = GroovyIntegrationFlowDefinition, strategy = Closure.DELEGATE_FIRST) + @ClosureParams(value = SimpleType.class, + options = 'org.springframework.integration.dsl.GroovyIntegrationFlowDefinition') + Closure flow) { + + buildIntegrationFlow(IntegrationFlows.from(gateway), flow) + } + + /** + * Functional {@link IntegrationFlow} definition in Groovy DSL for + * {@link IntegrationFlows#from(MessagingGatewaySpec)} factory method. + */ + static IntegrationFlow integrationFlow( + MessagingGatewaySpec gatewaySpec, + @DelegatesTo(value = GroovyIntegrationFlowDefinition, strategy = Closure.DELEGATE_FIRST) + @ClosureParams(value = SimpleType.class, + options = 'org.springframework.integration.dsl.GroovyIntegrationFlowDefinition') + Closure flow) { + + buildIntegrationFlow(IntegrationFlows.from(gatewaySpec), flow) + } + + /** + * Functional {@link IntegrationFlow} definition in Groovy DSL for + * {@link IntegrationFlows#from(MessageProducerSupport)} factory method. + */ + static IntegrationFlow integrationFlow( + MessageProducerSupport producer, + @DelegatesTo(value = GroovyIntegrationFlowDefinition, strategy = Closure.DELEGATE_FIRST) + @ClosureParams(value = SimpleType.class, + options = 'org.springframework.integration.dsl.GroovyIntegrationFlowDefinition') + Closure flow) { + + buildIntegrationFlow(IntegrationFlows.from(producer), flow) + } + + /** + * Functional {@link IntegrationFlow} definition in Groovy DSL for + * {@link IntegrationFlows#from(MessageProducerSpec)} factory method. + */ + static IntegrationFlow integrationFlow( + MessageProducerSpec producerSpec, + @DelegatesTo(value = GroovyIntegrationFlowDefinition, strategy = Closure.DELEGATE_FIRST) + @ClosureParams(value = SimpleType.class, + options = 'org.springframework.integration.dsl.GroovyIntegrationFlowDefinition') + Closure flow) { + + buildIntegrationFlow(IntegrationFlows.from(producerSpec), flow) + } + + /** + * Functional {@link IntegrationFlow} definition in Groovy DSL for + * {@link IntegrationFlows#from(IntegrationFlow)} factory method. + */ + static IntegrationFlow integrationFlow( + IntegrationFlow sourceFlow, + @DelegatesTo(value = GroovyIntegrationFlowDefinition, strategy = Closure.DELEGATE_FIRST) + @ClosureParams(value = SimpleType.class, + options = 'org.springframework.integration.dsl.GroovyIntegrationFlowDefinition') + Closure flow) { + + buildIntegrationFlow(IntegrationFlows.from(sourceFlow), flow) + } + + private static IntegrationFlow buildIntegrationFlow(IntegrationFlowBuilder flowBuilder, Closure flow) { + flow.delegate = new GroovyIntegrationFlowDefinition(flowBuilder) + flow.resolveStrategy = Closure.DELEGATE_FIRST + flow() + flowBuilder.get() + } + + private IntegrationGroovyDsl() { + } + +} diff --git a/spring-integration-groovy-dsl/src/main/groovy/org/springframework/integration/dsl/package-info.groovy b/spring-integration-groovy-dsl/src/main/groovy/org/springframework/integration/dsl/package-info.groovy new file mode 100644 index 0000000..28b28a3 --- /dev/null +++ b/spring-integration-groovy-dsl/src/main/groovy/org/springframework/integration/dsl/package-info.groovy @@ -0,0 +1,4 @@ +/** + * Provides Spring Integration Groovy DSL. + */ +package org.springframework.integration.dsl \ No newline at end of file diff --git a/spring-integration-groovy-dsl/src/test/groovy/org/springframework/integration/dsl/test/GroovyDslTests.groovy b/spring-integration-groovy-dsl/src/test/groovy/org/springframework/integration/dsl/test/GroovyDslTests.groovy new file mode 100644 index 0000000..d946277 --- /dev/null +++ b/spring-integration-groovy-dsl/src/test/groovy/org/springframework/integration/dsl/test/GroovyDslTests.groovy @@ -0,0 +1,314 @@ +/* + * Copyright 2021 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.integration.dsl.test + +import org.springframework.beans.factory.BeanFactory +import org.springframework.beans.factory.annotation.Autowired +import org.springframework.beans.factory.annotation.Qualifier +import org.springframework.context.annotation.Bean +import org.springframework.context.annotation.Configuration +import org.springframework.integration.channel.FluxMessageChannel +import org.springframework.integration.channel.QueueChannel +import org.springframework.integration.config.EnableIntegration +import org.springframework.integration.dsl.IntegrationFlow +import org.springframework.integration.dsl.IntegrationFlowDefinition +import org.springframework.integration.dsl.IntegrationFlows +import org.springframework.integration.dsl.Pollers +import org.springframework.integration.dsl.Transformers +import org.springframework.integration.dsl.context.IntegrationFlowContext +import org.springframework.integration.handler.LoggingHandler +import org.springframework.integration.scheduling.PollerMetadata +import org.springframework.integration.support.MessageBuilder +import org.springframework.messaging.Message +import org.springframework.messaging.MessageChannel +import org.springframework.messaging.MessageHeaders +import org.springframework.messaging.PollableChannel +import org.springframework.messaging.support.GenericMessage +import org.springframework.test.annotation.DirtiesContext +import org.springframework.test.context.junit.jupiter.SpringJUnitConfig +import reactor.core.publisher.Flux +import reactor.test.StepVerifier +import spock.lang.Specification + +import java.time.Duration +import java.util.function.Function + +import static org.springframework.integration.dsl.IntegrationGroovyDsl.integrationFlow + +@SpringJUnitConfig +@DirtiesContext +class GroovyDslTests extends Specification { + + @Autowired + private BeanFactory beanFactory + + @Autowired + private IntegrationFlowContext integrationFlowContext + + @Autowired + private PollableChannel pollerResultChannel + + @Autowired + @Qualifier('requestReplyFlow.input') + private MessageChannel requestReplyFlowInput + + @Autowired + private MessageChannel requestReplyFixedFlowInput + + @Autowired + @Qualifier('functionGateway') + private Function upperCaseFunction + + def 'when application starts, it emits message to pollerResultChannel'() { + expect: 'message in the pollerResultChannel' + this.pollerResultChannel.receive(10000) + this.pollerResultChannel.receive(10000) + } + + def 'requestReplyFlow has to reply'() { + given: + def replyChannel = new QueueChannel() + def testMessage = + MessageBuilder.withPayload('hello') + .setHeader(MessageHeaders.REPLY_CHANNEL, replyChannel) + .build() + + when: + this.requestReplyFlowInput.send(testMessage) + + then: + replyChannel.receive(1000).payload == 'HELLO' + } + + def 'requestReplyFixedFlow has to reply'() { + given: + def replyChannel = new QueueChannel() + def testMessage = + MessageBuilder.withPayload(4) + .setHeader(MessageHeaders.REPLY_CHANNEL, replyChannel) + .build() + + when: + this.requestReplyFixedFlowInput.send(testMessage) + + then: + replyChannel.receive(1000).payload == 16 + } + + def 'uppercase function'() { + expect: 'uppercase function flow works' + this.upperCaseFunction.apply('test'.bytes) == 'TEST' + } + + def 'reactive publisher flow'() { + given: + def fluxChannel = new FluxMessageChannel() + + def verifyLater = + StepVerifier + .create(Flux.from(fluxChannel).map { it.payload }) + .expectNext(4, 6) + .thenCancel() + .verifyLater() + + def publisher = Flux.just(2, 3).map { new GenericMessage<>(it) } + + def integrationFlow = + integrationFlow(publisher) + { + transform Message, { it.payload * 2 }, { id 'foo' } + channel fluxChannel + } + + when: + def registration = this.integrationFlowContext.registration(integrationFlow).register() + + then: + verifyLater.verify(Duration.ofSeconds(10)) + + registration.destroy() + } + + @Autowired + @Qualifier('scatterGatherFlow.input') + private MessageChannel scatterGatherFlowInput + + def 'Scatter-Gather'() { + given: + def replyChannel = new QueueChannel() + def request = + MessageBuilder.withPayload("foo") + .setReplyChannel(replyChannel) + .build() + + when: + this.scatterGatherFlowInput.send(request) + + then: + def bestQuoteMessage = replyChannel.receive(10000) + (bestQuoteMessage?.payload as List).size() >= 1 + } + + @Autowired + @Qualifier('oddFlow.input') + private MessageChannel oddFlowInput + + def 'oddFlow must reply'() { + given: + def replyChannel = new QueueChannel() + def testMessage = + MessageBuilder.withPayload('test') + .setHeader(MessageHeaders.REPLY_CHANNEL, replyChannel) + .build() + + when: + this.oddFlowInput.send(testMessage) + + then: + replyChannel.receive(1000).payload == 'odd' + } + + + @Autowired + @Qualifier('flowLambda.input') + private MessageChannel flowLambdaInput + + @Autowired + private PollableChannel wireTapChannel + + def 'flow from lambda'() { + given: + def replyChannel = new QueueChannel() + def message = MessageBuilder.withPayload('test').setReplyChannel(replyChannel).build() + + when: + this.flowLambdaInput.send message + + then: + replyChannel.receive(10_000)?.payload == 'TEST' + this.wireTapChannel.receive(10_000)?.payload == 'test' + } + + @Configuration + @EnableIntegration + static class Config { + + @Bean(PollerMetadata.DEFAULT_POLLER) + poller() { + Pollers.fixedDelay(1000).get() + } + + + @Bean + someFlow() { + integrationFlow { 'test' } + { + log LoggingHandler.Level.WARN, 'test.category' + channel { queue 'pollerResultChannel' } + } + } + + @Bean + requestReplyFlow() { + integrationFlow { + fluxTransform { it.map { it } } + transform String, { it.toUpperCase() } + } + } + + @Bean + requestReplyFixedFlow() { + integrationFlow 'requestReplyFixedFlowInput', true, + { + handle Integer, { p, h -> p**2 } + } + } + + @Bean + functionFlow() { + integrationFlow Function, + { beanName 'functionGateway' }, + { + transform Transformers.objectToString(), { id 'objectToStringTransformer' } + transform String, { it.toUpperCase() } + split Message, { it.payload } + split Object, { it }, { id 'splitterEndpoint' } + resequence() + aggregate { + id 'aggregator' + outputProcessor { it.one } + } + } + } + + @Bean + scatterGatherFlow() { + integrationFlow { + scatterGather( + { + applySequence true + recipientFlow({ true }, recipientSubFlow()) + recipientFlow({ true }, + integrationFlow { handle Void, { p, h -> Math.random() * 10 } }) + recipientFlow({ true }, + integrationFlow { handle Void, { p, h -> Math.random() * 10 } }) + }, + { + releaseStrategy { + it.size() == 3 || it.messages.any { it.payload as Double > 5 } + } + }) + { + gatherTimeout 10_000 + } + } + } + + static recipientSubFlow() { + integrationFlow { handle Void, { p, h -> Math.random() * 10 } } + } + + @Bean + IntegrationFlow oddFlow() { + { IntegrationFlowDefinition flow -> + flow.handle(Object, { p, h -> 'odd' }) + } + } + + @Bean + flowLambda() { + integrationFlow { + filter String, { it == 'test' }, { id 'filterEndpoint' } + wireTap integrationFlow { + channel { queue 'wireTapChannel' } + } + delay 'delayGroup', { defaultDelay 100 } + transform String, { it.toUpperCase() } + } + } + + @Bean + flowFromSupplier() { + IntegrationFlows.fromSupplier({ 'bar' }) { e -> e.poller { p -> p.fixedDelay(10).maxMessagesPerPoll(1) } } + .channel({ c -> c.queue('fromSupplierQueue') } as Function) + .get() + } + + + } + +} diff --git a/spring-integration-groovy-dsl/src/test/resources/log4j2-test.xml b/spring-integration-groovy-dsl/src/test/resources/log4j2-test.xml new file mode 100644 index 0000000..f3bed9a --- /dev/null +++ b/spring-integration-groovy-dsl/src/test/resources/log4j2-test.xml @@ -0,0 +1,15 @@ + + + + + + + + + + + + + + +