Rely on the BaseIntegrationFlowDefinition

To avoid some inline reified functions names, it would be better to rely
on the `BaseIntegrationFlowDefinition` from SI Core.
This way we can have the same names for Kotlin-specific functions
without conflicts for similar methods in Java which are just a syntax
sugar without any target type guarantee

* * Add `KotlinIntegrationFlowDefinition` as an extension of `BaseIntegrationFlowDefinition`
to be able to provide inline functions for reified generic types.
* Use this new type from the `integrationFlow()` DSL functions to avoid
extra imports for global inline functions otherwise

* * Improve README
This commit is contained in:
Artem Bilan
2019-10-28 10:13:20 -04:00
committed by GitHub
parent f405d4b36f
commit 4024a0db70
6 changed files with 192 additions and 127 deletions

View File

@@ -6,7 +6,7 @@ NOTE: The project should be treated as experimental and API is a subject to chan
The main goal we pursue here is to make Spring Integration development on Kotlin as smooth and straightforward as is it possible with interoperability with existing Java DSL and some Kotlin extensions or language-specific structures.
To use classes and functions from this project, you need to add `import org.springframework.integration.dsl.kotlin.*` into your code.
All you need to get started is just an import for `org.springframework.integration.dsl.integrationFlow` - an overloaded global function for Kotlin DSL.
For `IntegrationFlow` definitions as lambdas we typically don't need anything else from Kotlin and just declare a bean like this:
@@ -31,17 +31,17 @@ As an alternative to the construction above and for consistency with use-cases e
@Bean
fun flowLambda() =
integrationFlow {
filter<String> { it === "test" }
filter<String>({ it === "test" })
wireTap(
integrationFlow {
handle { m -> println(m.payload) }
})
transform<String, String> { it.toUpperCase() }
transform<String, String>({ it.toUpperCase() })
}
----
====
Such a global `integrationFlow()` function expects a `@BuilderInference` for an `IntegrationFlowDefinition<*>` and produces a regular `IntegrationFlow` lambda implementation.
Such a global `integrationFlow()` function expects a `@BuilderInference` for a `KotlinIntegrationFlowDefinition` (a Kotlin extension for the `BaseIntegrationFlowDefinition` with some inline function for reified generic types) and produces a regular `IntegrationFlow` lambda implementation.
See more overloaded `integrationFlow()` variants below.
Many other scenarios require an `IntegrationFlow` to be started from source of data (e.g. `JdbcPollingChannelAdapter`, `JmsInboundGateway` or just an existing `MessageChannel`).
@@ -62,7 +62,7 @@ fun flowFromSupplier() =
But unfortunately not all `from()` methods are compatible with Kotlin structures.
To fix a gap, this project provides a Kotlin DSL around an `IntegrationFlows` factory.
It is done as a set of overloaded `integrationFlow()` functions.
With the consumer for an `IntegrationFlowDefinition<*>` to declare the rest of the flow as an `IntegrationFlow` lambda to reuse the mentioned above experience and also avoid `get()` call in the end.
With a consumer for a `KotlinIntegrationFlowDefinition` to declare the rest of the flow as an `IntegrationFlow` lambda to reuse the mentioned above experience and also avoid `get()` call in the end.
For example:
====

View File

