diff --git a/org.springframework.integration.adapter/src/test/java/org/springframework/integration/adapter/jms/config/jmsGatewayWithContainerSettings.xml b/org.springframework.integration.adapter/src/test/java/org/springframework/integration/adapter/jms/config/jmsGatewayWithContainerSettings.xml index f19a7e4280..44da4e1999 100644 --- a/org.springframework.integration.adapter/src/test/java/org/springframework/integration/adapter/jms/config/jmsGatewayWithContainerSettings.xml +++ b/org.springframework.integration.adapter/src/test/java/org/springframework/integration/adapter/jms/config/jmsGatewayWithContainerSettings.xml @@ -7,9 +7,11 @@ http://www.springframework.org/schema/integration http://www.springframework.org/schema/integration/spring-integration-1.0.xsd"> - + - + + + - + - + + + - + + + diff --git a/org.springframework.integration.samples/src/main/java/org/springframework/integration/samples/cafe/cafeDemo.xml b/org.springframework.integration.samples/src/main/java/org/springframework/integration/samples/cafe/cafeDemo.xml index 97c93f15fd..1956eec556 100644 --- a/org.springframework.integration.samples/src/main/java/org/springframework/integration/samples/cafe/cafeDemo.xml +++ b/org.springframework.integration.samples/src/main/java/org/springframework/integration/samples/cafe/cafeDemo.xml @@ -17,8 +17,15 @@ - - + + + + + + + + + diff --git a/org.springframework.integration.samples/src/main/java/org/springframework/integration/samples/helloworld/helloWorldDemo.xml b/org.springframework.integration.samples/src/main/java/org/springframework/integration/samples/helloworld/helloWorldDemo.xml index b06e7200e2..865eb6f9ff 100644 --- a/org.springframework.integration.samples/src/main/java/org/springframework/integration/samples/helloworld/helloWorldDemo.xml +++ b/org.springframework.integration.samples/src/main/java/org/springframework/integration/samples/helloworld/helloWorldDemo.xml @@ -10,7 +10,9 @@ - + + + - + - + @@ -34,11 +34,11 @@ - + + class="org.springframework.security.vote.RoleVoter"/> @@ -48,9 +48,9 @@ + authorities="ROLE_USER, ROLE_ADMIN"/> + authorities="ROLE_USER"/> \ No newline at end of file diff --git a/org.springframework.integration.xml/src/test/java/org/springframework/integration/xml/config/XPathRouterParserTests-context.xml b/org.springframework.integration.xml/src/test/java/org/springframework/integration/xml/config/XPathRouterParserTests-context.xml index 7ef3e48122..c2d17cf46c 100644 --- a/org.springframework.integration.xml/src/test/java/org/springframework/integration/xml/config/XPathRouterParserTests-context.xml +++ b/org.springframework.integration.xml/src/test/java/org/springframework/integration/xml/config/XPathRouterParserTests-context.xml @@ -15,18 +15,24 @@ - - + + + + + + - + + + - + - + + + + + diff --git a/org.springframework.integration.xml/src/test/java/org/springframework/integration/xml/config/XPathRouterParserTests.java b/org.springframework.integration.xml/src/test/java/org/springframework/integration/xml/config/XPathRouterParserTests.java index b252f5a77d..01f94bdbdf 100644 --- a/org.springframework.integration.xml/src/test/java/org/springframework/integration/xml/config/XPathRouterParserTests.java +++ b/org.springframework.integration.xml/src/test/java/org/springframework/integration/xml/config/XPathRouterParserTests.java @@ -80,7 +80,6 @@ public class XPathRouterParserTests extends AbstractJUnit4SpringContextTests{ inputOne.send(docMessage); GenericMessage received = (GenericMessage) errorChannel.receive(1000); assertNotNull("Did not receive message on errors", received); - } @SuppressWarnings("unchecked") diff --git a/org.springframework.integration/src/main/java/org/springframework/integration/channel/PriorityChannel.java b/org.springframework.integration/src/main/java/org/springframework/integration/channel/PriorityChannel.java index 2d648960e6..cb2aac3a3b 100644 --- a/org.springframework.integration/src/main/java/org/springframework/integration/channel/PriorityChannel.java +++ b/org.springframework.integration/src/main/java/org/springframework/integration/channel/PriorityChannel.java @@ -32,15 +32,18 @@ import org.springframework.integration.message.MessagePriority; */ public class PriorityChannel extends QueueChannel { + private static final int DEFAULT_MAX_CAPACITY = Integer.MAX_VALUE; + + private final Semaphore semaphore; /** * Create a channel with the specified queue capacity. - * Priority will be based upon the provided {@link Comparator}. + * Priority will be determined by the provided {@link Comparator}. */ public PriorityChannel(int capacity, Comparator> comparator) { - super(new PriorityBlockingQueue>(capacity, comparator)); + super(new PriorityBlockingQueue>(11, comparator)); this.semaphore = new Semaphore(capacity, true); } @@ -53,11 +56,19 @@ public class PriorityChannel extends QueueChannel { } /** - * Create a channel with the default queue capacity and dispatcher policy. + * Create a channel with the default queue capacity of {@link Integer#MAX_VALUE}. + * Priority will be determined by the provided {@link Comparator}. + */ + public PriorityChannel(Comparator> comparator) { + this(DEFAULT_MAX_CAPACITY, comparator); + } + + /** + * Create a channel with the default queue capacity of {@link Integer#MAX_VALUE}. * Priority will be based on the value of {@link MessageHeader#getPriority()}. */ public PriorityChannel() { - this(DEFAULT_CAPACITY, new MessagePriorityComparator()); + this(DEFAULT_MAX_CAPACITY, new MessagePriorityComparator()); } diff --git a/org.springframework.integration/src/main/java/org/springframework/integration/channel/QueueChannel.java b/org.springframework.integration/src/main/java/org/springframework/integration/channel/QueueChannel.java index 222b7126b0..d649863f64 100644 --- a/org.springframework.integration/src/main/java/org/springframework/integration/channel/QueueChannel.java +++ b/org.springframework.integration/src/main/java/org/springframework/integration/channel/QueueChannel.java @@ -37,9 +37,6 @@ import org.springframework.util.Assert; */ public class QueueChannel extends AbstractPollableChannel { - public static final int DEFAULT_CAPACITY = 100; - - private final BlockingQueue> queue; @@ -61,11 +58,12 @@ public class QueueChannel extends AbstractPollableChannel { } /** - * Create a channel with the default queue capacity. - * @see #DEFAULT_CAPACITY + * Create a channel with "unbounded" queue capacity. The actual capacity value is + * {@link Integer#MAX_VALUE}. Note that a bounded queue is recommended, since an + * unbounded queue may lead to OutOfMemoryErrors. */ public QueueChannel() { - this(DEFAULT_CAPACITY); + this(new LinkedBlockingQueue>()); } diff --git a/org.springframework.integration/src/main/java/org/springframework/integration/config/AbstractChannelParser.java b/org.springframework.integration/src/main/java/org/springframework/integration/config/AbstractChannelParser.java index 23bf6d988c..40ff8a8705 100644 --- a/org.springframework.integration/src/main/java/org/springframework/integration/config/AbstractChannelParser.java +++ b/org.springframework.integration/src/main/java/org/springframework/integration/config/AbstractChannelParser.java @@ -17,44 +17,36 @@ package org.springframework.integration.config; import org.w3c.dom.Element; -import org.w3c.dom.Node; -import org.w3c.dom.NodeList; import org.springframework.beans.factory.config.RuntimeBeanReference; import org.springframework.beans.factory.parsing.BeanComponentDefinition; +import org.springframework.beans.factory.support.AbstractBeanDefinition; import org.springframework.beans.factory.support.BeanDefinitionBuilder; import org.springframework.beans.factory.support.ManagedList; import org.springframework.beans.factory.support.RootBeanDefinition; -import org.springframework.beans.factory.xml.AbstractSingleBeanDefinitionParser; +import org.springframework.beans.factory.xml.AbstractBeanDefinitionParser; import org.springframework.beans.factory.xml.ParserContext; import org.springframework.integration.channel.interceptor.MessageSelectingInterceptor; import org.springframework.integration.message.selector.PayloadTypeSelector; import org.springframework.util.StringUtils; +import org.springframework.util.xml.DomUtils; /** * Base class for channel parsers. * * @author Mark Fisher */ -public abstract class AbstractChannelParser extends AbstractSingleBeanDefinitionParser { - - @Override - protected abstract Class getBeanClass(Element element); - - protected void postProcess(BeanDefinitionBuilder builder, Element element) { - } +public abstract class AbstractChannelParser extends AbstractBeanDefinitionParser { @Override @SuppressWarnings("unchecked") - protected void doParse(Element element, ParserContext parserContext, BeanDefinitionBuilder builder) { + protected AbstractBeanDefinition parseInternal(Element element, ParserContext parserContext) { + BeanDefinitionBuilder builder = this.buildBeanDefinition(element, parserContext); ManagedList interceptors = null; - NodeList childNodes = element.getChildNodes(); - for (int i = 0; i < childNodes.getLength(); i++) { - Node child = childNodes.item(i); - if (child.getNodeType() == Node.ELEMENT_NODE && child.getLocalName().equals("interceptors")) { - ChannelInterceptorParser interceptorParser = new ChannelInterceptorParser(); - interceptors = interceptorParser.parseInterceptors((Element) child, parserContext); - } + Element interceptorsElement = DomUtils.getChildElementByTagName(element, "interceptors"); + if (interceptorsElement != null) { + ChannelInterceptorParser interceptorParser = new ChannelInterceptorParser(); + interceptors = interceptorParser.parseInterceptors(interceptorsElement, parserContext); } if (interceptors == null) { interceptors = new ManagedList(); @@ -75,7 +67,16 @@ public abstract class AbstractChannelParser extends AbstractSingleBeanDefinition interceptors.add(new RuntimeBeanReference(interceptorBeanName)); } builder.addPropertyValue("interceptors", interceptors); - this.postProcess(builder, element); + return builder.getBeanDefinition(); } + /** + * Subclasses must implement this method to create the bean definition. + * The class must be defined, and any implementation-specific constructor + * arguments or properties should be configured. This base class will + * configure the interceptors including the 'datatype' interceptor if + * the 'datatype' attribute is defined on the channel element. + */ + protected abstract BeanDefinitionBuilder buildBeanDefinition(Element element, ParserContext parserContext); + } diff --git a/org.springframework.integration/src/main/java/org/springframework/integration/config/DirectChannelParser.java b/org.springframework.integration/src/main/java/org/springframework/integration/config/DirectChannelParser.java deleted file mode 100644 index 1109c8d28c..0000000000 --- a/org.springframework.integration/src/main/java/org/springframework/integration/config/DirectChannelParser.java +++ /dev/null @@ -1,35 +0,0 @@ -/* - * Copyright 2002-2008 the original author or authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.springframework.integration.config; - -import org.w3c.dom.Element; - -import org.springframework.integration.channel.DirectChannel; - -/** - * Parser for the <direct-channel> element. - * - * @author Mark Fisher - */ -public class DirectChannelParser extends AbstractChannelParser { - - @Override - protected Class getBeanClass(Element element) { - return DirectChannel.class; - } - -} diff --git a/org.springframework.integration/src/main/java/org/springframework/integration/config/IntegrationNamespaceHandler.java b/org.springframework.integration/src/main/java/org/springframework/integration/config/IntegrationNamespaceHandler.java index 7dda60e48a..a6f60a2bd6 100644 --- a/org.springframework.integration/src/main/java/org/springframework/integration/config/IntegrationNamespaceHandler.java +++ b/org.springframework.integration/src/main/java/org/springframework/integration/config/IntegrationNamespaceHandler.java @@ -49,13 +49,13 @@ public class IntegrationNamespaceHandler extends NamespaceHandlerSupport { public void init() { registerBeanDefinitionParser("message-bus", new MessageBusParser()); registerBeanDefinitionParser("annotation-driven", new AnnotationDrivenParser()); - registerBeanDefinitionParser("channel", new QueueChannelParser()); - registerBeanDefinitionParser("queue-channel", new QueueChannelParser()); - registerBeanDefinitionParser("publish-subscribe-channel", new PublishSubscribeChannelParser()); - registerBeanDefinitionParser("direct-channel", new DirectChannelParser()); - registerBeanDefinitionParser("priority-channel", new PriorityChannelParser()); - registerBeanDefinitionParser("rendezvous-channel", new RendezvousChannelParser()); + registerBeanDefinitionParser("channel", new PointToPointChannelParser()); + registerBeanDefinitionParser("queue-channel", new PointToPointChannelParser()); + registerBeanDefinitionParser("direct-channel", new PointToPointChannelParser()); + registerBeanDefinitionParser("priority-channel", new PointToPointChannelParser()); + registerBeanDefinitionParser("rendezvous-channel", new PointToPointChannelParser()); registerBeanDefinitionParser("thread-local-channel", new ThreadLocalChannelParser()); + registerBeanDefinitionParser("publish-subscribe-channel", new PublishSubscribeChannelParser()); registerBeanDefinitionParser("service-activator", new ServiceActivatorParser()); registerBeanDefinitionParser("channel-adapter", new ChannelAdapterParser()); registerBeanDefinitionParser("gateway", new GatewayParser()); diff --git a/org.springframework.integration/src/main/java/org/springframework/integration/config/PointToPointChannelParser.java b/org.springframework.integration/src/main/java/org/springframework/integration/config/PointToPointChannelParser.java new file mode 100644 index 0000000000..45ec0f3dda --- /dev/null +++ b/org.springframework.integration/src/main/java/org/springframework/integration/config/PointToPointChannelParser.java @@ -0,0 +1,74 @@ +/* + * Copyright 2002-2008 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.integration.config; + +import org.w3c.dom.Element; + +import org.springframework.beans.factory.support.BeanDefinitionBuilder; +import org.springframework.beans.factory.xml.ParserContext; +import org.springframework.integration.channel.DirectChannel; +import org.springframework.integration.channel.PriorityChannel; +import org.springframework.integration.channel.QueueChannel; +import org.springframework.integration.channel.RendezvousChannel; +import org.springframework.util.StringUtils; +import org.springframework.util.xml.DomUtils; + +/** + * Parser for the <channel> element. + * + * @author Mark Fisher + */ +public class PointToPointChannelParser extends AbstractChannelParser { + + private static final String CAPACITY_ATTRIBUTE = "capacity"; + + + @Override + protected BeanDefinitionBuilder buildBeanDefinition(Element element, ParserContext parserContext) { + BeanDefinitionBuilder builder = null; + Element queueElement = null; + if ((queueElement = DomUtils.getChildElementByTagName(element, "queue")) != null) { + builder = BeanDefinitionBuilder.genericBeanDefinition(QueueChannel.class); + this.parseQueueCapacity(builder, queueElement); + } + else if ((queueElement = DomUtils.getChildElementByTagName(element, "priority-queue")) != null) { + builder = BeanDefinitionBuilder.genericBeanDefinition(PriorityChannel.class); + this.parseQueueCapacity(builder, queueElement); + String comparatorRef = queueElement.getAttribute("comparator"); + if (StringUtils.hasText(comparatorRef)) { + builder.addConstructorArgReference(comparatorRef); + } + } + else if ((queueElement = DomUtils.getChildElementByTagName(element, "rendezvous-queue")) != null) { + builder = BeanDefinitionBuilder.genericBeanDefinition(RendezvousChannel.class); + } + else { + builder = BeanDefinitionBuilder.genericBeanDefinition(DirectChannel.class); + } + return builder; + } + + private void parseQueueCapacity(BeanDefinitionBuilder builder, Element queueElement) { + String capacity = queueElement.getAttribute(CAPACITY_ATTRIBUTE); + if (StringUtils.hasText(capacity)) { + if (!capacity.equals("UNBOUNDED")) { + builder.addConstructorArgValue(Integer.valueOf(capacity)); + } + } + } + +} diff --git a/org.springframework.integration/src/main/java/org/springframework/integration/config/PriorityChannelParser.java b/org.springframework.integration/src/main/java/org/springframework/integration/config/PriorityChannelParser.java deleted file mode 100644 index eeca7c5333..0000000000 --- a/org.springframework.integration/src/main/java/org/springframework/integration/config/PriorityChannelParser.java +++ /dev/null @@ -1,46 +0,0 @@ -/* - * Copyright 2002-2008 the original author or authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.springframework.integration.config; - -import org.w3c.dom.Element; - -import org.springframework.beans.factory.support.BeanDefinitionBuilder; -import org.springframework.integration.channel.PriorityChannel; -import org.springframework.util.StringUtils; - -/** - * Parser for the <priority-channel> element. - * - * @author Mark Fisher - */ -public class PriorityChannelParser extends QueueChannelParser { - - @Override - protected Class getBeanClass(Element element) { - return PriorityChannel.class; - } - - @Override - protected void postProcess(BeanDefinitionBuilder builder, Element element) { - super.postProcess(builder, element); - String comparator = element.getAttribute("comparator"); - if (StringUtils.hasText(comparator)) { - builder.addConstructorArgReference(comparator); - } - } - -} diff --git a/org.springframework.integration/src/main/java/org/springframework/integration/config/PublishSubscribeChannelParser.java b/org.springframework.integration/src/main/java/org/springframework/integration/config/PublishSubscribeChannelParser.java index 56c6f33424..2ae0e2fa1a 100644 --- a/org.springframework.integration/src/main/java/org/springframework/integration/config/PublishSubscribeChannelParser.java +++ b/org.springframework.integration/src/main/java/org/springframework/integration/config/PublishSubscribeChannelParser.java @@ -19,6 +19,7 @@ package org.springframework.integration.config; import org.w3c.dom.Element; import org.springframework.beans.factory.support.BeanDefinitionBuilder; +import org.springframework.beans.factory.xml.ParserContext; import org.springframework.integration.channel.PublishSubscribeChannel; import org.springframework.util.StringUtils; @@ -30,17 +31,14 @@ import org.springframework.util.StringUtils; public class PublishSubscribeChannelParser extends AbstractChannelParser { @Override - protected Class getBeanClass(Element element) { - return PublishSubscribeChannel.class; - } - - @Override - protected void postProcess(BeanDefinitionBuilder builder, Element element) { + protected BeanDefinitionBuilder buildBeanDefinition(Element element, ParserContext parserContext) { + BeanDefinitionBuilder builder = BeanDefinitionBuilder.genericBeanDefinition(PublishSubscribeChannel.class); String taskExecutorRef = element.getAttribute("task-executor"); if (StringUtils.hasText(taskExecutorRef)) { builder.addConstructorArgReference(taskExecutorRef); } IntegrationNamespaceUtils.setValueIfAttributeDefined(builder, element, "apply-sequence"); + return builder; } } diff --git a/org.springframework.integration/src/main/java/org/springframework/integration/config/QueueChannelParser.java b/org.springframework.integration/src/main/java/org/springframework/integration/config/QueueChannelParser.java deleted file mode 100644 index 361ee58dc2..0000000000 --- a/org.springframework.integration/src/main/java/org/springframework/integration/config/QueueChannelParser.java +++ /dev/null @@ -1,45 +0,0 @@ -/* - * Copyright 2002-2008 the original author or authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.springframework.integration.config; - -import org.w3c.dom.Element; - -import org.springframework.beans.factory.support.BeanDefinitionBuilder; -import org.springframework.integration.channel.QueueChannel; -import org.springframework.util.StringUtils; - -/** - * Parser for the <queue-channel> element. - * - * @author Mark Fisher - */ -public class QueueChannelParser extends AbstractChannelParser { - - @Override - protected Class getBeanClass(Element element) { - return QueueChannel.class; - } - - @Override - protected void postProcess(BeanDefinitionBuilder builder, Element element) { - String capacityAttribute = element.getAttribute("capacity"); - int capacity = (StringUtils.hasText(capacityAttribute)) ? - Integer.parseInt(capacityAttribute) : QueueChannel.DEFAULT_CAPACITY; - builder.addConstructorArgValue(capacity); - } - -} diff --git a/org.springframework.integration/src/main/java/org/springframework/integration/config/RendezvousChannelParser.java b/org.springframework.integration/src/main/java/org/springframework/integration/config/RendezvousChannelParser.java deleted file mode 100644 index 265cfc255b..0000000000 --- a/org.springframework.integration/src/main/java/org/springframework/integration/config/RendezvousChannelParser.java +++ /dev/null @@ -1,35 +0,0 @@ -/* - * Copyright 2002-2008 the original author or authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.springframework.integration.config; - -import org.w3c.dom.Element; - -import org.springframework.integration.channel.RendezvousChannel; - -/** - * Parser for the <rendezvous-channel> element. - * - * @author Mark Fisher - */ -public class RendezvousChannelParser extends AbstractChannelParser { - - @Override - protected Class getBeanClass(Element element) { - return RendezvousChannel.class; - } - -} diff --git a/org.springframework.integration/src/main/java/org/springframework/integration/config/ThreadLocalChannelParser.java b/org.springframework.integration/src/main/java/org/springframework/integration/config/ThreadLocalChannelParser.java index 340ec93d81..9c9161e6b8 100644 --- a/org.springframework.integration/src/main/java/org/springframework/integration/config/ThreadLocalChannelParser.java +++ b/org.springframework.integration/src/main/java/org/springframework/integration/config/ThreadLocalChannelParser.java @@ -18,6 +18,8 @@ package org.springframework.integration.config; import org.w3c.dom.Element; +import org.springframework.beans.factory.support.BeanDefinitionBuilder; +import org.springframework.beans.factory.xml.ParserContext; import org.springframework.integration.channel.ThreadLocalChannel; /** @@ -28,8 +30,8 @@ import org.springframework.integration.channel.ThreadLocalChannel; public class ThreadLocalChannelParser extends AbstractChannelParser { @Override - protected Class getBeanClass(Element element) { - return ThreadLocalChannel.class; + protected BeanDefinitionBuilder buildBeanDefinition(Element element, ParserContext parserContext) { + return BeanDefinitionBuilder.genericBeanDefinition(ThreadLocalChannel.class); } } diff --git a/org.springframework.integration/src/main/java/org/springframework/integration/config/spring-integration-core-1.0.xsd b/org.springframework.integration/src/main/java/org/springframework/integration/config/spring-integration-core-1.0.xsd index d8bbb71743..6cc0bb16aa 100644 --- a/org.springframework.integration/src/main/java/org/springframework/integration/config/spring-integration-core-1.0.xsd +++ b/org.springframework.integration/src/main/java/org/springframework/integration/config/spring-integration-core-1.0.xsd @@ -47,32 +47,86 @@ - + - Defines a generic channel type. The actual channel type - will be determined by the channel factory set on the message bus. + Defines a Point-to-Point MessageChannel. + + + + + + + + + + + + + + - - - - Defines a channel that buffers messages in a queue. - - + + + + + Defines a bounded queue for messages. + + + + + + + + + + + + + + + + + + + + + + + + Defines a queue with priority-ordering for message reception. + + + + + + + + + + + + Defines a rendezvous queue where a sender will block until the receiver arrives or vice-versa. + + + - Defines a publish-subscribe channel that broadcasts to its targets. + Defines a Publish-Subscribe channel that broadcasts messages to its subscribers. + + + @@ -80,55 +134,14 @@ - - - - Defines a channel that invokes its handlers directly in the sender's thread. - - - - - - - - - Defines a channel with priority-ordering for message reception. - - - - - - - - - - - - - - - - Defines a channel with a configurable capacity. - - - - - - - - - Defines a message channel. - - - diff --git a/org.springframework.integration/src/main/java/org/springframework/integration/dispatcher/SimpleDispatcher.java b/org.springframework.integration/src/main/java/org/springframework/integration/dispatcher/SimpleDispatcher.java index f08ed70b17..4f6517af98 100644 --- a/org.springframework.integration/src/main/java/org/springframework/integration/dispatcher/SimpleDispatcher.java +++ b/org.springframework.integration/src/main/java/org/springframework/integration/dispatcher/SimpleDispatcher.java @@ -17,6 +17,7 @@ package org.springframework.integration.dispatcher; import org.springframework.integration.message.Message; +import org.springframework.integration.message.MessageDeliveryException; import org.springframework.integration.message.MessageRejectedException; import org.springframework.integration.message.MessageTarget; @@ -25,9 +26,10 @@ import org.springframework.integration.message.MessageTarget; * to send a {@link Message} to one of its targets. As soon as one * of the targets accepts the Message, the dispatcher will return 'true'. *

