Added AbstractInOutEndpoint and ServiceActivatorEndpoint. SplitterEndpoint now extends AbstractInOutEndpoint as well.

This commit is contained in:
Mark Fisher
2008-08-30 17:07:58 +00:00
parent b009bb137c
commit 41b3a764ab
18 changed files with 497 additions and 222 deletions

View File

@@ -47,7 +47,7 @@ public class TestHandler implements MessageHandler {
this.replyMessageText = replyMessageText;
}
public Message handle(Message message) {
public Message<?> handle(Message<?> message) {
this.messageString = message.getPayload().toString();
this.latch.countDown();
return (this.replyMessageText != null) ? new StringMessage(this.replyMessageText) : null;

View File

@@ -17,7 +17,7 @@
input-channel="testChannel"
ref="testHandler"
output-channel="replyChannel">
<poller period="100"/>
<poller period="10000" max-messages-per-poll="1"/>
<interceptors>
<beans:bean class="org.springframework.integration.config.TestPreHandleInterceptor"/>
<beans:bean class="org.springframework.integration.config.TestPostHandleInterceptor"/>
@@ -28,7 +28,7 @@
input-channel="testChannel"
ref="testHandler"
output-channel="replyChannel">
<poller period="100"/>
<poller period="10000" max-messages-per-poll="1"/>
<interceptors>
<ref bean="preInterceptor"/>
<ref bean="postInterceptor"/>
@@ -36,7 +36,9 @@
</service-activator>
<beans:bean id="testHandler" class="org.springframework.integration.config.TestHandler"/>
<beans:bean id="testHandler" class="org.springframework.integration.config.TestHandler">
<beans:property name="replyMessageText" value="test"/>
</beans:bean>
<beans:bean id="preInterceptor" class="org.springframework.integration.config.TestPreHandleInterceptor"/>

View File

@@ -14,7 +14,7 @@
* limitations under the License.
*/
package org.springframework.integration.endpoint.interceptor;
package org.springframework.integration.dispatcher;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
@@ -25,25 +25,20 @@ import org.junit.Test;
import org.springframework.context.support.ClassPathXmlApplicationContext;
import org.springframework.integration.channel.MessageChannel;
import org.springframework.integration.channel.PollableChannel;
import org.springframework.integration.endpoint.MessageEndpoint;
import org.springframework.integration.message.Message;
import org.springframework.integration.message.MessageHandlingException;
import org.springframework.integration.message.StringMessage;
import org.springframework.transaction.IllegalTransactionStateException;
import org.springframework.transaction.TransactionStatus;
import org.springframework.transaction.annotation.Propagation;
import org.springframework.transaction.support.TransactionCallback;
import org.springframework.transaction.support.TransactionTemplate;
/**
* @author Mark Fisher
*/
public class TransactionInterceptorTests {
public class PollingTransactionTests {
@Test
public void testTransactionInterceptorWithCommit() throws InterruptedException {
public void transactionWithCommit() throws InterruptedException {
ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext(
"transactionInterceptorTests.xml", this.getClass());
"transactionTests.xml", this.getClass());
TestTransactionManager txManager = (TestTransactionManager) context.getBean("txManager");
MessageChannel input = (MessageChannel) context.getBean("goodInput");
PollableChannel output = (PollableChannel) context.getBean("output");
@@ -51,16 +46,16 @@ public class TransactionInterceptorTests {
assertEquals(0, txManager.getRollbackCount());
input.send(new StringMessage("test"));
txManager.waitForCompletion(1000);
Message<?> message = output.receive(500);
Message<?> message = output.receive(0);
assertNotNull(message);
assertEquals(1, txManager.getCommitCount());
assertEquals(0, txManager.getRollbackCount());
}
@Test
public void testTransactionInterceptorWithRollback() throws InterruptedException {
public void transactionWithRollback() throws InterruptedException {
ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext(
"transactionInterceptorTests.xml", this.getClass());
"transactionTests.xml", this.getClass());
TestTransactionManager txManager = (TestTransactionManager) context.getBean("txManager");
MessageChannel input = (MessageChannel) context.getBean("badInput");
PollableChannel output = (PollableChannel) context.getBean("output");
@@ -68,105 +63,90 @@ public class TransactionInterceptorTests {
assertEquals(0, txManager.getRollbackCount());
input.send(new StringMessage("test"));
txManager.waitForCompletion(1000);
Message<?> message = output.receive(500);
Message<?> message = output.receive(0);
assertNull(message);
assertEquals(0, txManager.getCommitCount());
assertEquals(1, txManager.getRollbackCount());
}
@Test
public void testPropagationRequired() {
public void propagationRequired() throws InterruptedException {
ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext(
"transactionInterceptorPropagationTests.xml", this.getClass());
"propagationRequiredTests.xml", this.getClass());
TestTransactionManager txManager = (TestTransactionManager) context.getBean("txManager");
final MessageEndpoint endpoint = (MessageEndpoint) context.getBean("required");
PollableChannel input = (PollableChannel) context.getBean("input");
PollableChannel output = (PollableChannel) context.getBean("output");
assertEquals(0, txManager.getCommitCount());
endpoint.send(new StringMessage("test"));
input.send(new StringMessage("test"));
Message<?> reply = output.receive(3000);
assertNotNull(reply);
txManager.waitForCompletion(3000);
assertEquals(1, txManager.getCommitCount());
TestTransactionManager outerTxManager = new TestTransactionManager();
TransactionTemplate txTemplate = new TransactionTemplate(outerTxManager);
txTemplate.execute(new TransactionCallback() {
public Object doInTransaction(TransactionStatus status) {
return endpoint.send(new StringMessage("test"));
}
});
assertEquals(1, outerTxManager.getCommitCount());
assertEquals(2, txManager.getCommitCount());
assertEquals(Propagation.REQUIRED.value(), txManager.getLastDefinition().getPropagationBehavior());
}
@Test
public void testPropagationRequiresNew() {
public void propagationRequiresNew() throws InterruptedException {
ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext(
"transactionInterceptorPropagationTests.xml", this.getClass());
"propagationRequiresNewTests.xml", this.getClass());
TestTransactionManager txManager = (TestTransactionManager) context.getBean("txManager");
final MessageEndpoint endpoint = (MessageEndpoint) context.getBean("requiresNew");
PollableChannel input = (PollableChannel) context.getBean("input");
PollableChannel output = (PollableChannel) context.getBean("output");
assertEquals(0, txManager.getCommitCount());
endpoint.send(new StringMessage("test"));
input.send(new StringMessage("test"));
Message<?> reply = output.receive(3000);
assertNotNull(reply);
txManager.waitForCompletion(3000);
assertEquals(1, txManager.getCommitCount());
TestTransactionManager outerTxManager = new TestTransactionManager();
TransactionTemplate txTemplate = new TransactionTemplate(outerTxManager);
txTemplate.execute(new TransactionCallback() {
public Object doInTransaction(TransactionStatus status) {
return endpoint.send(new StringMessage("test"));
}
});
assertEquals(1, outerTxManager.getCommitCount());
assertEquals(2, txManager.getCommitCount());
assertEquals(Propagation.REQUIRES_NEW.value(), txManager.getLastDefinition().getPropagationBehavior());
}
@Test
public void testPropagationSupports() {
public void propagationSupports() throws InterruptedException {
ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext(
"transactionInterceptorPropagationTests.xml", this.getClass());
"propagationSupportsTests.xml", this.getClass());
TestTransactionManager txManager = (TestTransactionManager) context.getBean("txManager");
final MessageEndpoint endpoint = (MessageEndpoint) context.getBean("supports");
PollableChannel input = (PollableChannel) context.getBean("input");
PollableChannel output = (PollableChannel) context.getBean("output");
assertEquals(0, txManager.getCommitCount());
endpoint.send(new StringMessage("test"));
input.send(new StringMessage("test"));
Message<?> reply = output.receive(3000);
assertNotNull(reply);
assertEquals(0, txManager.getCommitCount());
TestTransactionManager outerTxManager = new TestTransactionManager();
TransactionTemplate txTemplate = new TransactionTemplate(outerTxManager);
txTemplate.execute(new TransactionCallback() {
public Object doInTransaction(TransactionStatus status) {
return endpoint.send(new StringMessage("test"));
}
});
assertEquals(0, txManager.getCommitCount());
assertEquals(1, outerTxManager.getCommitCount());
assertNull(txManager.getLastDefinition());
}
@Test
public void testPropagationNotSupported() {
public void propagationNotSupported() throws InterruptedException {
ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext(
"transactionInterceptorPropagationTests.xml", this.getClass());
"propagationNotSupportedTests.xml", this.getClass());
TestTransactionManager txManager = (TestTransactionManager) context.getBean("txManager");
final MessageEndpoint endpoint = (MessageEndpoint) context.getBean("notSupported");
PollableChannel input = (PollableChannel) context.getBean("input");
PollableChannel output = (PollableChannel) context.getBean("output");
assertEquals(0, txManager.getCommitCount());
endpoint.send(new StringMessage("test"));
input.send(new StringMessage("test"));
Message<?> reply = output.receive(3000);
assertNotNull(reply);
assertEquals(0, txManager.getCommitCount());
TestTransactionManager outerTxManager = new TestTransactionManager();
TransactionTemplate txTemplate = new TransactionTemplate(outerTxManager);
txTemplate.execute(new TransactionCallback() {
public Object doInTransaction(TransactionStatus status) {
return endpoint.send(new StringMessage("test"));
}
});
assertEquals(0, txManager.getCommitCount());
assertEquals(1, outerTxManager.getCommitCount());
assertNull(txManager.getLastDefinition());
}
@Test(expected = IllegalTransactionStateException.class)
public void testPropagationMandatoryCalledWithoutTransaction() throws Throwable {
@Test
public void propagationMandatory() throws Throwable {
ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext(
"transactionInterceptorPropagationTests.xml", this.getClass());
final MessageEndpoint endpoint = (MessageEndpoint) context.getBean("mandatory");
try {
endpoint.send(new StringMessage("test"));
}
catch (MessageHandlingException e) {
throw e.getCause();
}
"propagationMandatoryTests.xml", this.getClass());
TestTransactionManager txManager = (TestTransactionManager) context.getBean("txManager");
PollableChannel input = (PollableChannel) context.getBean("input");
PollableChannel output = (PollableChannel) context.getBean("output");
PollableChannel errorChannel = (PollableChannel) context.getBean("errorChannel");
assertEquals(0, txManager.getCommitCount());
input.send(new StringMessage("test"));
Message<?> errorMessage = errorChannel.receive(3000);
assertNotNull(errorMessage);
Object payload = errorMessage.getPayload();
assertEquals(IllegalTransactionStateException.class, payload.getClass());
assertNull(output.receive(0));
assertEquals(0, txManager.getCommitCount());
}
}

View File

@@ -14,7 +14,7 @@
* limitations under the License.
*/
package org.springframework.integration.endpoint.interceptor;
package org.springframework.integration.dispatcher;
/**
* @author Mark Fisher

View File

@@ -14,7 +14,7 @@
* limitations under the License.
*/
package org.springframework.integration.endpoint.interceptor;
package org.springframework.integration.dispatcher;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

View File

@@ -0,0 +1,30 @@
<?xml version="1.0" encoding="UTF-8"?>
<beans:beans xmlns="http://www.springframework.org/schema/integration"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:beans="http://www.springframework.org/schema/beans"
xsi:schemaLocation="http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans-2.5.xsd
http://www.springframework.org/schema/integration
http://www.springframework.org/schema/integration/spring-integration-core-1.0.xsd">
<message-bus/>
<channel id="input"/>
<channel id="output"/>
<channel id="errorChannel"/>
<service-activator id="mandatory"
input-channel="input"
ref="testBean"
method="good"
output-channel="output">
<poller period="10000" max-messages-per-poll="1">
<transactional transaction-manager="txManager" propagation="MANDATORY"/>
</poller>
</service-activator>
<beans:bean id="testBean" class="org.springframework.integration.dispatcher.TestBean"/>
<beans:bean id="txManager" class="org.springframework.integration.dispatcher.TestTransactionManager"/>
</beans:beans>

View File

@@ -0,0 +1,29 @@
<?xml version="1.0" encoding="UTF-8"?>
<beans:beans xmlns="http://www.springframework.org/schema/integration"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:beans="http://www.springframework.org/schema/beans"
xsi:schemaLocation="http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans-2.5.xsd
http://www.springframework.org/schema/integration
http://www.springframework.org/schema/integration/spring-integration-core-1.0.xsd">
<message-bus/>
<channel id="input"/>
<channel id="output"/>
<service-activator id="notSupported"
input-channel="input"
ref="testBean"
method="good"
output-channel="output">
<poller period="10000" max-messages-per-poll="1">
<transactional transaction-manager="txManager" propagation="NOT_SUPPORTED"/>
</poller>
</service-activator>
<beans:bean id="testBean" class="org.springframework.integration.dispatcher.TestBean"/>
<beans:bean id="txManager" class="org.springframework.integration.dispatcher.TestTransactionManager"/>
</beans:beans>

View File

@@ -0,0 +1,29 @@
<?xml version="1.0" encoding="UTF-8"?>
<beans:beans xmlns="http://www.springframework.org/schema/integration"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:beans="http://www.springframework.org/schema/beans"
xsi:schemaLocation="http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans-2.5.xsd
http://www.springframework.org/schema/integration
http://www.springframework.org/schema/integration/spring-integration-core-1.0.xsd">
<message-bus/>
<channel id="input"/>
<channel id="output"/>
<service-activator id="required"
input-channel="input"
ref="testBean"
method="good"
output-channel="output">
<poller period="10000" max-messages-per-poll="1">
<transactional transaction-manager="txManager" propagation="REQUIRED"/>
</poller>
</service-activator>
<beans:bean id="testBean" class="org.springframework.integration.dispatcher.TestBean"/>
<beans:bean id="txManager" class="org.springframework.integration.dispatcher.TestTransactionManager"/>
</beans:beans>

View File

@@ -0,0 +1,29 @@
<?xml version="1.0" encoding="UTF-8"?>
<beans:beans xmlns="http://www.springframework.org/schema/integration"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:beans="http://www.springframework.org/schema/beans"
xsi:schemaLocation="http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans-2.5.xsd
http://www.springframework.org/schema/integration
http://www.springframework.org/schema/integration/spring-integration-core-1.0.xsd">
<message-bus/>
<channel id="input"/>
<channel id="output"/>
<service-activator id="requiresNew"
input-channel="input"
ref="testBean"
method="good"
output-channel="output">
<poller period="10000" max-messages-per-poll="1">
<transactional transaction-manager="txManager" propagation="REQUIRES_NEW"/>
</poller>
</service-activator>
<beans:bean id="testBean" class="org.springframework.integration.dispatcher.TestBean"/>
<beans:bean id="txManager" class="org.springframework.integration.dispatcher.TestTransactionManager"/>
</beans:beans>

View File

@@ -0,0 +1,29 @@
<?xml version="1.0" encoding="UTF-8"?>
<beans:beans xmlns="http://www.springframework.org/schema/integration"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:beans="http://www.springframework.org/schema/beans"
xsi:schemaLocation="http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans-2.5.xsd
http://www.springframework.org/schema/integration
http://www.springframework.org/schema/integration/spring-integration-core-1.0.xsd">
<message-bus/>
<channel id="input"/>
<channel id="output"/>
<service-activator id="supports"
input-channel="input"
ref="testBean"
method="good"
output-channel="output">
<poller period="10000" max-messages-per-poll="1">
<transactional transaction-manager="txManager" propagation="SUPPORTS"/>
</poller>
</service-activator>
<beans:bean id="testBean" class="org.springframework.integration.dispatcher.TestBean"/>
<beans:bean id="txManager" class="org.springframework.integration.dispatcher.TestTransactionManager"/>
</beans:beans>

View File

@@ -17,24 +17,22 @@
ref="testBean"
method="bad"
output-channel="output">
<poller period="100"/>
<interceptors>
<transaction-interceptor transaction-manager="txManager"/>
</interceptors>
<poller period="10000" max-messages-per-poll="1">
<transactional transaction-manager="txManager"/>
</poller>
</service-activator>
<service-activator input-channel="goodInput"
ref="testBean"
method="good"
output-channel="output">
<poller period="100"/>
<interceptors>
<transaction-interceptor transaction-manager="txManager" propagation="REQUIRED"/>
</interceptors>
<poller period="10000" max-messages-per-poll="1">
<transactional transaction-manager="txManager" propagation="REQUIRED"/>
</poller>
</service-activator>
<beans:bean id="testBean" class="org.springframework.integration.endpoint.interceptor.TestBean"/>
<beans:bean id="testBean" class="org.springframework.integration.dispatcher.TestBean"/>
<beans:bean id="txManager" class="org.springframework.integration.endpoint.interceptor.TestTransactionManager"/>
<beans:bean id="txManager" class="org.springframework.integration.dispatcher.TestTransactionManager"/>
</beans:beans>

View File

@@ -1,74 +0,0 @@
<?xml version="1.0" encoding="UTF-8"?>
<beans:beans xmlns="http://www.springframework.org/schema/integration"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:beans="http://www.springframework.org/schema/beans"
xsi:schemaLocation="http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans-2.5.xsd
http://www.springframework.org/schema/integration
http://www.springframework.org/schema/integration/spring-integration-core-1.0.xsd">
<message-bus/>
<channel id="input"/>
<channel id="output"/>
<service-activator id="required"
input-channel="input"
ref="testBean"
method="good"
output-channel="output">
<poller period="10000"/>
<interceptors>
<transaction-interceptor transaction-manager="txManager" propagation="REQUIRED"/>
</interceptors>
</service-activator>
<service-activator id="requiresNew"
input-channel="input"
ref="testBean"
method="good"
output-channel="output">
<poller period="10000"/>
<interceptors>
<transaction-interceptor transaction-manager="txManager" propagation="REQUIRES_NEW"/>
</interceptors>
</service-activator>
<service-activator id="supports"
input-channel="input"
ref="testBean"
method="good"
output-channel="output">
<poller period="10000"/>
<interceptors>
<transaction-interceptor transaction-manager="txManager" propagation="SUPPORTS"/>
</interceptors>
</service-activator>
<service-activator id="notSupported"
input-channel="input"
ref="testBean"
method="good"
output-channel="output">
<poller period="10000"/>
<interceptors>
<transaction-interceptor transaction-manager="txManager" propagation="NOT_SUPPORTED"/>
</interceptors>
</service-activator>
<service-activator id="mandatory"
input-channel="input"
ref="testBean"
method="good"
output-channel="output">
<poller period="10000"/>
<interceptors>
<transaction-interceptor transaction-manager="txManager" propagation="MANDATORY"/>
</interceptors>
</service-activator>
<beans:bean id="testBean" class="org.springframework.integration.endpoint.interceptor.TestBean"/>
<beans:bean id="txManager" class="org.springframework.integration.endpoint.interceptor.TestTransactionManager"/>
</beans:beans>