@@ -31,7 +31,7 @@ ext {
junitVersion = '5.5.2'
log4jVersion = '2.12.1'
reactorVersion = 'Dysprosium-RELEASE'
springIntegrationVersion = '5.2.0.RELEASE'
springIntegrationVersion = '5.2.1.BUILD-SNAPSHOT'
idPrefix = 'kotlin-dsl'
@@ -126,6 +126,9 @@ task api(type: org.jetbrains.dokka.gradle.DokkaTask) {
externalDocumentationLink {
url = new URL("https://docs.spring.io/spring-integration/docs/$springIntegrationVersion/api/")
}
externalDocumentationLink {
url = new URL("https://docs.spring.io/spring-framework/docs/current/javadoc-api/")
}
externalDocumentationLink {
url = new URL('https://projectreactor.io/docs/core/release/api/')
}

View File

@@ -16,19 +16,10 @@
@file:UseExperimental(kotlin.experimental.ExperimentalTypeInference::class)
package org.springframework.integration.dsl.kotlin
package org.springframework.integration.dsl
import org.reactivestreams.Publisher
import org.springframework.integration.core.MessageSource
import org.springframework.integration.dsl.GatewayProxySpec
import org.springframework.integration.dsl.IntegrationFlow
import org.springframework.integration.dsl.IntegrationFlowBuilder
import org.springframework.integration.dsl.IntegrationFlowDefinition
import org.springframework.integration.dsl.IntegrationFlows
import org.springframework.integration.dsl.MessageProducerSpec
import org.springframework.integration.dsl.MessageSourceSpec
import org.springframework.integration.dsl.MessagingGatewaySpec
import org.springframework.integration.dsl.SourcePollingChannelAdapterSpec
import org.springframework.integration.endpoint.MessageProducerSupport
import org.springframework.integration.gateway.MessagingGatewaySupport
import org.springframework.messaging.Message
@@ -36,9 +27,9 @@ import org.springframework.messaging.MessageChannel
import java.util.function.Consumer
private fun buildIntegrationFlow(flowBuilder: IntegrationFlowBuilder,
flow: (IntegrationFlowDefinition<*>) -> Unit): IntegrationFlow {
flow: (KotlinIntegrationFlowDefinition) -> Unit): IntegrationFlow {
flow.invoke(flowBuilder)
flow(KotlinIntegrationFlowDefinition(flowBuilder))
return flowBuilder.get()
}
@@ -47,9 +38,9 @@ private fun buildIntegrationFlow(flowBuilder: IntegrationFlowBuilder,
*
* @author Artem Bilan
*/
fun integrationFlow(@BuilderInference flow: IntegrationFlowDefinition<*>.() -> Unit) =
fun integrationFlow(@BuilderInference flow: KotlinIntegrationFlowDefinition.() -> Unit) =
IntegrationFlow {
flow(it)
flow(KotlinIntegrationFlowDefinition(it))
}
/**
@@ -59,10 +50,11 @@ fun integrationFlow(@BuilderInference flow: IntegrationFlowDefinition<*>.() -> U
* @author Artem Bilan
*/
inline fun <reified T> integrationFlow(crossinline gateway: (GatewayProxySpec) -> Unit = {},
@BuilderInference flow: IntegrationFlowDefinition<*>.() -> Unit): IntegrationFlow {
@BuilderInference flow: KotlinIntegrationFlowDefinition.() -> Unit):
IntegrationFlow {
val flowBuilder = IntegrationFlows.from(T::class.java) { gateway(it) }
flow.invoke(flowBuilder)
flow(KotlinIntegrationFlowDefinition(flowBuilder))
return flowBuilder.get()
}
@@ -73,7 +65,7 @@ inline fun <reified T> integrationFlow(crossinline gateway: (GatewayProxySpec) -
* @author Artem Bilan
*/
fun integrationFlow(channelName: String, fixedSubscriber: Boolean = false,
@BuilderInference flow: IntegrationFlowDefinition<*>.() -> Unit) =
@BuilderInference flow: KotlinIntegrationFlowDefinition.() -> Unit) =
buildIntegrationFlow(IntegrationFlows.from(channelName, fixedSubscriber), flow)
/**
@@ -82,7 +74,7 @@ fun integrationFlow(channelName: String, fixedSubscriber: Boolean = false,
*
* @author Artem Bilan
*/
fun integrationFlow(channel: MessageChannel, @BuilderInference flow: IntegrationFlowDefinition<*>.() -> Unit) =
fun integrationFlow(channel: MessageChannel, @BuilderInference flow: KotlinIntegrationFlowDefinition.() -> Unit) =
buildIntegrationFlow(IntegrationFlows.from(channel), flow)
/**
@@ -93,7 +85,7 @@ fun integrationFlow(channel: MessageChannel, @BuilderInference flow: Integration
*/
fun integrationFlow(messageSource: MessageSource<*>,
options: (SourcePollingChannelAdapterSpec) -> Unit = {},
@BuilderInference flow: IntegrationFlowDefinition<*>.() -> Unit) =
@BuilderInference flow: KotlinIntegrationFlowDefinition.() -> Unit) =
buildIntegrationFlow(IntegrationFlows.from(messageSource, Consumer { options(it) }), flow)
/**
@@ -104,7 +96,7 @@ fun integrationFlow(messageSource: MessageSource<*>,
*/
fun integrationFlow(messageSource: MessageSourceSpec<*, out MessageSource<*>>,
options: (SourcePollingChannelAdapterSpec) -> Unit = {},
@BuilderInference flow: IntegrationFlowDefinition<*>.() -> Unit) =
@BuilderInference flow: KotlinIntegrationFlowDefinition.() -> Unit) =
buildIntegrationFlow(IntegrationFlows.from(messageSource, options), flow)
/**
@@ -115,7 +107,7 @@ fun integrationFlow(messageSource: MessageSourceSpec<*, out MessageSource<*>>,
*/
fun integrationFlow(source: () -> Any,
options: (SourcePollingChannelAdapterSpec) -> Unit = {},
@BuilderInference flow: IntegrationFlowDefinition<*>.() -> Unit) =
@BuilderInference flow: KotlinIntegrationFlowDefinition.() -> Unit) =
buildIntegrationFlow(IntegrationFlows.from(source, options), flow)
/**
@@ -125,7 +117,7 @@ fun integrationFlow(source: () -> Any,
* @author Artem Bilan
*/
fun integrationFlow(publisher: Publisher<out Message<*>>,
@BuilderInference flow: IntegrationFlowDefinition<*>.() -> Unit) =
@BuilderInference flow: KotlinIntegrationFlowDefinition.() -> Unit) =
buildIntegrationFlow(IntegrationFlows.from(publisher), flow)
/**
@@ -135,7 +127,7 @@ fun integrationFlow(publisher: Publisher<out Message<*>>,
* @author Artem Bilan
*/
fun integrationFlow(gateway: MessagingGatewaySupport,
@BuilderInference flow: IntegrationFlowDefinition<*>.() -> Unit) =
@BuilderInference flow: KotlinIntegrationFlowDefinition.() -> Unit) =
buildIntegrationFlow(IntegrationFlows.from(gateway), flow)
/**
@@ -145,7 +137,7 @@ fun integrationFlow(gateway: MessagingGatewaySupport,
* @author Artem Bilan
*/
fun integrationFlow(gatewaySpec: MessagingGatewaySpec<*, *>,
@BuilderInference flow: IntegrationFlowDefinition<*>.() -> Unit) =
@BuilderInference flow: KotlinIntegrationFlowDefinition.() -> Unit) =
buildIntegrationFlow(IntegrationFlows.from(gatewaySpec), flow)
/**
@@ -155,7 +147,7 @@ fun integrationFlow(gatewaySpec: MessagingGatewaySpec<*, *>,
* @author Artem Bilan
*/
fun integrationFlow(producer: MessageProducerSupport,
@BuilderInference flow: IntegrationFlowDefinition<*>.() -> Unit) =
@BuilderInference flow: KotlinIntegrationFlowDefinition.() -> Unit) =
buildIntegrationFlow(IntegrationFlows.from(producer), flow)
/**
@@ -165,5 +157,5 @@ fun integrationFlow(producer: MessageProducerSupport,
* @author Artem Bilan
*/
fun integrationFlow(producerSpec: MessageProducerSpec<*, *>,
@BuilderInference flow: IntegrationFlowDefinition<*>.() -> Unit) =
@BuilderInference flow: KotlinIntegrationFlowDefinition.() -> Unit) =
buildIntegrationFlow(IntegrationFlows.from(producerSpec), flow)

