INT-736, added DelayerUsageTest showing how to apply delayer
This commit is contained in:
@@ -0,0 +1,46 @@
|
||||
<?xml version="1.0" encoding="UTF-8"?>
|
||||
<beans:beans xmlns="http://www.springframework.org/schema/integration"
|
||||
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
|
||||
xmlns:beans="http://www.springframework.org/schema/beans"
|
||||
xsi:schemaLocation="http://www.springframework.org/schema/beans
|
||||
http://www.springframework.org/schema/beans/spring-beans-2.5.xsd
|
||||
http://www.springframework.org/schema/integration
|
||||
http://www.springframework.org/schema/integration/spring-integration-1.0.xsd">
|
||||
|
||||
<channel id="inputA"/>
|
||||
|
||||
<channel id="outputA">
|
||||
<queue />
|
||||
</channel>
|
||||
|
||||
<delayer id="delayerWithDefaultScheduler"
|
||||
input-channel="inputA"
|
||||
output-channel="outputA"
|
||||
default-delay="1000"
|
||||
delay-header-name="foo"
|
||||
order="99"
|
||||
send-timeout="1000"
|
||||
wait-for-tasks-to-complete-on-shutdown="true"/>
|
||||
|
||||
<channel id="inputB"/>
|
||||
<channel id="outputB"/>
|
||||
<channel id="outputB1">
|
||||
<queue />
|
||||
</channel>
|
||||
|
||||
<delayer id="delayerWithCustomScheduler"
|
||||
input-channel="inputB"
|
||||
output-channel="outputB"
|
||||
default-delay="1000"
|
||||
send-timeout="20000"
|
||||
scheduler="multiThreadScheduler"/>
|
||||
|
||||
<beans:bean id="multiThreadScheduler" class="org.springframework.scheduling.concurrent.ScheduledExecutorFactoryBean">
|
||||
<beans:property name="poolSize" value="5"/>
|
||||
</beans:bean>
|
||||
|
||||
<service-activator input-channel="outputB" output-channel="outputB1" method="processMessage" ref="sampleHandler"/>
|
||||
|
||||
<beans:bean id="sampleHandler" class="org.springframework.integration.config.xml.DelayerUsageTests$SampleService"/>
|
||||
|
||||
</beans:beans>
|
||||
@@ -0,0 +1,100 @@
|
||||
/*
|
||||
* Copyright 2002-2009 the original author or authors.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.springframework.integration.config.xml;
|
||||
|
||||
import static org.junit.Assert.assertTrue;
|
||||
|
||||
import org.junit.Test;
|
||||
import org.junit.runner.RunWith;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.beans.factory.annotation.Qualifier;
|
||||
import org.springframework.integration.channel.PollableChannel;
|
||||
import org.springframework.integration.core.Message;
|
||||
import org.springframework.integration.core.MessageChannel;
|
||||
import org.springframework.integration.message.MessageBuilder;
|
||||
import org.springframework.integration.message.StringMessage;
|
||||
import org.springframework.test.context.ContextConfiguration;
|
||||
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
|
||||
|
||||
/**
|
||||
* @author Oleg Zhurakousky
|
||||
* @since 1.0.3
|
||||
*/
|
||||
@RunWith(SpringJUnit4ClassRunner.class)
|
||||
@ContextConfiguration
|
||||
public class DelayerUsageTests {
|
||||
|
||||
@Autowired @Qualifier("inputA")
|
||||
private MessageChannel inputA;
|
||||
@Autowired @Qualifier("outputA")
|
||||
private PollableChannel outputA;
|
||||
|
||||
@Autowired @Qualifier("inputB")
|
||||
private MessageChannel inputB;
|
||||
@Autowired @Qualifier("outputB1")
|
||||
private PollableChannel outputB1;
|
||||
@Autowired
|
||||
private SampleService sampleHandler;
|
||||
|
||||
@Test
|
||||
public void testDelayWithDefaultScheduler(){
|
||||
long start = System.currentTimeMillis();
|
||||
inputA.send(new StringMessage("Hello"));
|
||||
Message<String> msg = (Message<String>) outputA.receive();
|
||||
assertTrue((System.currentTimeMillis() - start) >= 1000);
|
||||
}
|
||||
@Test
|
||||
public void testDelayWithDefaultSchedulerCustomDelayHeader(){
|
||||
MessageBuilder builder = MessageBuilder.withPayload("Hello");
|
||||
// set custom delay header
|
||||
builder.setHeader("foo", 2000);
|
||||
long start = System.currentTimeMillis();
|
||||
inputA.send(builder.build());
|
||||
Message<String> msg = (Message<String>) outputA.receive();
|
||||
assertTrue((System.currentTimeMillis() - start) >= 2000);
|
||||
}
|
||||
@Test
|
||||
public void testDelayWithCustomScheduler(){
|
||||
long start = System.currentTimeMillis();
|
||||
inputB.send(new StringMessage("1"));
|
||||
inputB.send(new StringMessage("2"));
|
||||
inputB.send(new StringMessage("3"));
|
||||
inputB.send(new StringMessage("4"));
|
||||
inputB.send(new StringMessage("5"));
|
||||
inputB.send(new StringMessage("6"));
|
||||
inputB.send(new StringMessage("7"));
|
||||
Message<String> msg = (Message<String>) outputB1.receive();
|
||||
msg = (Message<String>) outputB1.receive();
|
||||
msg = (Message<String>) outputB1.receive();
|
||||
msg = (Message<String>) outputB1.receive();
|
||||
msg = (Message<String>) outputB1.receive();
|
||||
msg = (Message<String>) outputB1.receive();
|
||||
msg = (Message<String>) outputB1.receive();
|
||||
|
||||
// must execute under 3 seconds, since threadPool is set too 5.
|
||||
// first batch is 5 concurrent invocations on SA, then 2 more
|
||||
// elapsed time for the whole execution should be a bit over 2 seconds depending on the hardware
|
||||
assertTrue(((System.currentTimeMillis() - start) >= 1000) && ((System.currentTimeMillis() - start) < 3000));
|
||||
}
|
||||
|
||||
|
||||
public static class SampleService{
|
||||
public String processMessage(String message) throws Exception {
|
||||
Thread.sleep(500);
|
||||
return message;
|
||||
}
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user