DSL: JMS in/out adapters, outbound gateway

* Others fixes, improvements and refactoring
This commit is contained in:
Artem Bilan
2014-06-11 19:16:57 +03:00
parent 7394516a2d
commit b42e6e1503
19 changed files with 680 additions and 25 deletions

View File

@@ -30,7 +30,7 @@ ext {
log4jVersion = '1.2.17'
slf4jVersion = '1.7.6'
springIntegrationVersion = '4.0.2.RELEASE'
springBootVersion = '1.1.0.BUILD-SNAPSHOT'
springBootVersion = '1.1.0.RELEASE'
linkHomepage = 'https://github.com/spring-projects/spring-integration-extensions'
linkCi = 'https://build.spring.io/browse/INTEXT'
@@ -81,7 +81,6 @@ dependencies {
testCompile "de.flapdoodle.embed:de.flapdoodle.embed.mongo:$embedMongoVersion"
testCompile "org.springframework.boot:spring-boot-autoconfigure:$springBootVersion"
testRuntime "log4j:log4j:$log4jVersion"
testRuntime "org.slf4j:jcl-over-slf4j:$slf4jVersion"
testRuntime "org.slf4j:slf4j-log4j12:$slf4jVersion"
testRuntime("org.apache.activemq:activemq-broker:$activeMqVersion")

View File

@@ -16,7 +16,7 @@
package org.springframework.integration.dsl;
import org.springframework.integration.dsl.core.IntegrationComponentSpec;
import org.springframework.integration.dsl.core.MessageHandlerSpec;
import org.springframework.integration.router.AbstractMessageRouter;
import org.springframework.messaging.MessageChannel;
@@ -24,7 +24,7 @@ import org.springframework.messaging.MessageChannel;
* @author Artem Bilan
*/
public class AbstractRouterSpec<S extends AbstractRouterSpec<S, R>, R extends AbstractMessageRouter>
extends IntegrationComponentSpec<S, R> {
extends MessageHandlerSpec<S, R> {
AbstractRouterSpec(R router) {
this.target = router;

View File

@@ -17,7 +17,6 @@
package org.springframework.integration.dsl;
import org.springframework.expression.Expression;
import org.springframework.expression.spel.standard.SpelExpressionParser;
import org.springframework.integration.aggregator.AbstractCorrelatingMessageHandler;
import org.springframework.integration.aggregator.CorrelationStrategy;
import org.springframework.integration.aggregator.ExpressionEvaluatingCorrelationStrategy;
@@ -25,7 +24,7 @@ import org.springframework.integration.aggregator.ExpressionEvaluatingReleaseStr
import org.springframework.integration.aggregator.ReleaseStrategy;
import org.springframework.integration.config.CorrelationStrategyFactoryBean;
import org.springframework.integration.config.ReleaseStrategyFactoryBean;
import org.springframework.integration.dsl.core.IntegrationComponentSpec;
import org.springframework.integration.dsl.core.MessageHandlerSpec;
import org.springframework.integration.expression.ValueExpression;
import org.springframework.integration.store.MessageGroupStore;
import org.springframework.messaging.MessageChannel;
@@ -35,9 +34,7 @@ import org.springframework.scheduling.TaskScheduler;
* @author Artem Bilan
*/
public abstract class CorrelationHandlerSpec<S extends CorrelationHandlerSpec<S, H>, H extends AbstractCorrelatingMessageHandler>
extends IntegrationComponentSpec<S, H> {
protected final static SpelExpressionParser PARSER = new SpelExpressionParser();
extends MessageHandlerSpec<S, H> {
protected MessageGroupStore messageStore;

View File

@@ -21,7 +21,7 @@ import java.util.Map;
import org.springframework.expression.Expression;
import org.springframework.expression.spel.standard.SpelExpressionParser;
import org.springframework.integration.dsl.core.IntegrationComponentSpec;
import org.springframework.integration.dsl.core.MessageHandlerSpec;
import org.springframework.integration.expression.ValueExpression;
import org.springframework.integration.transformer.ContentEnricher;
import org.springframework.integration.transformer.support.AbstractHeaderValueMessageProcessor;
@@ -34,7 +34,7 @@ import org.springframework.util.Assert;
/**
* @author Artem Bilan
*/
public class EnricherSpec extends IntegrationComponentSpec<EnricherSpec, ContentEnricher> {
public class EnricherSpec extends MessageHandlerSpec<EnricherSpec, ContentEnricher> {
private final static SpelExpressionParser PARSER = new SpelExpressionParser();

View File

@@ -31,6 +31,7 @@ import org.springframework.integration.core.GenericSelector;
import org.springframework.integration.core.MessageSelector;
import org.springframework.integration.dsl.channel.MessageChannelSpec;
import org.springframework.integration.dsl.core.ConsumerEndpointSpec;
import org.springframework.integration.dsl.core.MessageHandlerSpec;
import org.springframework.integration.dsl.support.BeanNameMessageProcessor;
import org.springframework.integration.dsl.support.ComponentConfigurer;
import org.springframework.integration.dsl.support.EndpointConfigurer;
@@ -188,6 +189,11 @@ public final class IntegrationFlowBuilder {
return this.register(new FilterEndpointSpec(new MessageFilter(selector)), endpointConfigurer);
}
public <S extends MessageHandlerSpec<S, ? extends MessageHandler>> IntegrationFlowBuilder handle(S messageHandlerSpec) {
Assert.notNull(messageHandlerSpec);
return handle(messageHandlerSpec.get());
}
public IntegrationFlowBuilder handle(MessageHandler messageHandler) {
return this.handle(messageHandler, null);
}
@@ -228,6 +234,12 @@ public final class IntegrationFlowBuilder {
return this.handle(serviceActivatingHandler, endpointConfigurer);
}
public <H extends MessageHandler, S extends MessageHandlerSpec<S, H>>
IntegrationFlowBuilder handle(S messageHandlerSpec, EndpointConfigurer<GenericEndpointSpec<H>> endpointConfigurer) {
Assert.notNull(messageHandlerSpec);
return handle(messageHandlerSpec.get(), endpointConfigurer);
}
public <H extends MessageHandler> IntegrationFlowBuilder handle(H messageHandler,
EndpointConfigurer<GenericEndpointSpec<H>> endpointConfigurer) {
Assert.notNull(messageHandler);

View File

@@ -20,6 +20,7 @@ import org.springframework.beans.DirectFieldAccessor;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.integration.core.MessageSource;
import org.springframework.integration.dsl.channel.MessageChannelSpec;
import org.springframework.integration.dsl.core.MessageSourceSpec;
import org.springframework.integration.dsl.core.MessagingGatewaySpec;
import org.springframework.integration.dsl.core.MessagingProducerSpec;
import org.springframework.integration.dsl.support.EndpointConfigurer;
@@ -63,6 +64,16 @@ public final class IntegrationFlows {
return from(messageChannelSpec.get());
}
public static <S extends MessageSourceSpec<S, ? extends MessageSource<?>>> IntegrationFlowBuilder from(S
messageSourceSpec) {
return from(messageSourceSpec.get());
}
public static <S extends MessageSourceSpec<S, ? extends MessageSource<?>>> IntegrationFlowBuilder from(S messageSource,
EndpointConfigurer<SourcePollingChannelAdapterSpec> endpointConfigurer) {
return from(messageSource.get(), endpointConfigurer);
}
public static IntegrationFlowBuilder from(MessageSource<?> messageSource) {
return from(messageSource, null);
}

View File

@@ -22,7 +22,6 @@ import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
import org.springframework.integration.amqp.inbound.AmqpInboundChannelAdapter;
import org.springframework.integration.amqp.inbound.AmqpInboundGateway;
import org.springframework.integration.amqp.outbound.AmqpOutboundEndpoint;
import org.springframework.integration.dsl.core.MessagingGatewaySpec;
import org.springframework.integration.dsl.core.MessagingProducerSpec;
@@ -66,18 +65,15 @@ public abstract class Amqp {
}
public static AmqpOutboundEndpointSpec outboundAdapter(AmqpTemplate amqpTemplate) {
return outboundEndpoint(new AmqpOutboundEndpoint(amqpTemplate), false);
return new AmqpOutboundEndpointSpec(amqpTemplate, false);
}
public static AmqpOutboundEndpointSpec outboundGateway(AmqpTemplate amqpTemplate) {
return outboundEndpoint(new AmqpOutboundEndpoint(amqpTemplate), true);
return new AmqpOutboundEndpointSpec(amqpTemplate, true);
}
private static AmqpOutboundEndpointSpec outboundEndpoint(AmqpOutboundEndpoint endpoint, boolean expectReply) {
return new AmqpOutboundEndpointSpec(endpoint, expectReply);
}
public static <S extends AmqpPollableMessageChannelSpec<S>> AmqpPollableMessageChannelSpec<S>pollableChannel(ConnectionFactory connectionFactory) {
public static <S extends AmqpPollableMessageChannelSpec<S>> AmqpPollableMessageChannelSpec<S>
pollableChannel(ConnectionFactory connectionFactory) {
return pollableChannel(null, connectionFactory);
}

View File

@@ -16,18 +16,19 @@
package org.springframework.integration.dsl.amqp;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.amqp.core.MessageDeliveryMode;
import org.springframework.integration.amqp.outbound.AmqpOutboundEndpoint;
import org.springframework.integration.amqp.support.AmqpHeaderMapper;
import org.springframework.integration.amqp.support.DefaultAmqpHeaderMapper;
import org.springframework.integration.dsl.core.IntegrationComponentSpec;
import org.springframework.integration.dsl.core.MessageHandlerSpec;
import org.springframework.messaging.MessageChannel;
import org.springframework.util.Assert;
/**
* @author Artem Bilan
*/
public class AmqpOutboundEndpointSpec extends IntegrationComponentSpec<AmqpOutboundEndpointSpec, AmqpOutboundEndpoint> {
public class AmqpOutboundEndpointSpec extends MessageHandlerSpec<AmqpOutboundEndpointSpec, AmqpOutboundEndpoint> {
private final AmqpOutboundEndpoint endpoint;
@@ -35,8 +36,8 @@ public class AmqpOutboundEndpointSpec extends IntegrationComponentSpec<AmqpOutbo
private final DefaultAmqpHeaderMapper headerMapper = new DefaultAmqpHeaderMapper();
AmqpOutboundEndpointSpec(AmqpOutboundEndpoint endpoint, boolean expectReply) {
this.endpoint = endpoint;
AmqpOutboundEndpointSpec(AmqpTemplate amqpTemplate, boolean expectReply) {
this.endpoint = new AmqpOutboundEndpoint(amqpTemplate);
this.expectReply = expectReply;
this.endpoint.setExpectReply(expectReply);
this.endpoint.setHeaderMapper(this.headerMapper);

View File

@@ -67,11 +67,25 @@ public abstract class ConsumerEndpointSpec<S extends ConsumerEndpointSpec<S, H>,
return _this();
}
public S requiresReply(boolean requiresReply) {
H handler = this.target.getT2();
if (handler instanceof AbstractReplyProducingMessageHandler) {
((AbstractReplyProducingMessageHandler) handler).setRequiresReply(requiresReply);
}
else {
logger.warn("'requiresReply' can be applied only for AbstractReplyProducingMessageHandler");
}
return _this();
}
public S sendTimeout(long sendTimeout) {
H handler = this.target.getT2();
if (handler instanceof AbstractReplyProducingMessageHandler) {
((AbstractReplyProducingMessageHandler) handler).setSendTimeout(sendTimeout);
}
else {
logger.warn("'sendTimeout' can be applied only for AbstractReplyProducingMessageHandler");
}
return _this();
}

View File

@@ -16,6 +16,11 @@
package org.springframework.integration.dsl.core;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.expression.spel.standard.SpelExpressionParser;
/**
* The common Builder abstraction.
*
@@ -23,6 +28,10 @@ package org.springframework.integration.dsl.core;
*/
public abstract class IntegrationComponentSpec<S extends IntegrationComponentSpec<S, T>, T> {
protected final static SpelExpressionParser PARSER = new SpelExpressionParser();
protected final Log logger = LogFactory.getLog(getClass());
protected volatile T target;
protected String id;

View File

@@ -0,0 +1,26 @@
/*
* Copyright 2014 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.integration.dsl.core;
import org.springframework.messaging.MessageHandler;
/**
* @author Artem Bilan
*/
public abstract class MessageHandlerSpec<S extends MessageHandlerSpec<S, H>, H extends MessageHandler>
extends IntegrationComponentSpec<S, H> {
}

View File

@@ -0,0 +1,26 @@
/*
* Copyright 2014 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.integration.dsl.core;
import org.springframework.integration.core.MessageSource;
/**
* @author Artem Bilan
*/
public abstract class MessageSourceSpec<S extends MessageSourceSpec<S, H>, H extends MessageSource<?>>
extends IntegrationComponentSpec<S, H> {
}

View File

@@ -18,6 +18,8 @@ package org.springframework.integration.dsl.jms;
import javax.jms.ConnectionFactory;
import org.springframework.jms.core.JmsTemplate;
/**
* @author Artem Bilan
*/
@@ -49,4 +51,25 @@ public abstract class Jms {
ConnectionFactory connectionFactory) {
return new JmsPublishSubscribeMessageChannelSpec(connectionFactory).id(id);
}
public static <S extends JmsOutboundChannelAdapterSpec<S>> JmsOutboundChannelAdapterSpec<S> outboundAdapter(JmsTemplate jmsTemplate) {
return new JmsOutboundChannelAdapterSpec<S>(jmsTemplate);
}
public static JmsOutboundChannelAdapterSpec.JmsOutboundChannelSpecTemplateAware outboundAdapter(ConnectionFactory connectionFactory) {
return new JmsOutboundChannelAdapterSpec.JmsOutboundChannelSpecTemplateAware(connectionFactory);
}
public static <S extends JmsInboundChannelAdapterSpec<S>> JmsInboundChannelAdapterSpec<S> inboundAdapter(JmsTemplate jmsTemplate) {
return new JmsInboundChannelAdapterSpec<S>(jmsTemplate);
}
public static JmsInboundChannelAdapterSpec.JmsInboundChannelSpecTemplateAware inboundAdapter(ConnectionFactory connectionFactory) {
return new JmsInboundChannelAdapterSpec.JmsInboundChannelSpecTemplateAware(connectionFactory);
}
public static JmsOutboundGatewaySpec outboundGateway(ConnectionFactory connectionFactory) {
return new JmsOutboundGatewaySpec(connectionFactory);
}
}

View File

@@ -0,0 +1,85 @@
/*
* Copyright 2014 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.integration.dsl.jms;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import org.springframework.integration.dsl.core.MessageSourceSpec;
import org.springframework.integration.dsl.support.ComponentConfigurer;
import org.springframework.integration.jms.JmsDestinationPollingSource;
import org.springframework.integration.jms.JmsHeaderMapper;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.util.Assert;
/**
* @author Artem Bilan
*/
public class JmsInboundChannelAdapterSpec<S extends JmsInboundChannelAdapterSpec<S>>
extends MessageSourceSpec<S, JmsDestinationPollingSource> {
final JmsTemplateSpec jmsTemplateSpec = new JmsTemplateSpec();
JmsInboundChannelAdapterSpec(JmsTemplate jmsTemplate) {
this.target = new JmsDestinationPollingSource(jmsTemplate);
}
private JmsInboundChannelAdapterSpec(ConnectionFactory connectionFactory) {
this.target = new JmsDestinationPollingSource(this.jmsTemplateSpec.connectionFactory(connectionFactory).get());
}
public S messageSelector(String messageSelector) {
this.target.setMessageSelector(messageSelector);
return _this();
}
public S headerMapper(JmsHeaderMapper headerMapper) {
this.target.setHeaderMapper(headerMapper);
return _this();
}
public S destination(Destination destination) {
this.target.setDestination(destination);
return _this();
}
public S destination(String destination) {
this.target.setDestinationName(destination);
return _this();
}
@Override
protected JmsDestinationPollingSource doGet() {
throw new UnsupportedOperationException();
}
public static class JmsInboundChannelSpecTemplateAware extends
JmsInboundChannelAdapterSpec<JmsInboundChannelSpecTemplateAware> {
JmsInboundChannelSpecTemplateAware(ConnectionFactory connectionFactory) {
super(connectionFactory);
}
public JmsInboundChannelSpecTemplateAware configureJmsTemplate(ComponentConfigurer<JmsTemplateSpec> configurer) {
Assert.notNull(configurer);
configurer.configure(this.jmsTemplateSpec);
return _this();
}
}
}

View File

@@ -0,0 +1,90 @@
/*
* Copyright 2014 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.integration.dsl.jms;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import org.springframework.integration.dsl.core.MessageHandlerSpec;
import org.springframework.integration.dsl.support.ComponentConfigurer;
import org.springframework.integration.jms.JmsHeaderMapper;
import org.springframework.integration.jms.JmsSendingMessageHandler;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.util.Assert;
/**
* @author Artem Bilan
*/
public class JmsOutboundChannelAdapterSpec<S extends JmsOutboundChannelAdapterSpec<S>>
extends MessageHandlerSpec<S, JmsSendingMessageHandler> {
protected final JmsTemplateSpec jmsTemplateSpec = new JmsTemplateSpec();
JmsOutboundChannelAdapterSpec(JmsTemplate jmsTemplate) {
this.target = new JmsSendingMessageHandler(jmsTemplate);
}
private JmsOutboundChannelAdapterSpec(ConnectionFactory connectionFactory) {
this.target = new JmsSendingMessageHandler(this.jmsTemplateSpec.connectionFactory(connectionFactory).get());
}
public S extractPayload(boolean extractPayload) {
this.target.setExtractPayload(extractPayload);
return _this();
}
public S headerMapper(JmsHeaderMapper headerMapper) {
this.target.setHeaderMapper(headerMapper);
return _this();
}
public S destination(Destination destination) {
this.target.setDestination(destination);
return _this();
}
public S destination(String destination) {
this.target.setDestinationName(destination);
return _this();
}
public S destinationExpression(String destination) {
this.target.setDestinationExpression(PARSER.parseExpression(destination));
return _this();
}
@Override
protected JmsSendingMessageHandler doGet() {
return null;
}
public static class JmsOutboundChannelSpecTemplateAware extends
JmsOutboundChannelAdapterSpec<JmsOutboundChannelSpecTemplateAware> {
JmsOutboundChannelSpecTemplateAware(ConnectionFactory connectionFactory) {
super(connectionFactory);
}
public JmsOutboundChannelSpecTemplateAware configureJmsTemplate(ComponentConfigurer<JmsTemplateSpec> configurer) {
Assert.notNull(configurer);
configurer.configure(this.jmsTemplateSpec);
return _this();
}
}
}

View File

@@ -0,0 +1,226 @@
/*
* Copyright 2014 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.integration.dsl.jms;
import java.util.concurrent.Executor;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import org.springframework.integration.dsl.core.IntegrationComponentSpec;
import org.springframework.integration.dsl.core.MessageHandlerSpec;
import org.springframework.integration.dsl.support.ComponentConfigurer;
import org.springframework.integration.jms.JmsHeaderMapper;
import org.springframework.integration.jms.JmsOutboundGateway;
import org.springframework.jms.support.converter.MessageConverter;
import org.springframework.jms.support.destination.DestinationResolver;
import org.springframework.util.Assert;
/**
* @author Artem Bilan
*/
public class JmsOutboundGatewaySpec extends MessageHandlerSpec<JmsOutboundGatewaySpec, JmsOutboundGateway> {
JmsOutboundGatewaySpec(ConnectionFactory connectionFactory) {
this.target = new JmsOutboundGateway();
this.target.setConnectionFactory(connectionFactory);
}
public JmsOutboundGatewaySpec extractRequestPayload(boolean extractPayload) {
this.target.setExtractRequestPayload(extractPayload);
return _this();
}
public JmsOutboundGatewaySpec extractReplyPayload(boolean extractPayload) {
this.target.setExtractReplyPayload(extractPayload);
return _this();
}
public JmsOutboundGatewaySpec headerMapper(JmsHeaderMapper headerMapper) {
this.target.setHeaderMapper(headerMapper);
return _this();
}
public JmsOutboundGatewaySpec requestDestination(Destination destination) {
this.target.setRequestDestination(destination);
return _this();
}
public JmsOutboundGatewaySpec requestDestination(String destination) {
this.target.setRequestDestinationName(destination);
return _this();
}
public JmsOutboundGatewaySpec requestDestinationExpression(String destination) {
this.target.setRequestDestinationExpression(PARSER.parseExpression(destination));
return _this();
}
public JmsOutboundGatewaySpec replyDestination(Destination destination) {
this.target.setReplyDestination(destination);
return _this();
}
public JmsOutboundGatewaySpec replyDestination(String destination) {
this.target.setReplyDestinationName(destination);
return _this();
}
public JmsOutboundGatewaySpec replyDestinationExpression(String destination) {
this.target.setReplyDestinationExpression(PARSER.parseExpression(destination));
return _this();
}
public JmsOutboundGatewaySpec destinationResolver(DestinationResolver destinationResolver) {
this.target.setDestinationResolver(destinationResolver);
return _this();
}
public JmsOutboundGatewaySpec jmsMessageConverter(MessageConverter messageConverter) {
this.target.setMessageConverter(messageConverter);
return _this();
}
public JmsOutboundGatewaySpec correlationKey(String correlationKey) {
this.target.setCorrelationKey(correlationKey);
return _this();
}
public JmsOutboundGatewaySpec requestPubSubDomain(boolean pubSubDomain) {
this.target.setRequestPubSubDomain(pubSubDomain);
return _this();
}
public JmsOutboundGatewaySpec replyPubSubDomain(boolean pubSubDomain) {
this.target.setReplyPubSubDomain(pubSubDomain);
return _this();
}
public JmsOutboundGatewaySpec deliveryPersistent(boolean deliveryPersistent) {
this.target.setDeliveryPersistent(deliveryPersistent);
return _this();
}
public JmsOutboundGatewaySpec priority(int priority) {
this.target.setPriority(priority);
return _this();
}
public JmsOutboundGatewaySpec timeToLive(long timeToLive) {
this.target.setTimeToLive(timeToLive);
return _this();
}
public JmsOutboundGatewaySpec receiveTimeout(long receiveTimeout) {
this.target.setReceiveTimeout(receiveTimeout);
return _this();
}
public JmsOutboundGatewaySpec explicitQosEnabled(boolean explicitQosEnabled) {
this.target.setExplicitQosEnabled(explicitQosEnabled);
return _this();
}
public JmsOutboundGatewaySpec replyContainer() {
this.target.setReplyContainerProperties(new JmsOutboundGateway.ReplyContainerProperties());
return _this();
}
public JmsOutboundGatewaySpec replyContainer(ComponentConfigurer<ReplyContainerSpec> configurer) {
Assert.notNull(configurer);
ReplyContainerSpec spec = new ReplyContainerSpec();
configurer.configure(spec);
this.target.setReplyContainerProperties(spec.get());
return _this();
}
@Override
protected JmsOutboundGateway doGet() {
throw new UnsupportedOperationException();
}
public class ReplyContainerSpec
extends IntegrationComponentSpec<ReplyContainerSpec, JmsOutboundGateway.ReplyContainerProperties> {
ReplyContainerSpec() {
this.target = new JmsOutboundGateway.ReplyContainerProperties();
}
public ReplyContainerSpec sessionTransacted(Boolean sessionTransacted) {
this.target.setSessionTransacted(sessionTransacted);
return _this();
}
public ReplyContainerSpec sessionAcknowledgeMode(Integer sessionAcknowledgeMode) {
this.target.setSessionAcknowledgeMode(sessionAcknowledgeMode);
return _this();
}
public ReplyContainerSpec receiveTimeout(Long receiveTimeout) {
this.target.setReceiveTimeout(receiveTimeout);
return _this();
}
public ReplyContainerSpec recoveryInterval(Long recoveryInterval) {
this.target.setRecoveryInterval(recoveryInterval);
return _this();
}
public ReplyContainerSpec cacheLevel(Integer cacheLevel) {
this.target.setCacheLevel(cacheLevel);
return _this();
}
public ReplyContainerSpec concurrentConsumers(Integer concurrentConsumers) {
this.target.setConcurrentConsumers(concurrentConsumers);
return _this();
}
public ReplyContainerSpec maxConcurrentConsumers(Integer maxConcurrentConsumers) {
this.target.setMaxConcurrentConsumers(maxConcurrentConsumers);
return _this();
}
public ReplyContainerSpec maxMessagesPerTask(Integer maxMessagesPerTask) {
this.target.setMaxMessagesPerTask(maxMessagesPerTask);
return _this();
}
public ReplyContainerSpec idleConsumerLimit(Integer idleConsumerLimit) {
this.target.setIdleConsumerLimit(idleConsumerLimit);
return _this();
}
public ReplyContainerSpec idleTaskExecutionLimit(Integer idleTaskExecutionLimit) {
this.target.setIdleTaskExecutionLimit(idleTaskExecutionLimit);
return _this();
}
public ReplyContainerSpec taskExecutor(Executor taskExecutor) {
this.target.setTaskExecutor(taskExecutor);
return _this();
}
@Override
protected JmsOutboundGateway.ReplyContainerProperties doGet() {
throw new UnsupportedOperationException();
}
}
}

View File

@@ -24,6 +24,7 @@ import org.springframework.integration.dsl.channel.MessageChannelSpec;
import org.springframework.integration.jms.AbstractJmsChannel;
import org.springframework.integration.jms.config.JmsChannelFactoryBean;
import org.springframework.jms.support.converter.MessageConverter;
import org.springframework.jms.support.destination.DestinationResolver;
/**
* @author Artem Bilan
@@ -54,6 +55,11 @@ public class JmsPollableMessageChannelSpec<S extends JmsPollableMessageChannelSp
return _this();
}
public S destinationResolver(DestinationResolver destinationResolver) {
this.jmsChannelFactoryBean.setDestinationResolver(destinationResolver);
return _this();
}
public S destination(Destination destination) {
this.jmsChannelFactoryBean.setDestination(destination);
return _this();
@@ -64,7 +70,7 @@ public class JmsPollableMessageChannelSpec<S extends JmsPollableMessageChannelSp
return _this();
}
public S amqpMessageConverter(MessageConverter messageConverter) {
public S jmsMessageConverter(MessageConverter messageConverter) {
this.jmsChannelFactoryBean.setMessageConverter(messageConverter);
return _this();
}

View File

@@ -0,0 +1,100 @@
/*
* Copyright 2014 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.integration.dsl.jms;
import javax.jms.ConnectionFactory;
import org.springframework.integration.dsl.core.IntegrationComponentSpec;
import org.springframework.integration.jms.DynamicJmsTemplate;
import org.springframework.jms.support.converter.MessageConverter;
import org.springframework.jms.support.destination.DestinationResolver;
/**
* @author Artem Bilan
*/
public class JmsTemplateSpec extends IntegrationComponentSpec<JmsTemplateSpec, DynamicJmsTemplate> {
public JmsTemplateSpec() {
this.target = new DynamicJmsTemplate();
}
JmsTemplateSpec connectionFactory(ConnectionFactory connectionFactory) {
this.target.setConnectionFactory(connectionFactory);
return _this();
}
public JmsTemplateSpec destinationResolver(DestinationResolver destinationResolver) {
this.target.setDestinationResolver(destinationResolver);
return _this();
}
public JmsTemplateSpec pubSubDomain(boolean pubSubDomain) {
this.target.setPubSubDomain(pubSubDomain);
return _this();
}
public JmsTemplateSpec jmsMessageConverter(MessageConverter messageConverter) {
this.target.setMessageConverter(messageConverter);
return _this();
}
public JmsTemplateSpec deliveryPersistent(boolean deliveryPersistent) {
this.target.setDeliveryPersistent(deliveryPersistent);
return _this();
}
public JmsTemplateSpec explicitQosEnabled(boolean explicitQosEnabled) {
this.target.setExplicitQosEnabled(explicitQosEnabled);
return _this();
}
public JmsTemplateSpec priority(int priority) {
this.target.setPriority(priority);
return _this();
}
public JmsTemplateSpec timeToLive(long timeToLive) {
this.target.setTimeToLive(timeToLive);
return _this();
}
public JmsTemplateSpec receiveTimeout(long receiveTimeout) {
this.target.setReceiveTimeout(receiveTimeout);
return _this();
}
/**
* @param sessionAcknowledgeMode the acknowledgement mode constant
* @return the current {@link org.springframework.integration.dsl.channel.MessageChannelSpec}
* @see javax.jms.Session#AUTO_ACKNOWLEDGE etc.
*/
public JmsTemplateSpec sessionAcknowledgeMode(int sessionAcknowledgeMode) {
this.target.setSessionAcknowledgeMode(sessionAcknowledgeMode);
return _this();
}
public JmsTemplateSpec sessionTransacted(boolean sessionTransacted) {
this.target.setSessionTransacted(sessionTransacted);
return _this();
}
@Override
protected DynamicJmsTemplate doGet() {
throw new UnsupportedOperationException();
}
}

View File

@@ -942,6 +942,24 @@ public class IntegrationFlowTests {
assertEquals("HELLO THROUGH THE AMQP", receive.getPayload());
}
@Autowired
@Qualifier("jmsOutboundInboundChannel")
private MessageChannel jmsOutboundInboundChannel;
@Autowired
@Qualifier("jmsOutboundInboundReplyChannel")
private PollableChannel jmsOutboundInboundReplyChannel;
@Test
public void testJmsOutboundInboundFlow() throws Exception {
this.jmsOutboundInboundChannel.send(MessageBuilder.withPayload("hello through the amqp").build());
Message<?> receive = this.jmsOutboundInboundReplyChannel.receive(5000);
assertNotNull(receive);
assertEquals("HELLO THROUGH THE AMQP", receive.getPayload());
}
@MessagingGateway(defaultRequestChannel = "controlBus")
private static interface ControlBusGateway {
@@ -1005,6 +1023,22 @@ public class IntegrationFlowTests {
return MessageChannels.publishSubscribe().get();
}
@Bean
public IntegrationFlow jmsOutboundFlow() {
return IntegrationFlows.from("jmsOutboundInboundChannel")
.handle(Jms.outboundAdapter(this.jmsConnectionFactory).destination("jmsOutboundInboundChannel"))
.get();
}
@Bean
public IntegrationFlow jmsInboundFlow() {
return IntegrationFlows.from(Jms.inboundAdapter(this.jmsConnectionFactory).destination("jmsOutboundInboundChannel"))
.<String, String>transform(String::toUpperCase)
.channel(MessageChannels.queue("jmsOutboundInboundReplyChannel"))
.get();
}
}
@Configuration
@@ -1394,7 +1428,7 @@ public class IntegrationFlowTests {
@Bean
public IntegrationFlow amqpOutboundFlow() {
return IntegrationFlows.from(Amqp.channel("amqpOutboundInput", this.rabbitConnectionFactory))
.handle(Amqp.outboundAdapter(this.amqpTemplate).routingKeyExpression("headers.routingKey").get())
.handle(Amqp.outboundAdapter(this.amqpTemplate).routingKeyExpression("headers.routingKey"))
.get();
}