View File

@@ -0,0 +1,157 @@
/*
* Copyright 2019 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.integration.dsl
import org.springframework.integration.router.MethodInvokingRouter
import org.springframework.integration.splitter.MethodInvokingSplitter
import org.springframework.integration.transformer.MessageTransformingHandler
import org.springframework.messaging.MessageChannel
/**
* A [BaseIntegrationFlowDefinition] extension for Kotlin-specif inline functions with reified
* generic types.
*
* @property adaptee the [IntegrationFlowDefinition] this instance is adapted.
*
* @author Artem Bilan
*/
class KotlinIntegrationFlowDefinition(private val adaptee: IntegrationFlowDefinition<*>) :
BaseIntegrationFlowDefinition<KotlinIntegrationFlowDefinition>() {
/**
* Delegate a provided component into an `adaptee` set of components.
*/
override fun addComponent(component: Any): KotlinIntegrationFlowDefinition {
return addComponent(component, null)
}
/**
* Delegate a provided component into an `adaptee` set of components.
*/
override fun addComponent(component: Any, beanName: String?): KotlinIntegrationFlowDefinition {
this.adaptee.addComponent(component, beanName)
return _this()
}
/**
* Delegate provided components into an `adaptee` set of components.
*/
override fun addComponents(components: Map<Any, String>?): KotlinIntegrationFlowDefinition {
this.adaptee.addComponents(components)
return _this()
}
/**
* Get a [Map] of components from `adaptee`.
*/
override fun getIntegrationComponents(): Map<Any, String> {
return this.adaptee.getIntegrationComponents()
}
/**
* Set a provided [MessageChannel] as a current in the `adaptee`.
*/
override fun currentMessageChannel(currentMessageChannel: MessageChannel?): KotlinIntegrationFlowDefinition {
this.adaptee.currentMessageChannel(currentMessageChannel)
return _this()
}
/**
* Get a current [MessageChannel] from the `adaptee`.
*/
override fun getCurrentMessageChannel(): MessageChannel? {
return this.adaptee.getCurrentMessageChannel()
}
/**
* Delegate a provided component into an `adaptee` current component.
*/
override fun currentComponent(component: Any?): KotlinIntegrationFlowDefinition {
this.adaptee.currentComponent(component)
return _this()
}
/**
* Get a current component from `adaptee`.
*/
override fun getCurrentComponent(): Any? {
return this.adaptee.getCurrentComponent()
}
/**
* Set a flag for an implicit channel on the `adaptee`.
*/
override fun setImplicitChannel(implicitChannel: Boolean) {
this.adaptee.setImplicitChannel(implicitChannel)
}
/**
* Get an implicit channel flag from the `adaptee`.
*/
override fun isImplicitChannel(): Boolean {
return this.adaptee.isImplicitChannel()
}
/**
* Inline function for [IntegrationFlowDefinition.convert] providing a `convert<MyType>()` variant
* with reified generic type.
*/
inline fun <reified T> convert(
crossinline configurer: (GenericEndpointSpec<MessageTransformingHandler>) -> Unit = {}):
KotlinIntegrationFlowDefinition =
convert(T::class.java) { configurer(it) }
/**
* Inline function for [IntegrationFlowDefinition.transform] providing a `transform<MyTypeIn, MyTypeOut>()` variant
* with reified generic type.
*/
inline fun <reified P, T> transform(
crossinline function: (P) -> T,
crossinline configurer: (GenericEndpointSpec<MessageTransformingHandler>) -> Unit = {}):
KotlinIntegrationFlowDefinition =
transform(P::class.java, { function(it) }) { configurer(it) }
/**
* Inline function for [IntegrationFlowDefinition.split] providing a `split<MyTypeIn>()` variant
* with reified generic type.
*/
inline fun <reified P> split(
crossinline function: (P) -> Any,
crossinline configurer: (SplitterEndpointSpec<MethodInvokingSplitter>) -> Unit = {}):
KotlinIntegrationFlowDefinition =
split(P::class.java, { function(it) }) { configurer(it) }
/**
* Inline function for [IntegrationFlowDefinition.filter] providing a `filter<MyTypeIn>()` variant
* with reified generic type.
*/
inline fun <reified P> filter(
crossinline function: (P) -> Boolean,
crossinline configurer: (FilterEndpointSpec) -> Unit = {}): KotlinIntegrationFlowDefinition =
filter(P::class.java, { function(it) }) { configurer(it) }
/**
* Inline function for [IntegrationFlowDefinition.filter] providing a `filter<MyTypeIn>()` variant
* with reified generic type.
*/
inline fun <reified P, T> route(
crossinline function: (P) -> T,
crossinline configurer: (RouterSpec<T, MethodInvokingRouter>) -> Unit = {}):
KotlinIntegrationFlowDefinition =
route(P::class.java, { function(it) }) { configurer(it) }
}

