diff --git a/spring-integration-java-dsl/build.gradle b/spring-integration-java-dsl/build.gradle index 92b6a00..41a93ec 100644 --- a/spring-integration-java-dsl/build.gradle +++ b/spring-integration-java-dsl/build.gradle @@ -30,7 +30,7 @@ ext { log4jVersion = '1.2.17' slf4jVersion = '1.7.6' springIntegrationVersion = '4.0.2.RELEASE' - springBootVersion = '1.1.0.BUILD-SNAPSHOT' + springBootVersion = '1.1.0.RELEASE' linkHomepage = 'https://github.com/spring-projects/spring-integration-extensions' linkCi = 'https://build.spring.io/browse/INTEXT' @@ -81,7 +81,6 @@ dependencies { testCompile "de.flapdoodle.embed:de.flapdoodle.embed.mongo:$embedMongoVersion" testCompile "org.springframework.boot:spring-boot-autoconfigure:$springBootVersion" - testRuntime "log4j:log4j:$log4jVersion" testRuntime "org.slf4j:jcl-over-slf4j:$slf4jVersion" testRuntime "org.slf4j:slf4j-log4j12:$slf4jVersion" testRuntime("org.apache.activemq:activemq-broker:$activeMqVersion") diff --git a/spring-integration-java-dsl/src/main/java/org/springframework/integration/dsl/AbstractRouterSpec.java b/spring-integration-java-dsl/src/main/java/org/springframework/integration/dsl/AbstractRouterSpec.java index 22ea735..d2711d5 100644 --- a/spring-integration-java-dsl/src/main/java/org/springframework/integration/dsl/AbstractRouterSpec.java +++ b/spring-integration-java-dsl/src/main/java/org/springframework/integration/dsl/AbstractRouterSpec.java @@ -16,7 +16,7 @@ package org.springframework.integration.dsl; -import org.springframework.integration.dsl.core.IntegrationComponentSpec; +import org.springframework.integration.dsl.core.MessageHandlerSpec; import org.springframework.integration.router.AbstractMessageRouter; import org.springframework.messaging.MessageChannel; @@ -24,7 +24,7 @@ import org.springframework.messaging.MessageChannel; * @author Artem Bilan */ public class AbstractRouterSpec, R extends AbstractMessageRouter> - extends IntegrationComponentSpec { + extends MessageHandlerSpec { AbstractRouterSpec(R router) { this.target = router; diff --git a/spring-integration-java-dsl/src/main/java/org/springframework/integration/dsl/CorrelationHandlerSpec.java b/spring-integration-java-dsl/src/main/java/org/springframework/integration/dsl/CorrelationHandlerSpec.java index d56e712..00d6d72 100644 --- a/spring-integration-java-dsl/src/main/java/org/springframework/integration/dsl/CorrelationHandlerSpec.java +++ b/spring-integration-java-dsl/src/main/java/org/springframework/integration/dsl/CorrelationHandlerSpec.java @@ -17,7 +17,6 @@ package org.springframework.integration.dsl; import org.springframework.expression.Expression; -import org.springframework.expression.spel.standard.SpelExpressionParser; import org.springframework.integration.aggregator.AbstractCorrelatingMessageHandler; import org.springframework.integration.aggregator.CorrelationStrategy; import org.springframework.integration.aggregator.ExpressionEvaluatingCorrelationStrategy; @@ -25,7 +24,7 @@ import org.springframework.integration.aggregator.ExpressionEvaluatingReleaseStr import org.springframework.integration.aggregator.ReleaseStrategy; import org.springframework.integration.config.CorrelationStrategyFactoryBean; import org.springframework.integration.config.ReleaseStrategyFactoryBean; -import org.springframework.integration.dsl.core.IntegrationComponentSpec; +import org.springframework.integration.dsl.core.MessageHandlerSpec; import org.springframework.integration.expression.ValueExpression; import org.springframework.integration.store.MessageGroupStore; import org.springframework.messaging.MessageChannel; @@ -35,9 +34,7 @@ import org.springframework.scheduling.TaskScheduler; * @author Artem Bilan */ public abstract class CorrelationHandlerSpec, H extends AbstractCorrelatingMessageHandler> - extends IntegrationComponentSpec { - - protected final static SpelExpressionParser PARSER = new SpelExpressionParser(); + extends MessageHandlerSpec { protected MessageGroupStore messageStore; diff --git a/spring-integration-java-dsl/src/main/java/org/springframework/integration/dsl/EnricherSpec.java b/spring-integration-java-dsl/src/main/java/org/springframework/integration/dsl/EnricherSpec.java index 049753a..2b8d8d6 100644 --- a/spring-integration-java-dsl/src/main/java/org/springframework/integration/dsl/EnricherSpec.java +++ b/spring-integration-java-dsl/src/main/java/org/springframework/integration/dsl/EnricherSpec.java @@ -21,7 +21,7 @@ import java.util.Map; import org.springframework.expression.Expression; import org.springframework.expression.spel.standard.SpelExpressionParser; -import org.springframework.integration.dsl.core.IntegrationComponentSpec; +import org.springframework.integration.dsl.core.MessageHandlerSpec; import org.springframework.integration.expression.ValueExpression; import org.springframework.integration.transformer.ContentEnricher; import org.springframework.integration.transformer.support.AbstractHeaderValueMessageProcessor; @@ -34,7 +34,7 @@ import org.springframework.util.Assert; /** * @author Artem Bilan */ -public class EnricherSpec extends IntegrationComponentSpec { +public class EnricherSpec extends MessageHandlerSpec { private final static SpelExpressionParser PARSER = new SpelExpressionParser(); diff --git a/spring-integration-java-dsl/src/main/java/org/springframework/integration/dsl/IntegrationFlowBuilder.java b/spring-integration-java-dsl/src/main/java/org/springframework/integration/dsl/IntegrationFlowBuilder.java index d96c631..3509be7 100644 --- a/spring-integration-java-dsl/src/main/java/org/springframework/integration/dsl/IntegrationFlowBuilder.java +++ b/spring-integration-java-dsl/src/main/java/org/springframework/integration/dsl/IntegrationFlowBuilder.java @@ -31,6 +31,7 @@ import org.springframework.integration.core.GenericSelector; import org.springframework.integration.core.MessageSelector; import org.springframework.integration.dsl.channel.MessageChannelSpec; import org.springframework.integration.dsl.core.ConsumerEndpointSpec; +import org.springframework.integration.dsl.core.MessageHandlerSpec; import org.springframework.integration.dsl.support.BeanNameMessageProcessor; import org.springframework.integration.dsl.support.ComponentConfigurer; import org.springframework.integration.dsl.support.EndpointConfigurer; @@ -188,6 +189,11 @@ public final class IntegrationFlowBuilder { return this.register(new FilterEndpointSpec(new MessageFilter(selector)), endpointConfigurer); } + public > IntegrationFlowBuilder handle(S messageHandlerSpec) { + Assert.notNull(messageHandlerSpec); + return handle(messageHandlerSpec.get()); + } + public IntegrationFlowBuilder handle(MessageHandler messageHandler) { return this.handle(messageHandler, null); } @@ -228,6 +234,12 @@ public final class IntegrationFlowBuilder { return this.handle(serviceActivatingHandler, endpointConfigurer); } + public > + IntegrationFlowBuilder handle(S messageHandlerSpec, EndpointConfigurer> endpointConfigurer) { + Assert.notNull(messageHandlerSpec); + return handle(messageHandlerSpec.get(), endpointConfigurer); + } + public IntegrationFlowBuilder handle(H messageHandler, EndpointConfigurer> endpointConfigurer) { Assert.notNull(messageHandler); diff --git a/spring-integration-java-dsl/src/main/java/org/springframework/integration/dsl/IntegrationFlows.java b/spring-integration-java-dsl/src/main/java/org/springframework/integration/dsl/IntegrationFlows.java index f7b7a7e..3aedc0f 100644 --- a/spring-integration-java-dsl/src/main/java/org/springframework/integration/dsl/IntegrationFlows.java +++ b/spring-integration-java-dsl/src/main/java/org/springframework/integration/dsl/IntegrationFlows.java @@ -20,6 +20,7 @@ import org.springframework.beans.DirectFieldAccessor; import org.springframework.integration.channel.DirectChannel; import org.springframework.integration.core.MessageSource; import org.springframework.integration.dsl.channel.MessageChannelSpec; +import org.springframework.integration.dsl.core.MessageSourceSpec; import org.springframework.integration.dsl.core.MessagingGatewaySpec; import org.springframework.integration.dsl.core.MessagingProducerSpec; import org.springframework.integration.dsl.support.EndpointConfigurer; @@ -63,6 +64,16 @@ public final class IntegrationFlows { return from(messageChannelSpec.get()); } + public static >> IntegrationFlowBuilder from(S + messageSourceSpec) { + return from(messageSourceSpec.get()); + } + + public static >> IntegrationFlowBuilder from(S messageSource, + EndpointConfigurer endpointConfigurer) { + return from(messageSource.get(), endpointConfigurer); + } + public static IntegrationFlowBuilder from(MessageSource messageSource) { return from(messageSource, null); } diff --git a/spring-integration-java-dsl/src/main/java/org/springframework/integration/dsl/amqp/Amqp.java b/spring-integration-java-dsl/src/main/java/org/springframework/integration/dsl/amqp/Amqp.java index 231d584..c2a1c3b 100644 --- a/spring-integration-java-dsl/src/main/java/org/springframework/integration/dsl/amqp/Amqp.java +++ b/spring-integration-java-dsl/src/main/java/org/springframework/integration/dsl/amqp/Amqp.java @@ -22,7 +22,6 @@ import org.springframework.amqp.rabbit.connection.ConnectionFactory; import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer; import org.springframework.integration.amqp.inbound.AmqpInboundChannelAdapter; import org.springframework.integration.amqp.inbound.AmqpInboundGateway; -import org.springframework.integration.amqp.outbound.AmqpOutboundEndpoint; import org.springframework.integration.dsl.core.MessagingGatewaySpec; import org.springframework.integration.dsl.core.MessagingProducerSpec; @@ -66,18 +65,15 @@ public abstract class Amqp { } public static AmqpOutboundEndpointSpec outboundAdapter(AmqpTemplate amqpTemplate) { - return outboundEndpoint(new AmqpOutboundEndpoint(amqpTemplate), false); + return new AmqpOutboundEndpointSpec(amqpTemplate, false); } public static AmqpOutboundEndpointSpec outboundGateway(AmqpTemplate amqpTemplate) { - return outboundEndpoint(new AmqpOutboundEndpoint(amqpTemplate), true); + return new AmqpOutboundEndpointSpec(amqpTemplate, true); } - private static AmqpOutboundEndpointSpec outboundEndpoint(AmqpOutboundEndpoint endpoint, boolean expectReply) { - return new AmqpOutboundEndpointSpec(endpoint, expectReply); - } - - public static > AmqpPollableMessageChannelSpecpollableChannel(ConnectionFactory connectionFactory) { + public static > AmqpPollableMessageChannelSpec + pollableChannel(ConnectionFactory connectionFactory) { return pollableChannel(null, connectionFactory); } diff --git a/spring-integration-java-dsl/src/main/java/org/springframework/integration/dsl/amqp/AmqpOutboundEndpointSpec.java b/spring-integration-java-dsl/src/main/java/org/springframework/integration/dsl/amqp/AmqpOutboundEndpointSpec.java index c244d74..f1844b3 100644 --- a/spring-integration-java-dsl/src/main/java/org/springframework/integration/dsl/amqp/AmqpOutboundEndpointSpec.java +++ b/spring-integration-java-dsl/src/main/java/org/springframework/integration/dsl/amqp/AmqpOutboundEndpointSpec.java @@ -16,18 +16,19 @@ package org.springframework.integration.dsl.amqp; +import org.springframework.amqp.core.AmqpTemplate; import org.springframework.amqp.core.MessageDeliveryMode; import org.springframework.integration.amqp.outbound.AmqpOutboundEndpoint; import org.springframework.integration.amqp.support.AmqpHeaderMapper; import org.springframework.integration.amqp.support.DefaultAmqpHeaderMapper; -import org.springframework.integration.dsl.core.IntegrationComponentSpec; +import org.springframework.integration.dsl.core.MessageHandlerSpec; import org.springframework.messaging.MessageChannel; import org.springframework.util.Assert; /** * @author Artem Bilan */ -public class AmqpOutboundEndpointSpec extends IntegrationComponentSpec { +public class AmqpOutboundEndpointSpec extends MessageHandlerSpec { private final AmqpOutboundEndpoint endpoint; @@ -35,8 +36,8 @@ public class AmqpOutboundEndpointSpec extends IntegrationComponentSpec, return _this(); } + public S requiresReply(boolean requiresReply) { + H handler = this.target.getT2(); + if (handler instanceof AbstractReplyProducingMessageHandler) { + ((AbstractReplyProducingMessageHandler) handler).setRequiresReply(requiresReply); + } + else { + logger.warn("'requiresReply' can be applied only for AbstractReplyProducingMessageHandler"); + } + return _this(); + } + public S sendTimeout(long sendTimeout) { H handler = this.target.getT2(); if (handler instanceof AbstractReplyProducingMessageHandler) { ((AbstractReplyProducingMessageHandler) handler).setSendTimeout(sendTimeout); } + else { + logger.warn("'sendTimeout' can be applied only for AbstractReplyProducingMessageHandler"); + } return _this(); } diff --git a/spring-integration-java-dsl/src/main/java/org/springframework/integration/dsl/core/IntegrationComponentSpec.java b/spring-integration-java-dsl/src/main/java/org/springframework/integration/dsl/core/IntegrationComponentSpec.java index 936a442..0b8f477 100644 --- a/spring-integration-java-dsl/src/main/java/org/springframework/integration/dsl/core/IntegrationComponentSpec.java +++ b/spring-integration-java-dsl/src/main/java/org/springframework/integration/dsl/core/IntegrationComponentSpec.java @@ -16,6 +16,11 @@ package org.springframework.integration.dsl.core; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; + +import org.springframework.expression.spel.standard.SpelExpressionParser; + /** * The common Builder abstraction. * @@ -23,6 +28,10 @@ package org.springframework.integration.dsl.core; */ public abstract class IntegrationComponentSpec, T> { + protected final static SpelExpressionParser PARSER = new SpelExpressionParser(); + + protected final Log logger = LogFactory.getLog(getClass()); + protected volatile T target; protected String id; diff --git a/spring-integration-java-dsl/src/main/java/org/springframework/integration/dsl/core/MessageHandlerSpec.java b/spring-integration-java-dsl/src/main/java/org/springframework/integration/dsl/core/MessageHandlerSpec.java new file mode 100644 index 0000000..59a0c99 --- /dev/null +++ b/spring-integration-java-dsl/src/main/java/org/springframework/integration/dsl/core/MessageHandlerSpec.java @@ -0,0 +1,26 @@ +/* + * Copyright 2014 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.integration.dsl.core; + +import org.springframework.messaging.MessageHandler; + +/** + * @author Artem Bilan + */ +public abstract class MessageHandlerSpec, H extends MessageHandler> + extends IntegrationComponentSpec { +} diff --git a/spring-integration-java-dsl/src/main/java/org/springframework/integration/dsl/core/MessageSourceSpec.java b/spring-integration-java-dsl/src/main/java/org/springframework/integration/dsl/core/MessageSourceSpec.java new file mode 100644 index 0000000..f8d7841 --- /dev/null +++ b/spring-integration-java-dsl/src/main/java/org/springframework/integration/dsl/core/MessageSourceSpec.java @@ -0,0 +1,26 @@ +/* + * Copyright 2014 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.integration.dsl.core; + +import org.springframework.integration.core.MessageSource; + +/** + * @author Artem Bilan + */ +public abstract class MessageSourceSpec, H extends MessageSource> + extends IntegrationComponentSpec { +} diff --git a/spring-integration-java-dsl/src/main/java/org/springframework/integration/dsl/jms/Jms.java b/spring-integration-java-dsl/src/main/java/org/springframework/integration/dsl/jms/Jms.java index efd9e7f..3db3179 100644 --- a/spring-integration-java-dsl/src/main/java/org/springframework/integration/dsl/jms/Jms.java +++ b/spring-integration-java-dsl/src/main/java/org/springframework/integration/dsl/jms/Jms.java @@ -18,6 +18,8 @@ package org.springframework.integration.dsl.jms; import javax.jms.ConnectionFactory; +import org.springframework.jms.core.JmsTemplate; + /** * @author Artem Bilan */ @@ -49,4 +51,25 @@ public abstract class Jms { ConnectionFactory connectionFactory) { return new JmsPublishSubscribeMessageChannelSpec(connectionFactory).id(id); } + + public static > JmsOutboundChannelAdapterSpec outboundAdapter(JmsTemplate jmsTemplate) { + return new JmsOutboundChannelAdapterSpec(jmsTemplate); + } + + public static JmsOutboundChannelAdapterSpec.JmsOutboundChannelSpecTemplateAware outboundAdapter(ConnectionFactory connectionFactory) { + return new JmsOutboundChannelAdapterSpec.JmsOutboundChannelSpecTemplateAware(connectionFactory); + } + + public static > JmsInboundChannelAdapterSpec inboundAdapter(JmsTemplate jmsTemplate) { + return new JmsInboundChannelAdapterSpec(jmsTemplate); + } + + public static JmsInboundChannelAdapterSpec.JmsInboundChannelSpecTemplateAware inboundAdapter(ConnectionFactory connectionFactory) { + return new JmsInboundChannelAdapterSpec.JmsInboundChannelSpecTemplateAware(connectionFactory); + } + + public static JmsOutboundGatewaySpec outboundGateway(ConnectionFactory connectionFactory) { + return new JmsOutboundGatewaySpec(connectionFactory); + } + } diff --git a/spring-integration-java-dsl/src/main/java/org/springframework/integration/dsl/jms/JmsInboundChannelAdapterSpec.java b/spring-integration-java-dsl/src/main/java/org/springframework/integration/dsl/jms/JmsInboundChannelAdapterSpec.java new file mode 100644 index 0000000..f22a62c --- /dev/null +++ b/spring-integration-java-dsl/src/main/java/org/springframework/integration/dsl/jms/JmsInboundChannelAdapterSpec.java @@ -0,0 +1,85 @@ +/* + * Copyright 2014 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.integration.dsl.jms; + +import javax.jms.ConnectionFactory; +import javax.jms.Destination; + +import org.springframework.integration.dsl.core.MessageSourceSpec; +import org.springframework.integration.dsl.support.ComponentConfigurer; +import org.springframework.integration.jms.JmsDestinationPollingSource; +import org.springframework.integration.jms.JmsHeaderMapper; +import org.springframework.jms.core.JmsTemplate; +import org.springframework.util.Assert; + +/** + * @author Artem Bilan + */ +public class JmsInboundChannelAdapterSpec> + extends MessageSourceSpec { + + final JmsTemplateSpec jmsTemplateSpec = new JmsTemplateSpec(); + + JmsInboundChannelAdapterSpec(JmsTemplate jmsTemplate) { + this.target = new JmsDestinationPollingSource(jmsTemplate); + } + + private JmsInboundChannelAdapterSpec(ConnectionFactory connectionFactory) { + this.target = new JmsDestinationPollingSource(this.jmsTemplateSpec.connectionFactory(connectionFactory).get()); + } + + public S messageSelector(String messageSelector) { + this.target.setMessageSelector(messageSelector); + return _this(); + } + + public S headerMapper(JmsHeaderMapper headerMapper) { + this.target.setHeaderMapper(headerMapper); + return _this(); + } + + public S destination(Destination destination) { + this.target.setDestination(destination); + return _this(); + } + + public S destination(String destination) { + this.target.setDestinationName(destination); + return _this(); + } + + @Override + protected JmsDestinationPollingSource doGet() { + throw new UnsupportedOperationException(); + } + + public static class JmsInboundChannelSpecTemplateAware extends + JmsInboundChannelAdapterSpec { + + JmsInboundChannelSpecTemplateAware(ConnectionFactory connectionFactory) { + super(connectionFactory); + } + + public JmsInboundChannelSpecTemplateAware configureJmsTemplate(ComponentConfigurer configurer) { + Assert.notNull(configurer); + configurer.configure(this.jmsTemplateSpec); + return _this(); + } + + } + +} diff --git a/spring-integration-java-dsl/src/main/java/org/springframework/integration/dsl/jms/JmsOutboundChannelAdapterSpec.java b/spring-integration-java-dsl/src/main/java/org/springframework/integration/dsl/jms/JmsOutboundChannelAdapterSpec.java new file mode 100644 index 0000000..d65f150 --- /dev/null +++ b/spring-integration-java-dsl/src/main/java/org/springframework/integration/dsl/jms/JmsOutboundChannelAdapterSpec.java @@ -0,0 +1,90 @@ +/* + * Copyright 2014 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.integration.dsl.jms; + +import javax.jms.ConnectionFactory; +import javax.jms.Destination; + +import org.springframework.integration.dsl.core.MessageHandlerSpec; +import org.springframework.integration.dsl.support.ComponentConfigurer; +import org.springframework.integration.jms.JmsHeaderMapper; +import org.springframework.integration.jms.JmsSendingMessageHandler; +import org.springframework.jms.core.JmsTemplate; +import org.springframework.util.Assert; + +/** + * @author Artem Bilan + */ +public class JmsOutboundChannelAdapterSpec> + extends MessageHandlerSpec { + + protected final JmsTemplateSpec jmsTemplateSpec = new JmsTemplateSpec(); + + JmsOutboundChannelAdapterSpec(JmsTemplate jmsTemplate) { + this.target = new JmsSendingMessageHandler(jmsTemplate); + } + + private JmsOutboundChannelAdapterSpec(ConnectionFactory connectionFactory) { + this.target = new JmsSendingMessageHandler(this.jmsTemplateSpec.connectionFactory(connectionFactory).get()); + } + + public S extractPayload(boolean extractPayload) { + this.target.setExtractPayload(extractPayload); + return _this(); + } + + public S headerMapper(JmsHeaderMapper headerMapper) { + this.target.setHeaderMapper(headerMapper); + return _this(); + } + + public S destination(Destination destination) { + this.target.setDestination(destination); + return _this(); + } + + public S destination(String destination) { + this.target.setDestinationName(destination); + return _this(); + } + + public S destinationExpression(String destination) { + this.target.setDestinationExpression(PARSER.parseExpression(destination)); + return _this(); + } + + @Override + protected JmsSendingMessageHandler doGet() { + return null; + } + + public static class JmsOutboundChannelSpecTemplateAware extends + JmsOutboundChannelAdapterSpec { + + JmsOutboundChannelSpecTemplateAware(ConnectionFactory connectionFactory) { + super(connectionFactory); + } + + public JmsOutboundChannelSpecTemplateAware configureJmsTemplate(ComponentConfigurer configurer) { + Assert.notNull(configurer); + configurer.configure(this.jmsTemplateSpec); + return _this(); + } + + } + +} diff --git a/spring-integration-java-dsl/src/main/java/org/springframework/integration/dsl/jms/JmsOutboundGatewaySpec.java b/spring-integration-java-dsl/src/main/java/org/springframework/integration/dsl/jms/JmsOutboundGatewaySpec.java new file mode 100644 index 0000000..f0153e6 --- /dev/null +++ b/spring-integration-java-dsl/src/main/java/org/springframework/integration/dsl/jms/JmsOutboundGatewaySpec.java @@ -0,0 +1,226 @@ +/* + * Copyright 2014 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.integration.dsl.jms; + +import java.util.concurrent.Executor; + +import javax.jms.ConnectionFactory; +import javax.jms.Destination; + +import org.springframework.integration.dsl.core.IntegrationComponentSpec; +import org.springframework.integration.dsl.core.MessageHandlerSpec; +import org.springframework.integration.dsl.support.ComponentConfigurer; +import org.springframework.integration.jms.JmsHeaderMapper; +import org.springframework.integration.jms.JmsOutboundGateway; +import org.springframework.jms.support.converter.MessageConverter; +import org.springframework.jms.support.destination.DestinationResolver; +import org.springframework.util.Assert; + +/** + * @author Artem Bilan + */ +public class JmsOutboundGatewaySpec extends MessageHandlerSpec { + + JmsOutboundGatewaySpec(ConnectionFactory connectionFactory) { + this.target = new JmsOutboundGateway(); + this.target.setConnectionFactory(connectionFactory); + } + + public JmsOutboundGatewaySpec extractRequestPayload(boolean extractPayload) { + this.target.setExtractRequestPayload(extractPayload); + return _this(); + } + + public JmsOutboundGatewaySpec extractReplyPayload(boolean extractPayload) { + this.target.setExtractReplyPayload(extractPayload); + return _this(); + } + + public JmsOutboundGatewaySpec headerMapper(JmsHeaderMapper headerMapper) { + this.target.setHeaderMapper(headerMapper); + return _this(); + } + + public JmsOutboundGatewaySpec requestDestination(Destination destination) { + this.target.setRequestDestination(destination); + return _this(); + } + + public JmsOutboundGatewaySpec requestDestination(String destination) { + this.target.setRequestDestinationName(destination); + return _this(); + } + + public JmsOutboundGatewaySpec requestDestinationExpression(String destination) { + this.target.setRequestDestinationExpression(PARSER.parseExpression(destination)); + return _this(); + } + + public JmsOutboundGatewaySpec replyDestination(Destination destination) { + this.target.setReplyDestination(destination); + return _this(); + } + + public JmsOutboundGatewaySpec replyDestination(String destination) { + this.target.setReplyDestinationName(destination); + return _this(); + } + + public JmsOutboundGatewaySpec replyDestinationExpression(String destination) { + this.target.setReplyDestinationExpression(PARSER.parseExpression(destination)); + return _this(); + } + + public JmsOutboundGatewaySpec destinationResolver(DestinationResolver destinationResolver) { + this.target.setDestinationResolver(destinationResolver); + return _this(); + } + + public JmsOutboundGatewaySpec jmsMessageConverter(MessageConverter messageConverter) { + this.target.setMessageConverter(messageConverter); + return _this(); + } + + public JmsOutboundGatewaySpec correlationKey(String correlationKey) { + this.target.setCorrelationKey(correlationKey); + return _this(); + } + + public JmsOutboundGatewaySpec requestPubSubDomain(boolean pubSubDomain) { + this.target.setRequestPubSubDomain(pubSubDomain); + return _this(); + } + + public JmsOutboundGatewaySpec replyPubSubDomain(boolean pubSubDomain) { + this.target.setReplyPubSubDomain(pubSubDomain); + return _this(); + } + + public JmsOutboundGatewaySpec deliveryPersistent(boolean deliveryPersistent) { + this.target.setDeliveryPersistent(deliveryPersistent); + return _this(); + } + + public JmsOutboundGatewaySpec priority(int priority) { + this.target.setPriority(priority); + return _this(); + } + + public JmsOutboundGatewaySpec timeToLive(long timeToLive) { + this.target.setTimeToLive(timeToLive); + return _this(); + } + + public JmsOutboundGatewaySpec receiveTimeout(long receiveTimeout) { + this.target.setReceiveTimeout(receiveTimeout); + return _this(); + } + + public JmsOutboundGatewaySpec explicitQosEnabled(boolean explicitQosEnabled) { + this.target.setExplicitQosEnabled(explicitQosEnabled); + return _this(); + } + + public JmsOutboundGatewaySpec replyContainer() { + this.target.setReplyContainerProperties(new JmsOutboundGateway.ReplyContainerProperties()); + return _this(); + } + + public JmsOutboundGatewaySpec replyContainer(ComponentConfigurer configurer) { + Assert.notNull(configurer); + ReplyContainerSpec spec = new ReplyContainerSpec(); + configurer.configure(spec); + this.target.setReplyContainerProperties(spec.get()); + return _this(); + } + + @Override + protected JmsOutboundGateway doGet() { + throw new UnsupportedOperationException(); + } + + + public class ReplyContainerSpec + extends IntegrationComponentSpec { + + ReplyContainerSpec() { + this.target = new JmsOutboundGateway.ReplyContainerProperties(); + } + + public ReplyContainerSpec sessionTransacted(Boolean sessionTransacted) { + this.target.setSessionTransacted(sessionTransacted); + return _this(); + } + + public ReplyContainerSpec sessionAcknowledgeMode(Integer sessionAcknowledgeMode) { + this.target.setSessionAcknowledgeMode(sessionAcknowledgeMode); + return _this(); + } + + public ReplyContainerSpec receiveTimeout(Long receiveTimeout) { + this.target.setReceiveTimeout(receiveTimeout); + return _this(); + } + + public ReplyContainerSpec recoveryInterval(Long recoveryInterval) { + this.target.setRecoveryInterval(recoveryInterval); + return _this(); + } + + public ReplyContainerSpec cacheLevel(Integer cacheLevel) { + this.target.setCacheLevel(cacheLevel); + return _this(); + } + + public ReplyContainerSpec concurrentConsumers(Integer concurrentConsumers) { + this.target.setConcurrentConsumers(concurrentConsumers); + return _this(); + } + + public ReplyContainerSpec maxConcurrentConsumers(Integer maxConcurrentConsumers) { + this.target.setMaxConcurrentConsumers(maxConcurrentConsumers); + return _this(); + } + + public ReplyContainerSpec maxMessagesPerTask(Integer maxMessagesPerTask) { + this.target.setMaxMessagesPerTask(maxMessagesPerTask); + return _this(); + } + + public ReplyContainerSpec idleConsumerLimit(Integer idleConsumerLimit) { + this.target.setIdleConsumerLimit(idleConsumerLimit); + return _this(); + } + + public ReplyContainerSpec idleTaskExecutionLimit(Integer idleTaskExecutionLimit) { + this.target.setIdleTaskExecutionLimit(idleTaskExecutionLimit); + return _this(); + } + + public ReplyContainerSpec taskExecutor(Executor taskExecutor) { + this.target.setTaskExecutor(taskExecutor); + return _this(); + } + + @Override + protected JmsOutboundGateway.ReplyContainerProperties doGet() { + throw new UnsupportedOperationException(); + } + + } + +} diff --git a/spring-integration-java-dsl/src/main/java/org/springframework/integration/dsl/jms/JmsPollableMessageChannelSpec.java b/spring-integration-java-dsl/src/main/java/org/springframework/integration/dsl/jms/JmsPollableMessageChannelSpec.java index c2e0c09..c8284d9 100644 --- a/spring-integration-java-dsl/src/main/java/org/springframework/integration/dsl/jms/JmsPollableMessageChannelSpec.java +++ b/spring-integration-java-dsl/src/main/java/org/springframework/integration/dsl/jms/JmsPollableMessageChannelSpec.java @@ -24,6 +24,7 @@ import org.springframework.integration.dsl.channel.MessageChannelSpec; import org.springframework.integration.jms.AbstractJmsChannel; import org.springframework.integration.jms.config.JmsChannelFactoryBean; import org.springframework.jms.support.converter.MessageConverter; +import org.springframework.jms.support.destination.DestinationResolver; /** * @author Artem Bilan @@ -54,6 +55,11 @@ public class JmsPollableMessageChannelSpec { + + public JmsTemplateSpec() { + this.target = new DynamicJmsTemplate(); + } + + JmsTemplateSpec connectionFactory(ConnectionFactory connectionFactory) { + this.target.setConnectionFactory(connectionFactory); + return _this(); + } + + public JmsTemplateSpec destinationResolver(DestinationResolver destinationResolver) { + this.target.setDestinationResolver(destinationResolver); + return _this(); + } + + public JmsTemplateSpec pubSubDomain(boolean pubSubDomain) { + this.target.setPubSubDomain(pubSubDomain); + return _this(); + } + + public JmsTemplateSpec jmsMessageConverter(MessageConverter messageConverter) { + this.target.setMessageConverter(messageConverter); + return _this(); + } + + public JmsTemplateSpec deliveryPersistent(boolean deliveryPersistent) { + this.target.setDeliveryPersistent(deliveryPersistent); + return _this(); + } + + public JmsTemplateSpec explicitQosEnabled(boolean explicitQosEnabled) { + this.target.setExplicitQosEnabled(explicitQosEnabled); + return _this(); + } + + public JmsTemplateSpec priority(int priority) { + this.target.setPriority(priority); + return _this(); + } + + public JmsTemplateSpec timeToLive(long timeToLive) { + this.target.setTimeToLive(timeToLive); + return _this(); + } + + public JmsTemplateSpec receiveTimeout(long receiveTimeout) { + this.target.setReceiveTimeout(receiveTimeout); + return _this(); + } + + /** + * @param sessionAcknowledgeMode the acknowledgement mode constant + * @return the current {@link org.springframework.integration.dsl.channel.MessageChannelSpec} + * @see javax.jms.Session#AUTO_ACKNOWLEDGE etc. + */ + public JmsTemplateSpec sessionAcknowledgeMode(int sessionAcknowledgeMode) { + this.target.setSessionAcknowledgeMode(sessionAcknowledgeMode); + return _this(); + } + + public JmsTemplateSpec sessionTransacted(boolean sessionTransacted) { + this.target.setSessionTransacted(sessionTransacted); + return _this(); + } + + @Override + protected DynamicJmsTemplate doGet() { + throw new UnsupportedOperationException(); + } + +} diff --git a/spring-integration-java-dsl/src/test/java/org/springframework/integration/dsl/test/IntegrationFlowTests.java b/spring-integration-java-dsl/src/test/java/org/springframework/integration/dsl/test/IntegrationFlowTests.java index 8532eed..7075234 100644 --- a/spring-integration-java-dsl/src/test/java/org/springframework/integration/dsl/test/IntegrationFlowTests.java +++ b/spring-integration-java-dsl/src/test/java/org/springframework/integration/dsl/test/IntegrationFlowTests.java @@ -942,6 +942,24 @@ public class IntegrationFlowTests { assertEquals("HELLO THROUGH THE AMQP", receive.getPayload()); } + @Autowired + @Qualifier("jmsOutboundInboundChannel") + private MessageChannel jmsOutboundInboundChannel; + + @Autowired + @Qualifier("jmsOutboundInboundReplyChannel") + private PollableChannel jmsOutboundInboundReplyChannel; + + @Test + public void testJmsOutboundInboundFlow() throws Exception { + this.jmsOutboundInboundChannel.send(MessageBuilder.withPayload("hello through the amqp").build()); + + Message receive = this.jmsOutboundInboundReplyChannel.receive(5000); + + assertNotNull(receive); + assertEquals("HELLO THROUGH THE AMQP", receive.getPayload()); + } + @MessagingGateway(defaultRequestChannel = "controlBus") private static interface ControlBusGateway { @@ -1005,6 +1023,22 @@ public class IntegrationFlowTests { return MessageChannels.publishSubscribe().get(); } + + @Bean + public IntegrationFlow jmsOutboundFlow() { + return IntegrationFlows.from("jmsOutboundInboundChannel") + .handle(Jms.outboundAdapter(this.jmsConnectionFactory).destination("jmsOutboundInboundChannel")) + .get(); + } + + @Bean + public IntegrationFlow jmsInboundFlow() { + return IntegrationFlows.from(Jms.inboundAdapter(this.jmsConnectionFactory).destination("jmsOutboundInboundChannel")) + .transform(String::toUpperCase) + .channel(MessageChannels.queue("jmsOutboundInboundReplyChannel")) + .get(); + } + } @Configuration @@ -1394,7 +1428,7 @@ public class IntegrationFlowTests { @Bean public IntegrationFlow amqpOutboundFlow() { return IntegrationFlows.from(Amqp.channel("amqpOutboundInput", this.rabbitConnectionFactory)) - .handle(Amqp.outboundAdapter(this.amqpTemplate).routingKeyExpression("headers.routingKey").get()) + .handle(Amqp.outboundAdapter(this.amqpTemplate).routingKeyExpression("headers.routingKey")) .get(); }