The <channel/> element is now used for creating all Point-to-Point channel types. It accepts a queue sub-element (options are: <queue/>, <priority-queue/>, or <rendezvous-queue/>). If no queue sub-element is provided, the channel type will be a DirectChannel.

This commit is contained in:
Mark Fisher
2008-09-01 22:50:56 +00:00
parent a27ae5726e
commit 788b2364ec
56 changed files with 442 additions and 358 deletions

View File

@@ -26,10 +26,10 @@ import org.springframework.beans.DirectFieldAccessor;
import org.springframework.beans.FatalBeanException;
import org.springframework.context.ApplicationContext;
import org.springframework.context.support.ClassPathXmlApplicationContext;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.integration.channel.MessageChannel;
import org.springframework.integration.channel.PollableChannel;
import org.springframework.integration.channel.PublishSubscribeChannel;
import org.springframework.integration.channel.QueueChannel;
import org.springframework.integration.config.TestChannelInterceptor;
import org.springframework.integration.message.GenericMessage;
import org.springframework.integration.message.Message;
@@ -61,13 +61,11 @@ public class ChannelParserTests {
}
@Test
public void testQueueChannelByDefault() throws InterruptedException {
public void testDirectChannelByDefault() throws InterruptedException {
ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext(
"channelParserTests.xml", this.getClass());
MessageChannel channel = (MessageChannel) context.getBean("queueChannelByDefault");
//called to initialize the channel instance
channel.getName();
assertEquals(QueueChannel.class, channel.getClass());
MessageChannel channel = (MessageChannel) context.getBean("defaultChannel");
assertEquals(DirectChannel.class, channel.getClass());
}
@Test

View File

@@ -8,12 +8,14 @@
http://www.springframework.org/schema/integration/spring-integration-core-1.0.xsd">
<channel id="channelWithInterceptorRef">
<queue capacity="5"/>
<interceptors>
<ref bean="interceptor"/>
</interceptors>
</channel>
<channel id="channelWithInterceptorInnerBean">
<queue capacity="5"/>
<interceptors>
<beans:bean class="org.springframework.integration.transformer.MessageTransformingChannelInterceptor">
<beans:constructor-arg>

View File

@@ -7,20 +7,28 @@
http://www.springframework.org/schema/integration
http://www.springframework.org/schema/integration/spring-integration-core-1.0.xsd">
<queue-channel id="capacityChannel" capacity="10"/>
<channel id="capacityChannel">
<queue capacity="10"/>
</channel>
<channel id="queueChannelByDefault"/>
<channel id="defaultChannel"/>
<publish-subscribe-channel id="publishSubscribeChannel"/>
<publish-subscribe-channel id="publishSubscribeChannelWithTaskExecutorRef"
task-executor="taskExecutor"/>
<channel id="integerChannel" datatype="java.lang.Integer"/>
<channel id="integerChannel" datatype="java.lang.Integer">
<queue capacity="10"/>
</channel>
<channel id="numberChannel" datatype="java.lang.Number"/>
<channel id="numberChannel" datatype="java.lang.Number">
<queue capacity="10"/>
</channel>
<channel id="stringOrNumberChannel" datatype="java.lang.String,java.lang.Number"/>
<channel id="stringOrNumberChannel" datatype="java.lang.String,java.lang.Number">
<queue capacity="10"/>
</channel>
<beans:bean id="taskExecutor" class="org.springframework.core.task.SimpleAsyncTaskExecutor"/>

View File

@@ -7,11 +7,17 @@
http://www.springframework.org/schema/integration
http://www.springframework.org/schema/integration/spring-integration-core-1.0.xsd">
<priority-channel id="priorityChannelWithDefaultComparator"/>
<channel id="priorityChannelWithDefaultComparator">
<priority-queue capacity="10"/>
</channel>
<priority-channel id="priorityChannelWithCustomComparator" comparator="payloadComparator"/>
<channel id="priorityChannelWithCustomComparator">
<priority-queue capacity="10" comparator="payloadComparator"/>
</channel>
<priority-channel id="integerOnlyPriorityChannel" datatype="java.lang.Integer" comparator="payloadComparator"/>
<channel id="integerOnlyPriorityChannel" datatype="java.lang.Integer">
<priority-queue capacity="10" comparator="payloadComparator"/>
</channel>
<beans:bean id="payloadComparator"
class="org.springframework.integration.channel.MessagePayloadTestComparator"/>

View File

@@ -7,6 +7,8 @@
http://www.springframework.org/schema/integration
http://www.springframework.org/schema/integration/spring-integration-core-1.0.xsd">
<rendezvous-channel id="channel"/>
<channel id="channel">
<rendezvous-queue/>
</channel>
</beans:beans>

View File

@@ -7,6 +7,6 @@
http://www.springframework.org/schema/integration
http://www.springframework.org/schema/integration/spring-integration-core-1.0.xsd">
<direct-channel id="channel"/>
<channel id="channel"/>
</beans:beans>

View File

@@ -9,7 +9,9 @@
<message-bus auto-startup="false"/>
<queue-channel id="queueChannel" capacity="10"/>
<channel id="queueChannel">
<queue capacity="10"/>
</channel>
<channel-adapter id="outboundWithImplicitChannel" target="target"/>

View File

@@ -17,6 +17,7 @@
package org.springframework.integration.config;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import java.util.List;
@@ -24,6 +25,7 @@ import org.junit.Test;
import org.springframework.beans.DirectFieldAccessor;
import org.springframework.context.support.ClassPathXmlApplicationContext;
import org.springframework.integration.channel.MessageChannel;
import org.springframework.integration.endpoint.EndpointInterceptor;
import org.springframework.integration.endpoint.MessageEndpoint;
import org.springframework.integration.message.StringMessage;
@@ -52,21 +54,24 @@ public class EndpointInterceptorTests {
@SuppressWarnings("unchecked")
private static void testInterceptors(MessageEndpoint endpoint, ClassPathXmlApplicationContext context, boolean innerBeans) {
MessageChannel channel = null;
TestPreHandleInterceptor preInterceptor = null;
TestPostHandleInterceptor postInterceptor = null;
if (innerBeans) {
channel = (MessageChannel) context.getBean("inputChannelForBeans");
DirectFieldAccessor accessor = new DirectFieldAccessor(endpoint);
List<EndpointInterceptor> interceptors = (List<EndpointInterceptor>) accessor.getPropertyValue("interceptors");
preInterceptor = (TestPreHandleInterceptor) interceptors.get(0);
postInterceptor = (TestPostHandleInterceptor) interceptors.get(1);
}
else {
channel = (MessageChannel) context.getBean("inputChannelForRefs");
preInterceptor = (TestPreHandleInterceptor) context.getBean("preInterceptor");
postInterceptor = (TestPostHandleInterceptor) context.getBean("postInterceptor");
}
assertEquals(0, preInterceptor.getCount());
assertEquals(0, postInterceptor.getCount());
endpoint.send(new StringMessage("test"));
assertTrue(channel.send(new StringMessage("test")));
assertEquals(1, preInterceptor.getCount());
assertEquals(1, postInterceptor.getCount());
context.stop();

View File

@@ -10,8 +10,12 @@
<message-bus/>
<channel id="inputChannel"/>
<channel id="outputChannel"/>
<channel id="discardChannel"/>
<channel id="outputChannel">
<queue capacity="5"/>
</channel>
<channel id="discardChannel">
<queue capacity="5"/>
</channel>
<aggregator id="aggregatorWithReference" ref="aggregatorBean" input-channel="inputChannel"/>

View File

@@ -10,7 +10,10 @@
<message-bus/>
<channel id="inputChannel"/>
<channel id="outputChannel"/>
<channel id="outputChannel">
<queue capacity="5"/>
</channel>
<service-activator input-channel="inputChannel" ref="simpleHandler" output-channel="outputChannel"/>

View File

@@ -13,7 +13,9 @@
<integration:channel id="inputChannel"/>
<integration:channel id="outputChannel"/>
<integration:channel id="outputChannel">
<integration:queue capacity="5"/>
</integration:channel>
<bean id="endpoint" class="org.springframework.integration.endpoint.annotation.MessageParameterAnnotatedEndpoint"/>

View File

@@ -13,7 +13,9 @@
<integration:channel id="inputChannel"/>
<integration:channel id="outputChannel"/>
<integration:channel id="outputChannel">
<integration:queue capacity="5"/>
</integration:channel>
<bean id="endpoint" class="org.springframework.integration.config.annotation.SimpleAnnotatedEndpoint"/>

View File

@@ -11,7 +11,9 @@
<integration:channel id="inputChannel"/>
<integration:channel id="outputChannel"/>
<integration:channel id="outputChannel">
<integration:queue capacity="5"/>
</integration:channel>
<bean id="endpoint" class="org.springframework.integration.config.annotation.TypeConvertingTestEndpoint"/>

View File

@@ -7,17 +7,18 @@
http://www.springframework.org/schema/integration
http://www.springframework.org/schema/integration/spring-integration-core-1.0.xsd">
<message-bus auto-startup="false"/>
<message-bus/>
<channel id="testChannel"/>
<channel id="replyChannel"/>
<channel id="inputChannelForBeans"/>
<channel id="inputChannelForRefs"/>
<channel id="outputChannel">
<queue capacity="10"/>
</channel>
<service-activator id="endpointWithBeanInterceptors"
input-channel="testChannel"
input-channel="inputChannelForBeans"
ref="testHandler"
output-channel="replyChannel">
<poller period="10000" max-messages-per-poll="1"/>
output-channel="outputChannel">
<interceptors>
<beans:bean class="org.springframework.integration.config.TestPreHandleInterceptor"/>
<beans:bean class="org.springframework.integration.config.TestPostHandleInterceptor"/>
@@ -25,10 +26,9 @@
</service-activator>
<service-activator id="endpointWithRefInterceptors"
input-channel="testChannel"
input-channel="inputChannelForRefs"
ref="testHandler"
output-channel="replyChannel">
<poller period="10000" max-messages-per-poll="1"/>
output-channel="outputChannel">
<interceptors>
<ref bean="preInterceptor"/>
<ref bean="postInterceptor"/>

View File

@@ -9,7 +9,7 @@
<message-bus/>
<direct-channel id="channel"/>
<channel id="channel"/>
<service-activator id="endpoint" input-channel="channel" ref="handler" error-handler="errorHandler"/>

View File

@@ -9,9 +9,13 @@
<message-bus/>
<queue-channel id="testChannel" capacity="50"/>
<channel id="testChannel">
<queue capacity="5"/>
</channel>
<channel id="replyChannel"/>
<channel id="replyChannel">
<queue capacity="5"/>
</channel>
<handler-chain id="chain">
<handler ref="handler1"/>

View File

@@ -9,7 +9,9 @@
<message-bus/>
<queue-channel id="testChannel" capacity="50"/>
<channel id="testChannel">
<queue capacity="50"/>
</channel>
<service-activator id="endpoint" input-channel="testChannel"
ref="testHandler" selector="typeSelector">

View File

@@ -9,7 +9,9 @@
<beans:bean id="messageBus" class="org.springframework.integration.bus.DefaultMessageBus"/>
<queue-channel id="testChannel" capacity="50"/>
<channel id="testChannel">
<queue capacity="50"/>
</channel>
<service-activator input-channel="testChannel" ref="testBean" method="store">
<poller period="100"/>

View File

@@ -8,11 +8,15 @@
http://www.springframework.org/schema/integration/spring-integration-core-1.0.xsd">
<message-bus/>
<channel id="outputChannel" />
<channel id="discardChannel" />
<channel id="outputChannel">
<queue capacity="5"/>
</channel>
<channel id="discardChannel">
<queue capacity="5"/>
</channel>
<resequencer id="defaultResequencer"/>
<resequencer id="completelyDefinedResequencer"

View File

@@ -9,7 +9,9 @@
<beans:bean id="messageBus" class="org.springframework.integration.bus.DefaultMessageBus"/>
<queue-channel id="testChannel" capacity="50"/>
<channel id="testChannel">
<queue capacity="50"/>
</channel>
<service-activator input-channel="testChannel" ref="testHandler">
<poller period="100"/>

View File

@@ -8,24 +8,28 @@
http://www.springframework.org/schema/integration/spring-integration-core-1.0.xsd">
<channel id="simple">
<queue capacity="10"/>
<interceptors>
<wire-tap target="testTarget"/>
</interceptors>
</channel>
<channel id="accepting">
<queue capacity="10"/>
<interceptors>
<wire-tap target="testTarget" selector="acceptingSelector"/>
</interceptors>
</channel>
<channel id="rejecting">
<queue capacity="10"/>
<interceptors>
<wire-tap target="testTarget" selector="rejectingSelector"/>
</interceptors>
</channel>
<channel id="timeout">
<queue capacity="10"/>
<interceptors>
<wire-tap target="testTarget" timeout="1234"/>
</interceptors>

View File

@@ -30,6 +30,7 @@ import org.springframework.integration.endpoint.DefaultEndpoint;
import org.springframework.integration.handler.MessageHandler;
import org.springframework.integration.handler.TestHandlers;
import org.springframework.integration.message.Message;
import org.springframework.integration.message.MessageDeliveryException;
import org.springframework.integration.message.MessageRejectedException;
import org.springframework.integration.message.MessageTarget;
import org.springframework.integration.message.StringMessage;
@@ -108,9 +109,18 @@ public class SimpleDispatcherTests {
dispatcher.unsubscribe(target1);
dispatcher.send(new StringMessage("test3"));
assertEquals(6, counter.get());
dispatcher.unsubscribe(target3);
dispatcher.send(new StringMessage("test4"));
assertEquals(6, counter.get());
}
@Test(expected = MessageDeliveryException.class)
public void unsubscribeLastTargetCausesDeliveryException() {
SimpleDispatcher dispatcher = new SimpleDispatcher();
final AtomicInteger counter = new AtomicInteger();
MessageTarget target = new CountingTestTarget(counter, false);
dispatcher.subscribe(target);
dispatcher.send(new StringMessage("test1"));
assertEquals(1, counter.get());
dispatcher.unsubscribe(target);
dispatcher.send(new StringMessage("test2"));
}
@Test

View File

@@ -9,9 +9,17 @@
<message-bus/>
<channel id="input"/>
<channel id="output"/>
<channel id="errorChannel"/>
<channel id="input">
<queue capacity="1"/>
</channel>
<channel id="output">
<queue capacity="1"/>
</channel>
<channel id="errorChannel">
<queue capacity="1"/>
</channel>
<service-activator id="mandatory"
input-channel="input"

View File

@@ -9,8 +9,13 @@
<message-bus/>
<channel id="input"/>
<channel id="output"/>
<channel id="input">
<queue capacity="1"/>
</channel>
<channel id="output">
<queue capacity="1"/>
</channel>
<service-activator id="notSupported"
input-channel="input"

View File

@@ -9,8 +9,13 @@
<message-bus/>
<channel id="input"/>
<channel id="output"/>
<channel id="input">
<queue capacity="1"/>
</channel>
<channel id="output">
<queue capacity="1"/>
</channel>
<service-activator id="required"
input-channel="input"

View File

@@ -9,8 +9,13 @@
<message-bus/>
<channel id="input"/>
<channel id="output"/>
<channel id="input">
<queue capacity="1"/>
</channel>
<channel id="output">
<queue capacity="1"/>
</channel>
<service-activator id="requiresNew"
input-channel="input"

View File

@@ -9,8 +9,13 @@
<message-bus/>
<channel id="input"/>
<channel id="output"/>
<channel id="input">
<queue capacity="1"/>
</channel>
<channel id="output">
<queue capacity="1"/>
</channel>
<service-activator id="supports"
input-channel="input"

View File

@@ -9,9 +9,17 @@
<message-bus/>
<channel id="badInput"/>
<channel id="goodInput"/>
<channel id="output"/>
<channel id="badInput">
<queue capacity="1"/>
</channel>
<channel id="goodInput">
<queue capacity="1"/>
</channel>
<channel id="output">
<queue capacity="1"/>
</channel>
<service-activator input-channel="badInput"
ref="testBean"
@@ -27,7 +35,7 @@
method="good"
output-channel="output">
<poller period="10000" max-messages-per-poll="1">
<transactional transaction-manager="txManager" propagation="REQUIRED"/>
<transactional transaction-manager="txManager"/>
</poller>
</service-activator>

View File

@@ -10,18 +10,30 @@
<si:message-bus/>
<si:channel id="channel1"/>
<si:channel id="channel2"/>
<si:channel id="channel2">
<si:queue capacity="5"/>
</si:channel>
<si:channel id="channel3"/>
<si:channel id="channel4"/>
<si:channel id="channel5"/>
<si:channel id="replyChannel"/>
<si:channel id="errorChannel"/>
<si:channel id="channel5">
<si:queue capacity="5"/>
</si:channel>
<si:channel id="replyChannel">
<si:queue capacity="5"/>
</si:channel>
<si:channel id="errorChannel">
<si:queue capacity="5"/>
</si:channel>
<si:service-activator input-channel="channel1" ref="testBean" method="duplicate" output-channel="channel2"/>
<si:service-activator input-channel="channel2" ref="testBean" method="duplicate" output-channel="channel3"/>
<si:service-activator input-channel="channel3" ref="testBean" method="duplicate"/>
<si:service-activator input-channel="channel3" ref="testBean" method="duplicate" error-handler="errorHandler"/>
<si:service-activator input-channel="channel4" ref="testBean" method="duplicate" output-channel="replyChannel"/>
<bean id="errorHandler" class="org.springframework.integration.channel.MessagePublishingErrorHandler">
<property name="errorChannel" ref="errorChannel"/>
</bean>
<bean id="testBean" class="org.springframework.integration.endpoint.TestBean"/>
</beans>

View File

@@ -9,9 +9,13 @@
<message-bus/>
<channel id="requestChannel"/>
<channel id="requestChannel">
<queue capacity="100"/>
</channel>
<channel id="replyChannel"/>
<channel id="replyChannel">
<queue capacity="100"/>
</channel>
<gateway id="oneWay"
service-interface="org.springframework.integration.gateway.TestService"

View File

@@ -12,6 +12,7 @@
<channel id="requestChannel"/>
<channel id="replyChannel">
<queue capacity="100"/>
<interceptors>
<ref bean="interceptor"/>
</interceptors>

View File

@@ -11,9 +11,13 @@
<channel id="input"/>
<channel id="output1"/>
<channel id="output1">
<queue capacity="1"/>
</channel>
<channel id="output2"/>
<channel id="output2">
<queue capacity="1"/>
</channel>
<router id="router" ref="pojo" method="route" input-channel="input"/>

View File

@@ -9,9 +9,11 @@
<message-bus/>
<direct-channel id="numbers"/>
<direct-channel id="splits"/>
<channel id="results"/>
<channel id="numbers"/>
<channel id="splits"/>
<channel id="results">
<queue capacity="10"/>
</channel>
<splitter ref="splitter" method="split" input-channel="numbers" output-channel="splits"/>
<aggregator ref="aggregator" method="sum" input-channel="splits" output-channel="results"/>

View File

@@ -11,7 +11,9 @@
<channel id="channel1"/>
<channel id="channel2"/>
<channel id="channel2">
<queue capacity="5"/>
</channel>
<splitter id="splitter" ref="pojo" method="split"
input-channel="channel1" output-channel="channel2"/>