View File

@@ -1,82 +0,0 @@
/*
* Copyright 2019 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.integration.dsl.kotlin
import org.springframework.integration.dsl.FilterEndpointSpec
import org.springframework.integration.dsl.GenericEndpointSpec
import org.springframework.integration.dsl.IntegrationFlowDefinition
import org.springframework.integration.dsl.RouterSpec
import org.springframework.integration.dsl.SplitterEndpointSpec
import org.springframework.integration.handler.ServiceActivatingHandler
import org.springframework.integration.router.MethodInvokingRouter
import org.springframework.integration.splitter.MethodInvokingSplitter
import org.springframework.integration.transformer.MessageTransformingHandler
import org.springframework.messaging.MessageHeaders
/**
* Extension for [IntegrationFlowDefinition.convert] providing a `convert<MyType>()` variant.
*
* @author Artem Bilan
*/
inline fun <reified T> IntegrationFlowDefinition<*>.convert(
crossinline configurer: (GenericEndpointSpec<MessageTransformingHandler>) -> Unit = {}):
IntegrationFlowDefinition<*> =
convert(T::class.java) { configurer(it) }
/**
* Extension for [IntegrationFlowDefinition.transform] providing a `transform<MyTypeIn, MyTypeOut>()` variant.
*
* @author Artem Bilan
*/
inline fun <reified P, T> IntegrationFlowDefinition<*>.transformReified(
crossinline function: (P) -> T,
crossinline configurer: (GenericEndpointSpec<MessageTransformingHandler>) -> Unit = {}):
IntegrationFlowDefinition<*> =
transform(P::class.java, { function(it) }) { configurer(it) }
/**
* Extension for [IntegrationFlowDefinition.split] providing a `split<MyTypeIn>()` variant.
*
* @author Artem Bilan
*/
inline fun <reified P> IntegrationFlowDefinition<*>.split(
crossinline function: (P) -> Any,
crossinline configurer: (SplitterEndpointSpec<MethodInvokingSplitter>) -> Unit = {}):
IntegrationFlowDefinition<*> =
split(P::class.java, { function(it) }) { configurer(it) }
/**
* Extension for [IntegrationFlowDefinition.filter] providing a `filter<MyTypeIn>()` variant.
*
* @author Artem Bilan
*/
inline fun <reified P> IntegrationFlowDefinition<*>.filterReified(
crossinline function: (P) -> Boolean,
crossinline configurer: (FilterEndpointSpec) -> Unit = {}):
IntegrationFlowDefinition<*> =
filter(P::class.java, { function(it) }) { configurer(it) }
/**
* Extension for [IntegrationFlowDefinition.filter] providing a `filter<MyTypeIn>()` variant.
*
* @author Artem Bilan
*/
inline fun <reified P, T> IntegrationFlowDefinition<*>.routeReified(
crossinline function: (P) -> T,
crossinline configurer: (RouterSpec<T, MethodInvokingRouter>) -> Unit = {}):
IntegrationFlowDefinition<*> =
route(P::class.java, { function(it) }) { configurer(it) }