- * If all targets reject the Message, the dispatcher will throw a - * MessageRejectedException. If all targets return 'false' (e.g. due - * to a timeout), the dispatcher will return 'false'. + * If the dispatcher has no targets, a {@link MessageDeliveryException} + * will be thrown. If all targets reject the Message, the dispatcher will + * throw a MessageRejectedException. If all targets return 'false' + * (e.g. due to a timeout), the dispatcher will return 'false'. * * @author Mark Fisher */ @@ -35,10 +37,7 @@ public class SimpleDispatcher extends AbstractDispatcher { public boolean send(Message message) { if (this.targets.size() == 0) { - if (logger.isWarnEnabled()) { - logger.warn("Dispatcher has no targets."); - } - return false; + throw new MessageDeliveryException(message, "Dispatcher has no targets."); } int count = 0; int rejectedExceptionCount = 0; diff --git a/org.springframework.integration/src/main/java/org/springframework/integration/message/MessageTarget.java b/org.springframework.integration/src/main/java/org/springframework/integration/message/MessageTarget.java index aecf0293c2..c570f5570c 100644 --- a/org.springframework.integration/src/main/java/org/springframework/integration/message/MessageTarget.java +++ b/org.springframework.integration/src/main/java/org/springframework/integration/message/MessageTarget.java @@ -36,7 +36,7 @@ public interface MessageTarget { * @throws MessageRejectedException if this particular Message is not accepted by the target * (e.g. after consulting a {@link org.springframework.integration.message.selector.MessageSelector}) * @throws MessageDeliveryException if this target is unable to send the Message due - * to a transport error + * to a transport error. */ boolean send(Message message) throws MessageRejectedException, MessageDeliveryException; diff --git a/org.springframework.integration/src/test/java/org/springframework/integration/channel/config/ChannelParserTests.java b/org.springframework.integration/src/test/java/org/springframework/integration/channel/config/ChannelParserTests.java index 99bcfee86b..63f681add3 100644 --- a/org.springframework.integration/src/test/java/org/springframework/integration/channel/config/ChannelParserTests.java +++ b/org.springframework.integration/src/test/java/org/springframework/integration/channel/config/ChannelParserTests.java @@ -26,10 +26,10 @@ import org.springframework.beans.DirectFieldAccessor; import org.springframework.beans.FatalBeanException; import org.springframework.context.ApplicationContext; import org.springframework.context.support.ClassPathXmlApplicationContext; +import org.springframework.integration.channel.DirectChannel; import org.springframework.integration.channel.MessageChannel; import org.springframework.integration.channel.PollableChannel; import org.springframework.integration.channel.PublishSubscribeChannel; -import org.springframework.integration.channel.QueueChannel; import org.springframework.integration.config.TestChannelInterceptor; import org.springframework.integration.message.GenericMessage; import org.springframework.integration.message.Message; @@ -61,13 +61,11 @@ public class ChannelParserTests { } @Test - public void testQueueChannelByDefault() throws InterruptedException { + public void testDirectChannelByDefault() throws InterruptedException { ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext( "channelParserTests.xml", this.getClass()); - MessageChannel channel = (MessageChannel) context.getBean("queueChannelByDefault"); - //called to initialize the channel instance - channel.getName(); - assertEquals(QueueChannel.class, channel.getClass()); + MessageChannel channel = (MessageChannel) context.getBean("defaultChannel"); + assertEquals(DirectChannel.class, channel.getClass()); } @Test diff --git a/org.springframework.integration/src/test/java/org/springframework/integration/channel/config/channelInterceptorParserTests.xml b/org.springframework.integration/src/test/java/org/springframework/integration/channel/config/channelInterceptorParserTests.xml index 2f8ac81685..adb98e7f93 100644 --- a/org.springframework.integration/src/test/java/org/springframework/integration/channel/config/channelInterceptorParserTests.xml +++ b/org.springframework.integration/src/test/java/org/springframework/integration/channel/config/channelInterceptorParserTests.xml @@ -8,12 +8,14 @@ http://www.springframework.org/schema/integration/spring-integration-core-1.0.xsd"> + + diff --git a/org.springframework.integration/src/test/java/org/springframework/integration/channel/config/channelParserTests.xml b/org.springframework.integration/src/test/java/org/springframework/integration/channel/config/channelParserTests.xml index 0ea9adaf92..2114fdf39b 100644 --- a/org.springframework.integration/src/test/java/org/springframework/integration/channel/config/channelParserTests.xml +++ b/org.springframework.integration/src/test/java/org/springframework/integration/channel/config/channelParserTests.xml @@ -7,20 +7,28 @@ http://www.springframework.org/schema/integration http://www.springframework.org/schema/integration/spring-integration-core-1.0.xsd"> - + + + - + - + + + - + + + - + + + diff --git a/org.springframework.integration/src/test/java/org/springframework/integration/channel/config/priorityChannelParserTests.xml b/org.springframework.integration/src/test/java/org/springframework/integration/channel/config/priorityChannelParserTests.xml index b0ea565add..b7904bbdc1 100644 --- a/org.springframework.integration/src/test/java/org/springframework/integration/channel/config/priorityChannelParserTests.xml +++ b/org.springframework.integration/src/test/java/org/springframework/integration/channel/config/priorityChannelParserTests.xml @@ -7,11 +7,17 @@ http://www.springframework.org/schema/integration http://www.springframework.org/schema/integration/spring-integration-core-1.0.xsd"> - + + + - + + + - + + + diff --git a/org.springframework.integration/src/test/java/org/springframework/integration/channel/config/rendezvousChannelParserTests.xml b/org.springframework.integration/src/test/java/org/springframework/integration/channel/config/rendezvousChannelParserTests.xml index 8b6b8cd0f3..b3799ca512 100644 --- a/org.springframework.integration/src/test/java/org/springframework/integration/channel/config/rendezvousChannelParserTests.xml +++ b/org.springframework.integration/src/test/java/org/springframework/integration/channel/config/rendezvousChannelParserTests.xml @@ -7,6 +7,8 @@ http://www.springframework.org/schema/integration http://www.springframework.org/schema/integration/spring-integration-core-1.0.xsd"> - + + + diff --git a/org.springframework.integration/src/test/java/org/springframework/integration/channel/directChannelParserTests.xml b/org.springframework.integration/src/test/java/org/springframework/integration/channel/directChannelParserTests.xml index 726c248263..b4eac91aba 100644 --- a/org.springframework.integration/src/test/java/org/springframework/integration/channel/directChannelParserTests.xml +++ b/org.springframework.integration/src/test/java/org/springframework/integration/channel/directChannelParserTests.xml @@ -7,6 +7,6 @@ http://www.springframework.org/schema/integration http://www.springframework.org/schema/integration/spring-integration-core-1.0.xsd"> - + diff --git a/org.springframework.integration/src/test/java/org/springframework/integration/config/ChannelAdapterParserTests-context.xml b/org.springframework.integration/src/test/java/org/springframework/integration/config/ChannelAdapterParserTests-context.xml index a7b7ebbedb..7f45366daa 100644 --- a/org.springframework.integration/src/test/java/org/springframework/integration/config/ChannelAdapterParserTests-context.xml +++ b/org.springframework.integration/src/test/java/org/springframework/integration/config/ChannelAdapterParserTests-context.xml @@ -9,7 +9,9 @@ - + + + diff --git a/org.springframework.integration/src/test/java/org/springframework/integration/config/EndpointInterceptorTests.java b/org.springframework.integration/src/test/java/org/springframework/integration/config/EndpointInterceptorTests.java index fec4f3a1a0..5becb32976 100644 --- a/org.springframework.integration/src/test/java/org/springframework/integration/config/EndpointInterceptorTests.java +++ b/org.springframework.integration/src/test/java/org/springframework/integration/config/EndpointInterceptorTests.java @@ -17,6 +17,7 @@ package org.springframework.integration.config; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; import java.util.List; @@ -24,6 +25,7 @@ import org.junit.Test; import org.springframework.beans.DirectFieldAccessor; import org.springframework.context.support.ClassPathXmlApplicationContext; +import org.springframework.integration.channel.MessageChannel; import org.springframework.integration.endpoint.EndpointInterceptor; import org.springframework.integration.endpoint.MessageEndpoint; import org.springframework.integration.message.StringMessage; @@ -52,21 +54,24 @@ public class EndpointInterceptorTests { @SuppressWarnings("unchecked") private static void testInterceptors(MessageEndpoint endpoint, ClassPathXmlApplicationContext context, boolean innerBeans) { + MessageChannel channel = null; TestPreHandleInterceptor preInterceptor = null; TestPostHandleInterceptor postInterceptor = null; if (innerBeans) { + channel = (MessageChannel) context.getBean("inputChannelForBeans"); DirectFieldAccessor accessor = new DirectFieldAccessor(endpoint); List interceptors = (List) accessor.getPropertyValue("interceptors"); preInterceptor = (TestPreHandleInterceptor) interceptors.get(0); postInterceptor = (TestPostHandleInterceptor) interceptors.get(1); } else { + channel = (MessageChannel) context.getBean("inputChannelForRefs"); preInterceptor = (TestPreHandleInterceptor) context.getBean("preInterceptor"); postInterceptor = (TestPostHandleInterceptor) context.getBean("postInterceptor"); } assertEquals(0, preInterceptor.getCount()); assertEquals(0, postInterceptor.getCount()); - endpoint.send(new StringMessage("test")); + assertTrue(channel.send(new StringMessage("test"))); assertEquals(1, preInterceptor.getCount()); assertEquals(1, postInterceptor.getCount()); context.stop(); diff --git a/org.springframework.integration/src/test/java/org/springframework/integration/config/aggregatorParserTests.xml b/org.springframework.integration/src/test/java/org/springframework/integration/config/aggregatorParserTests.xml index 7f48e2764c..f984257e3c 100644 --- a/org.springframework.integration/src/test/java/org/springframework/integration/config/aggregatorParserTests.xml +++ b/org.springframework.integration/src/test/java/org/springframework/integration/config/aggregatorParserTests.xml @@ -10,8 +10,12 @@ - - + + + + + + diff --git a/org.springframework.integration/src/test/java/org/springframework/integration/config/annotation/handlerAnnotationPostProcessorTests.xml b/org.springframework.integration/src/test/java/org/springframework/integration/config/annotation/handlerAnnotationPostProcessorTests.xml index 2116d70e45..350cba9459 100644 --- a/org.springframework.integration/src/test/java/org/springframework/integration/config/annotation/handlerAnnotationPostProcessorTests.xml +++ b/org.springframework.integration/src/test/java/org/springframework/integration/config/annotation/handlerAnnotationPostProcessorTests.xml @@ -10,7 +10,10 @@ - + + + + diff --git a/org.springframework.integration/src/test/java/org/springframework/integration/config/annotation/messageParameterAnnotatedEndpointTests.xml b/org.springframework.integration/src/test/java/org/springframework/integration/config/annotation/messageParameterAnnotatedEndpointTests.xml index 967694cf50..1006ae307b 100644 --- a/org.springframework.integration/src/test/java/org/springframework/integration/config/annotation/messageParameterAnnotatedEndpointTests.xml +++ b/org.springframework.integration/src/test/java/org/springframework/integration/config/annotation/messageParameterAnnotatedEndpointTests.xml @@ -13,7 +13,9 @@ - + + + diff --git a/org.springframework.integration/src/test/java/org/springframework/integration/config/annotation/simpleAnnotatedEndpointTests.xml b/org.springframework.integration/src/test/java/org/springframework/integration/config/annotation/simpleAnnotatedEndpointTests.xml index ffcaa5a736..34bc5e3546 100644 --- a/org.springframework.integration/src/test/java/org/springframework/integration/config/annotation/simpleAnnotatedEndpointTests.xml +++ b/org.springframework.integration/src/test/java/org/springframework/integration/config/annotation/simpleAnnotatedEndpointTests.xml @@ -13,7 +13,9 @@ - + + + diff --git a/org.springframework.integration/src/test/java/org/springframework/integration/config/annotation/typeConvertingEndpointTests.xml b/org.springframework.integration/src/test/java/org/springframework/integration/config/annotation/typeConvertingEndpointTests.xml index 16b6b156f3..aeac23e091 100644 --- a/org.springframework.integration/src/test/java/org/springframework/integration/config/annotation/typeConvertingEndpointTests.xml +++ b/org.springframework.integration/src/test/java/org/springframework/integration/config/annotation/typeConvertingEndpointTests.xml @@ -11,7 +11,9 @@ - + + + diff --git a/org.springframework.integration/src/test/java/org/springframework/integration/config/endpointInterceptorTests.xml b/org.springframework.integration/src/test/java/org/springframework/integration/config/endpointInterceptorTests.xml index f3c32e6e78..c8d57a0a63 100644 --- a/org.springframework.integration/src/test/java/org/springframework/integration/config/endpointInterceptorTests.xml +++ b/org.springframework.integration/src/test/java/org/springframework/integration/config/endpointInterceptorTests.xml @@ -7,17 +7,18 @@ http://www.springframework.org/schema/integration http://www.springframework.org/schema/integration/spring-integration-core-1.0.xsd"> - + - - - + + + + + - + output-channel="outputChannel"> @@ -25,10 +26,9 @@ - + output-channel="outputChannel"> diff --git a/org.springframework.integration/src/test/java/org/springframework/integration/config/endpointWithErrorHandler.xml b/org.springframework.integration/src/test/java/org/springframework/integration/config/endpointWithErrorHandler.xml index 2ee5ff3aa6..dca0e59afa 100644 --- a/org.springframework.integration/src/test/java/org/springframework/integration/config/endpointWithErrorHandler.xml +++ b/org.springframework.integration/src/test/java/org/springframework/integration/config/endpointWithErrorHandler.xml @@ -9,7 +9,7 @@ - + diff --git a/org.springframework.integration/src/test/java/org/springframework/integration/config/endpointWithHandlerChainElement.xml b/org.springframework.integration/src/test/java/org/springframework/integration/config/endpointWithHandlerChainElement.xml index 4f269ddb4c..a5282e4693 100644 --- a/org.springframework.integration/src/test/java/org/springframework/integration/config/endpointWithHandlerChainElement.xml +++ b/org.springframework.integration/src/test/java/org/springframework/integration/config/endpointWithHandlerChainElement.xml @@ -9,9 +9,13 @@ - + + + - + + + diff --git a/org.springframework.integration/src/test/java/org/springframework/integration/config/endpointWithSelector.xml b/org.springframework.integration/src/test/java/org/springframework/integration/config/endpointWithSelector.xml index 8bcdc28bab..c483be32a4 100644 --- a/org.springframework.integration/src/test/java/org/springframework/integration/config/endpointWithSelector.xml +++ b/org.springframework.integration/src/test/java/org/springframework/integration/config/endpointWithSelector.xml @@ -9,7 +9,9 @@ - + + + diff --git a/org.springframework.integration/src/test/java/org/springframework/integration/config/handlerAdapterEndpointTests.xml b/org.springframework.integration/src/test/java/org/springframework/integration/config/handlerAdapterEndpointTests.xml index e104181840..c11ef8e20d 100644 --- a/org.springframework.integration/src/test/java/org/springframework/integration/config/handlerAdapterEndpointTests.xml +++ b/org.springframework.integration/src/test/java/org/springframework/integration/config/handlerAdapterEndpointTests.xml @@ -9,7 +9,9 @@ - + + + diff --git a/org.springframework.integration/src/test/java/org/springframework/integration/config/resequencerParserTests.xml b/org.springframework.integration/src/test/java/org/springframework/integration/config/resequencerParserTests.xml index 1ca55033e6..748f10e525 100644 --- a/org.springframework.integration/src/test/java/org/springframework/integration/config/resequencerParserTests.xml +++ b/org.springframework.integration/src/test/java/org/springframework/integration/config/resequencerParserTests.xml @@ -8,11 +8,15 @@ http://www.springframework.org/schema/integration/spring-integration-core-1.0.xsd"> - - - - + + + + + + + + - + + + diff --git a/org.springframework.integration/src/test/java/org/springframework/integration/config/wireTapParserTests.xml b/org.springframework.integration/src/test/java/org/springframework/integration/config/wireTapParserTests.xml index b2b3045d5a..856de32581 100644 --- a/org.springframework.integration/src/test/java/org/springframework/integration/config/wireTapParserTests.xml +++ b/org.springframework.integration/src/test/java/org/springframework/integration/config/wireTapParserTests.xml @@ -8,24 +8,28 @@ http://www.springframework.org/schema/integration/spring-integration-core-1.0.xsd"> + + + + diff --git a/org.springframework.integration/src/test/java/org/springframework/integration/dispatcher/SimpleDispatcherTests.java b/org.springframework.integration/src/test/java/org/springframework/integration/dispatcher/SimpleDispatcherTests.java index cafac78cf1..ce9e00845a 100644 --- a/org.springframework.integration/src/test/java/org/springframework/integration/dispatcher/SimpleDispatcherTests.java +++ b/org.springframework.integration/src/test/java/org/springframework/integration/dispatcher/SimpleDispatcherTests.java @@ -30,6 +30,7 @@ import org.springframework.integration.endpoint.DefaultEndpoint; import org.springframework.integration.handler.MessageHandler; import org.springframework.integration.handler.TestHandlers; import org.springframework.integration.message.Message; +import org.springframework.integration.message.MessageDeliveryException; import org.springframework.integration.message.MessageRejectedException; import org.springframework.integration.message.MessageTarget; import org.springframework.integration.message.StringMessage; @@ -108,9 +109,18 @@ public class SimpleDispatcherTests { dispatcher.unsubscribe(target1); dispatcher.send(new StringMessage("test3")); assertEquals(6, counter.get()); - dispatcher.unsubscribe(target3); - dispatcher.send(new StringMessage("test4")); - assertEquals(6, counter.get()); + } + + @Test(expected = MessageDeliveryException.class) + public void unsubscribeLastTargetCausesDeliveryException() { + SimpleDispatcher dispatcher = new SimpleDispatcher(); + final AtomicInteger counter = new AtomicInteger(); + MessageTarget target = new CountingTestTarget(counter, false); + dispatcher.subscribe(target); + dispatcher.send(new StringMessage("test1")); + assertEquals(1, counter.get()); + dispatcher.unsubscribe(target); + dispatcher.send(new StringMessage("test2")); } @Test diff --git a/org.springframework.integration/src/test/java/org/springframework/integration/dispatcher/propagationMandatoryTests.xml b/org.springframework.integration/src/test/java/org/springframework/integration/dispatcher/propagationMandatoryTests.xml index 893d15b4d9..8f2fa7f1df 100644 --- a/org.springframework.integration/src/test/java/org/springframework/integration/dispatcher/propagationMandatoryTests.xml +++ b/org.springframework.integration/src/test/java/org/springframework/integration/dispatcher/propagationMandatoryTests.xml @@ -9,9 +9,17 @@ - - - + + + + + + + + + + + - - + + + + + + + - - + + + + + + + - - + + + + + + + - - + + + + + + + - - - + + + + + + + + + + + - + diff --git a/org.springframework.integration/src/test/java/org/springframework/integration/endpoint/returnAddressTests.xml b/org.springframework.integration/src/test/java/org/springframework/integration/endpoint/returnAddressTests.xml index a161de0042..16c88866b8 100644 --- a/org.springframework.integration/src/test/java/org/springframework/integration/endpoint/returnAddressTests.xml +++ b/org.springframework.integration/src/test/java/org/springframework/integration/endpoint/returnAddressTests.xml @@ -10,18 +10,30 @@ - + + + - - - + + + + + + + + + - + + + + + diff --git a/org.springframework.integration/src/test/java/org/springframework/integration/gateway/config/gatewayParserTests.xml b/org.springframework.integration/src/test/java/org/springframework/integration/gateway/config/gatewayParserTests.xml index a533214a85..f39e195986 100644 --- a/org.springframework.integration/src/test/java/org/springframework/integration/gateway/config/gatewayParserTests.xml +++ b/org.springframework.integration/src/test/java/org/springframework/integration/gateway/config/gatewayParserTests.xml @@ -9,9 +9,13 @@ - + + + - + + + + diff --git a/org.springframework.integration/src/test/java/org/springframework/integration/router/config/routerParserTests.xml b/org.springframework.integration/src/test/java/org/springframework/integration/router/config/routerParserTests.xml index 787c4741d6..739748c331 100644 --- a/org.springframework.integration/src/test/java/org/springframework/integration/router/config/routerParserTests.xml +++ b/org.springframework.integration/src/test/java/org/springframework/integration/router/config/routerParserTests.xml @@ -11,9 +11,13 @@ - + + + - + + + diff --git a/org.springframework.integration/src/test/java/org/springframework/integration/router/config/splitterAggregatorTests.xml b/org.springframework.integration/src/test/java/org/springframework/integration/router/config/splitterAggregatorTests.xml index c6f7b7ea09..5026227843 100644 --- a/org.springframework.integration/src/test/java/org/springframework/integration/router/config/splitterAggregatorTests.xml +++ b/org.springframework.integration/src/test/java/org/springframework/integration/router/config/splitterAggregatorTests.xml @@ -9,9 +9,11 @@ - - - + + + + + diff --git a/org.springframework.integration/src/test/java/org/springframework/integration/router/config/splitterParserTests.xml b/org.springframework.integration/src/test/java/org/springframework/integration/router/config/splitterParserTests.xml index ca2a276054..cc195adfa5 100644 --- a/org.springframework.integration/src/test/java/org/springframework/integration/router/config/splitterParserTests.xml +++ b/org.springframework.integration/src/test/java/org/springframework/integration/router/config/splitterParserTests.xml @@ -11,7 +11,9 @@ - + + +