diff --git a/org.springframework.integration/src/main/java/org/springframework/integration/config/xml/PointToPointChannelParser.java b/org.springframework.integration/src/main/java/org/springframework/integration/config/xml/PointToPointChannelParser.java index 42d1033fe4..f92082ae5d 100644 --- a/org.springframework.integration/src/main/java/org/springframework/integration/config/xml/PointToPointChannelParser.java +++ b/org.springframework.integration/src/main/java/org/springframework/integration/config/xml/PointToPointChannelParser.java @@ -44,6 +44,7 @@ public class PointToPointChannelParser extends AbstractChannelParser { if ((queueElement = DomUtils.getChildElementByTagName(element, "queue")) != null) { builder = BeanDefinitionBuilder.genericBeanDefinition(CHANNEL_PACKAGE + ".QueueChannel"); this.parseQueueCapacity(builder, queueElement); + this.parseQueueRef(builder, queueElement); } else if ((queueElement = DomUtils.getChildElementByTagName(element, "priority-queue")) != null) { builder = BeanDefinitionBuilder.genericBeanDefinition(CHANNEL_PACKAGE + ".PriorityChannel"); @@ -63,6 +64,7 @@ public class PointToPointChannelParser extends AbstractChannelParser { return builder; } + private void parseDispatcher(String dispatcherAttribute, BeanDefinitionBuilder builder, ParserContext parserContext) { if (dispatcherAttribute != null) { if (dispatcherAttribute.equals("failover")) { @@ -83,4 +85,10 @@ public class PointToPointChannelParser extends AbstractChannelParser { } } + private void parseQueueRef(BeanDefinitionBuilder builder, Element queueElement) { + String queueRef = queueElement.getAttribute("ref"); + if (StringUtils.hasText(queueRef)){ + builder.addConstructorArgReference(queueRef); + } + } } diff --git a/org.springframework.integration/src/main/java/org/springframework/integration/config/xml/spring-integration-1.0.xsd b/org.springframework.integration/src/main/java/org/springframework/integration/config/xml/spring-integration-1.0.xsd index 04a4734069..c230a53e20 100644 --- a/org.springframework.integration/src/main/java/org/springframework/integration/config/xml/spring-integration-1.0.xsd +++ b/org.springframework.integration/src/main/java/org/springframework/integration/config/xml/spring-integration-1.0.xsd @@ -81,9 +81,21 @@ Defines a queue for messages. If 'capacity' is specified, it will be a bounded queue. + + A custom Queue implementation can be + injected using the 'ref' attribute. + + + + + + + + + @@ -110,8 +122,8 @@ - Defines a Publish-Subscribe channel that - broadcasts messages to its subscribers. + Defines a Publish-Subscribe channel that + broadcasts messages to its subscribers. @@ -155,8 +167,9 @@ - Defines a channel that maintains its Messages - on a thread-bound queue. + Defines a channel that maintains its Messages + on a + thread-bound queue. @@ -188,8 +201,10 @@ - Defines a dispatching strategy for the channel. The default is a round - robin load balancer. In case of a publish subscribe channel this + Defines a dispatching strategy for the channel. + The default is a round + robin load balancer. In case of a publish + subscribe channel this attribute will be ignored. @@ -453,12 +468,13 @@ - + + type="org.springframework.integration.core.MessageChannel" /> @@ -807,7 +823,7 @@ - + diff --git a/org.springframework.integration/src/test/java/org/springframework/integration/aggregator/DefaultMessageAggregatorTests.java b/org.springframework.integration/src/test/java/org/springframework/integration/aggregator/DefaultMessageAggregatorTests.java index 45690b0573..b82e6630c2 100644 --- a/org.springframework.integration/src/test/java/org/springframework/integration/aggregator/DefaultMessageAggregatorTests.java +++ b/org.springframework.integration/src/test/java/org/springframework/integration/aggregator/DefaultMessageAggregatorTests.java @@ -17,8 +17,6 @@ package org.springframework.integration.aggregator; import static org.hamcrest.CoreMatchers.is; import static org.junit.Assert.assertThat; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.when; import java.awt.Button; import java.io.Serializable; @@ -29,9 +27,11 @@ import java.util.List; import org.junit.Before; import org.junit.Test; import org.springframework.integration.core.Message; +import org.springframework.integration.message.MessageBuilder; /** * @author Alex Peters + * @author Iwein Fuld * */ public class DefaultMessageAggregatorTests { @@ -44,14 +44,12 @@ public class DefaultMessageAggregatorTests { } @SuppressWarnings("unchecked") - @Test + @Test(timeout=1000) public void aggregateMessages_withMultiplePayloads_allAsListInResultMsg() { List anyPayloads = Arrays.asList("foo", "bar", 123L, new Button()); List> messageGroup = new ArrayList>(anyPayloads.size()); for (Serializable payload : anyPayloads) { - Message mock = mock(Message.class); - when(mock.getPayload()).thenReturn(payload); - messageGroup.add(mock); + messageGroup.add(MessageBuilder.withPayload(payload).build()); } Message result = aggregator.aggregateMessages(messageGroup); assertThat((List) result.getPayload(), is(anyPayloads)); diff --git a/org.springframework.integration/src/test/java/org/springframework/integration/channel/config/ChannelParserTests.java b/org.springframework.integration/src/test/java/org/springframework/integration/channel/config/ChannelParserTests.java index 73d7a5d046..6eb21d3e83 100644 --- a/org.springframework.integration/src/test/java/org/springframework/integration/channel/config/ChannelParserTests.java +++ b/org.springframework.integration/src/test/java/org/springframework/integration/channel/config/ChannelParserTests.java @@ -48,6 +48,8 @@ import org.springframework.integration.util.ErrorHandlingTaskExecutor; /** * @author Mark Fisher * @author Iwein Fuld + * + * @see ChannelWithCustomQueueParserTests */ public class ChannelParserTests { diff --git a/org.springframework.integration/src/test/java/org/springframework/integration/channel/config/ChannelWithCustomQueueParserTests-context.xml b/org.springframework.integration/src/test/java/org/springframework/integration/channel/config/ChannelWithCustomQueueParserTests-context.xml new file mode 100644 index 0000000000..11912de4e9 --- /dev/null +++ b/org.springframework.integration/src/test/java/org/springframework/integration/channel/config/ChannelWithCustomQueueParserTests-context.xml @@ -0,0 +1,19 @@ + + + + + + + + + + + + + + \ No newline at end of file diff --git a/org.springframework.integration/src/test/java/org/springframework/integration/channel/config/ChannelWithCustomQueueParserTests.java b/org.springframework.integration/src/test/java/org/springframework/integration/channel/config/ChannelWithCustomQueueParserTests.java new file mode 100644 index 0000000000..0338c0669a --- /dev/null +++ b/org.springframework.integration/src/test/java/org/springframework/integration/channel/config/ChannelWithCustomQueueParserTests.java @@ -0,0 +1,65 @@ +/* + * Copyright 2002-2008 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.integration.channel.config; + +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertThat; +import static org.hamcrest.CoreMatchers.*; + +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.BlockingQueue; + +import org.junit.Test; +import org.junit.runner.RunWith; +import org.springframework.beans.DirectFieldAccessor; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.beans.factory.annotation.Qualifier; +import org.springframework.integration.channel.QueueChannel; +import org.springframework.test.context.ContextConfiguration; +import org.springframework.test.context.junit4.SpringJUnit4ClassRunner; + +/** + * Testcases for detailed namespace support for <queue/> element under + * <channel/> + * + * @author Iwein Fuld + * + * @see ChannelWithCustomQueueParserTests + */ +@RunWith(SpringJUnit4ClassRunner.class) +@ContextConfiguration +public class ChannelWithCustomQueueParserTests { + + @Qualifier("customQueueChannel") + @Autowired + QueueChannel customQueueChannel; + + @Test + public void parseConfig() throws Exception { + assertNotNull(customQueueChannel); + } + + @Test + public void queueTypeSet() throws Exception { + DirectFieldAccessor accessor = new DirectFieldAccessor(customQueueChannel); + Object queue = accessor.getPropertyValue("queue"); + assertNotNull(queue); + assertThat(queue, is(ArrayBlockingQueue.class)); + assertThat(((BlockingQueue)queue).remainingCapacity(), is(2)); + } + +}