View File

@@ -32,12 +32,7 @@ import org.springframework.integration.config.EnableIntegration
import org.springframework.integration.core.MessagingTemplate
import org.springframework.integration.dsl.Pollers
import org.springframework.integration.dsl.context.IntegrationFlowContext
import org.springframework.integration.dsl.kotlin.convert
import org.springframework.integration.dsl.kotlin.filterReified
import org.springframework.integration.dsl.kotlin.integrationFlow
import org.springframework.integration.dsl.kotlin.routeReified
import org.springframework.integration.dsl.kotlin.split
import org.springframework.integration.dsl.kotlin.transformReified
import org.springframework.integration.dsl.integrationFlow
import org.springframework.integration.endpoint.MessageProcessorMessageSource
import org.springframework.integration.handler.LoggingHandler
import org.springframework.integration.scheduling.PollerMetadata
@@ -164,7 +159,7 @@ class KotlinDslTests {
val integrationFlow =
integrationFlow(publisher) {
transformReified<Message<Int>, Int>({ it.payload * 2 }) { it.id("foo") }
transform<Message<Int>, Int>({ it.payload * 2 }) { it.id("foo") }
channel(fluxChannel)
}
@@ -211,15 +206,15 @@ class KotlinDslTests {
@Bean
fun functionFlow() =
integrationFlow<Function<String, String>>({ it.beanName("functionGateway") }) {
transform<String, String> { it.toUpperCase() }
transform<String, String>({ it.toUpperCase() })
split<String>({ p -> p })
}
@Bean
fun functionFlow2() =
integrationFlow<Function<*, *>> {
transform<String, String> { it.toLowerCase() }
routeReified<Message<*>, Any?>({ m -> m.headers.replyChannel }) { it.id("router") }
transform<String, String>({ it.toLowerCase() })
route<Message<*>, Any?>({ m -> m.headers.replyChannel }) { it.id("router") }
}
@Bean
@@ -251,19 +246,19 @@ class KotlinDslTests {
fun flowFromSupplier2() =
integrationFlow({ "testSupplier2" },
{ it.poller { it.trigger(OnlyOnceTrigger()) } }) {
filterReified<Message<Any>>({ m -> m.payload is String })
filter<Message<Any>>({ m -> m.payload is String })
channel { c -> c.queue("testSupplierResult2") }
}
@Bean
fun flowLambda() =
integrationFlow {
filter<String> { it === "test" }
filter<String>({ it === "test" })
wireTap(
integrationFlow {
channel { c -> c.queue("wireTapChannel") }
})
transform<String, String> { it.toUpperCase() }
transform<String, String>({ it.toUpperCase() })
}
}