This commit is contained in:
@@ -15,8 +15,13 @@
|
||||
*/
|
||||
package org.springframework.integration.config.xml;
|
||||
|
||||
import static org.junit.Assert.assertEquals;
|
||||
|
||||
import java.io.ByteArrayInputStream;
|
||||
import java.util.Collection;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Properties;
|
||||
|
||||
import junit.framework.Assert;
|
||||
@@ -31,7 +36,10 @@ import org.springframework.core.io.InputStreamResource;
|
||||
import org.springframework.integration.channel.DirectChannel;
|
||||
import org.springframework.integration.channel.PollableChannel;
|
||||
import org.springframework.integration.core.Message;
|
||||
import org.springframework.integration.core.MessageChannel;
|
||||
import org.springframework.integration.core.MessageHeaders;
|
||||
import org.springframework.integration.endpoint.EventDrivenConsumer;
|
||||
import org.springframework.integration.message.GenericMessage;
|
||||
import org.springframework.integration.message.MessageBuilder;
|
||||
import org.springframework.test.context.ContextConfiguration;
|
||||
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
|
||||
@@ -130,6 +138,32 @@ public class InnerDefinitionHandlerAwareEndpointParserTests {
|
||||
reader.setValidationMode(XmlBeanDefinitionReader.VALIDATION_XSD);
|
||||
reader.loadBeanDefinitions(new InputStreamResource(stream));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testInnerAggregatorDefinitionSuccess(){
|
||||
String configProperty = testConfigurations.getProperty("aggregator-inner-success");
|
||||
this.testAggregatorDefinitionSuccess(configProperty);
|
||||
}
|
||||
@Test
|
||||
public void testInnerConcurrentAggregatorDefinitionSuccess(){
|
||||
String configProperty = testConfigurations.getProperty("aggregator-inner-concurrent-success");
|
||||
this.testAggregatorDefinitionSuccess(configProperty);
|
||||
}
|
||||
@Test
|
||||
public void testRefAggregatorDefinitionSuccess(){
|
||||
String configProperty = testConfigurations.getProperty("aggregator-ref-success");
|
||||
this.testAggregatorDefinitionSuccess(configProperty);
|
||||
}
|
||||
@Test(expected=BeanDefinitionStoreException.class)
|
||||
public void testInnerAggregatorDefinitionFailureRefAndInner(){
|
||||
String xmlConfig = testConfigurations.getProperty("aggregator-failure-refAndBean");
|
||||
ByteArrayInputStream stream = new ByteArrayInputStream(xmlConfig.getBytes());
|
||||
GenericApplicationContext ac = new GenericApplicationContext();
|
||||
XmlBeanDefinitionReader reader = new XmlBeanDefinitionReader(ac);
|
||||
reader.setValidationMode(XmlBeanDefinitionReader.VALIDATION_XSD);
|
||||
reader.loadBeanDefinitions(new InputStreamResource(stream));
|
||||
}
|
||||
|
||||
private void testSplitterDefinitionSuccess(String configProperty){
|
||||
ByteArrayInputStream stream = new ByteArrayInputStream(configProperty.getBytes());
|
||||
GenericApplicationContext ac = new GenericApplicationContext();
|
||||
@@ -199,6 +233,33 @@ public class InnerDefinitionHandlerAwareEndpointParserTests {
|
||||
PollableChannel channel1 = (PollableChannel) ac.getBean("outChannel");
|
||||
Assert.assertTrue(channel1.receive().getPayload().equals("1"));
|
||||
}
|
||||
private void testAggregatorDefinitionSuccess(String configProperty){
|
||||
ByteArrayInputStream stream = new ByteArrayInputStream(configProperty.getBytes());
|
||||
GenericApplicationContext ac = new GenericApplicationContext();
|
||||
XmlBeanDefinitionReader reader = new XmlBeanDefinitionReader(ac);
|
||||
reader.setValidationMode(XmlBeanDefinitionReader.VALIDATION_XSD);
|
||||
reader.loadBeanDefinitions(new InputStreamResource(stream));
|
||||
ac.refresh();
|
||||
ac.start();
|
||||
MessageChannel inChannel = (MessageChannel) ac.getBean("inChannel");
|
||||
for (int i = 0; i < 5; i++) {
|
||||
Map<String, Object> headers = stubHeaders(i, 5, 1);
|
||||
Message message = new GenericMessage<Integer>(i, headers);
|
||||
inChannel.send(message);
|
||||
}
|
||||
PollableChannel output = (PollableChannel) ac.getBean("outChannel");
|
||||
assertEquals(0 + 1 + 2 + 3 + 4, output.receive().getPayload());
|
||||
System.out.println();
|
||||
}
|
||||
|
||||
private Map<String, Object> stubHeaders(int sequenceNumber, int sequenceSize, int correllationId) {
|
||||
Map<String, Object> headers = new HashMap<String, Object>();
|
||||
headers.put(MessageHeaders.SEQUENCE_NUMBER, sequenceNumber);
|
||||
headers.put(MessageHeaders.SEQUENCE_SIZE, sequenceSize);
|
||||
headers.put(MessageHeaders.CORRELATION_ID, correllationId);
|
||||
headers.put(MessageHeaders.ID, 1);
|
||||
return headers;
|
||||
}
|
||||
|
||||
public static class TestSplitter{
|
||||
public Collection split(String[] payload){
|
||||
@@ -222,5 +283,15 @@ public class InnerDefinitionHandlerAwareEndpointParserTests {
|
||||
return value;
|
||||
}
|
||||
}
|
||||
|
||||
public static class TestAggregator{
|
||||
public Integer sum(List<Integer> numbers) {
|
||||
int result = 0;
|
||||
for (Integer number : numbers) {
|
||||
result += number;
|
||||
}
|
||||
return result;
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@@ -192,4 +192,75 @@ sa-failure-refAndBean=\
|
||||
<beans:bean class="org.springframework.integration.config.xml.InnerDefinitionHandlerAwareEndpointParserTests$TestServiceActivator" /> \
|
||||
</service-activator> \
|
||||
<beans:bean id="testServiceActivatorBean" class="org.springframework.integration.config.xml.InnerDefinitionHandlerAwareEndpointParserTests$TestServiceActivator" /> \
|
||||
</beans:beans>
|
||||
aggregator-inner-success=\
|
||||
<?xml version="1.0" encoding="UTF-8"?> \
|
||||
<beans:beans xmlns="http://www.springframework.org/schema/integration" \
|
||||
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:beans="http://www.springframework.org/schema/beans" \
|
||||
xsi:schemaLocation="http://www.springframework.org/schema/beans \
|
||||
http://www.springframework.org/schema/beans/spring-beans-2.5.xsd \
|
||||
http://www.springframework.org/schema/integration \
|
||||
http://www.springframework.org/schema/integration/spring-integration-1.0.xsd"> \
|
||||
<channel id="inChannel"/> \
|
||||
<channel id="outChannel"> \
|
||||
<queue capacity="5" /> \
|
||||
</channel> \
|
||||
<aggregator input-channel="inChannel" method="sum" output-channel="outChannel"> \
|
||||
<beans:bean class="org.springframework.integration.config.xml.InnerDefinitionHandlerAwareEndpointParserTests$TestAggregator"/> \
|
||||
</aggregator> \
|
||||
</beans:beans>
|
||||
aggregator-ref-success=\
|
||||
<?xml version="1.0" encoding="UTF-8"?> \
|
||||
<beans:beans xmlns="http://www.springframework.org/schema/integration" \
|
||||
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:beans="http://www.springframework.org/schema/beans" \
|
||||
xsi:schemaLocation="http://www.springframework.org/schema/beans \
|
||||
http://www.springframework.org/schema/beans/spring-beans-2.5.xsd \
|
||||
http://www.springframework.org/schema/integration \
|
||||
http://www.springframework.org/schema/integration/spring-integration-1.0.xsd"> \
|
||||
<channel id="inChannel"> \
|
||||
</channel> \
|
||||
<channel id="outChannel"> \
|
||||
<queue capacity="5" /> \
|
||||
</channel> \
|
||||
<aggregator input-channel="inChannel" ref="aggregator" method="sum" output-channel="outChannel"/> \
|
||||
<beans:bean id="aggregator" class="org.springframework.integration.config.xml.InnerDefinitionHandlerAwareEndpointParserTests$TestAggregator"/> \
|
||||
</beans:beans>
|
||||
aggregator-inner-concurrent-success=\
|
||||
<?xml version="1.0" encoding="UTF-8"?> \
|
||||
<beans:beans xmlns="http://www.springframework.org/schema/integration" \
|
||||
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:beans="http://www.springframework.org/schema/beans" \
|
||||
xsi:schemaLocation="http://www.springframework.org/schema/beans \
|
||||
http://www.springframework.org/schema/beans/spring-beans-2.5.xsd \
|
||||
http://www.springframework.org/schema/integration \
|
||||
http://www.springframework.org/schema/integration/spring-integration-1.0.xsd"> \
|
||||
<channel id="inChannel"> \
|
||||
<queue capacity="5" /> \
|
||||
</channel> \
|
||||
<channel id="outChannel"> \
|
||||
<queue capacity="5" /> \
|
||||
</channel> \
|
||||
<aggregator input-channel="inChannel" method="sum" output-channel="outChannel"> \
|
||||
<poller task-executor="executor" max-messages-per-poll="5"> \
|
||||
<interval-trigger interval="20" /> \
|
||||
</poller> \
|
||||
<beans:bean class="org.springframework.integration.config.xml.InnerDefinitionHandlerAwareEndpointParserTests$TestAggregator"/> \
|
||||
</aggregator> \
|
||||
<thread-pool-task-executor id="executor" core-size="5" /> \
|
||||
</beans:beans>
|
||||
aggregator-failure-refAndBean=\
|
||||
<?xml version="1.0" encoding="UTF-8"?> \
|
||||
<beans:beans xmlns="http://www.springframework.org/schema/integration" \
|
||||
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:beans="http://www.springframework.org/schema/beans" \
|
||||
xsi:schemaLocation="http://www.springframework.org/schema/beans \
|
||||
http://www.springframework.org/schema/beans/spring-beans-2.5.xsd \
|
||||
http://www.springframework.org/schema/integration \
|
||||
http://www.springframework.org/schema/integration/spring-integration-1.0.xsd"> \
|
||||
<channel id="inChannel"/> \
|
||||
<channel id="outChannel"> \
|
||||
<queue capacity="5" /> \
|
||||
</channel> \
|
||||
<aggregator input-channel="inChannel" ref="aggregator" method="sum" output-channel="outChannel"> \
|
||||
<beans:bean class="org.springframework.integration.config.xml.InnerDefinitionHandlerAwareEndpointParserTests$TestAggregator"/> \
|
||||
</aggregator> \
|
||||
<beans:bean id="aggregator" class="org.springframework.integration.config.xml.InnerDefinitionHandlerAwareEndpointParserTests$TestAggregator"/> \
|
||||
</beans:beans>
|
||||
Reference in New Issue
Block a user