OPEN - issue BATCH-574: BatchMessageListernerContainer needs to have transaction boundaries defined around batching
Add integration test.
This commit is contained in:
@@ -111,7 +111,7 @@ public class BatchMessageListenerContainer extends DefaultMessageListenerContain
|
||||
* Extension point for subclasses. Do anything necessary to recover from the
|
||||
* exception, which was raised when the message was being processed.
|
||||
* @param session the current JMS session.
|
||||
* @param message the message just receieved and failed to process.
|
||||
* @param message the message just received and failed to process.
|
||||
* @param ex the exception thrown during message processing.
|
||||
*/
|
||||
protected void recover(Session session, Message message, Throwable ex) throws JMSException {
|
||||
@@ -140,10 +140,34 @@ public class BatchMessageListenerContainer extends DefaultMessageListenerContain
|
||||
}
|
||||
else if (ex instanceof Error) {
|
||||
// Just re-throw Error instances because otherwise unit tests just
|
||||
// swallow expections from EasyMock and JUnit.
|
||||
// swallow exceptions from EasyMock and JUnit.
|
||||
throw (Error) ex;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Override base class to prevent exceptions from being swallowed.
|
||||
*
|
||||
* @see org.springframework.jms.listener.AbstractMessageListenerContainer#handleListenerException(java.lang.Throwable)
|
||||
*/
|
||||
protected void handleListenerException(Throwable ex) {
|
||||
if (!isSessionTransacted()) {
|
||||
// Log the exceptions in base class if not transactional anyway
|
||||
super.handleListenerException(ex);
|
||||
return;
|
||||
}
|
||||
logger.debug("Re-throwing exception in container.");
|
||||
if (ex instanceof RuntimeException) {
|
||||
// We need to re-throw so that an enclosing non-JMS transaction can
|
||||
// rollback...
|
||||
throw (RuntimeException) ex;
|
||||
}
|
||||
else if (ex instanceof Error) {
|
||||
// Just re-throw Error instances because otherwise unit tests just
|
||||
// swallow exceptions from EasyMock and JUnit.
|
||||
throw (Error) ex;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Override base class method to wrap call in a batch.
|
||||
|
||||
@@ -16,12 +16,14 @@
|
||||
|
||||
package org.springframework.batch.config;
|
||||
|
||||
import org.springframework.batch.jms.ExternalRetryInBatchTests;
|
||||
import org.springframework.test.AbstractTransactionalDataSourceSpringContextTests;
|
||||
import org.springframework.util.ClassUtils;
|
||||
|
||||
public class DatasourceTests extends AbstractTransactionalDataSourceSpringContextTests {
|
||||
|
||||
protected String[] getConfigLocations() {
|
||||
return new String[] { "/org/springframework/batch/jms/jms-context.xml" };
|
||||
return new String[] { ClassUtils.addResourcePathToPackagePath(ExternalRetryInBatchTests.class, "jms-context.xml") };
|
||||
}
|
||||
|
||||
public void testTemplate() throws Exception {
|
||||
|
||||
@@ -19,8 +19,10 @@ package org.springframework.batch.config;
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
|
||||
import org.springframework.batch.jms.ExternalRetryInBatchTests;
|
||||
import org.springframework.jms.core.JmsTemplate;
|
||||
import org.springframework.test.AbstractDependencyInjectionSpringContextTests;
|
||||
import org.springframework.util.ClassUtils;
|
||||
|
||||
public class MessagingTests extends AbstractDependencyInjectionSpringContextTests {
|
||||
|
||||
@@ -31,7 +33,8 @@ public class MessagingTests extends AbstractDependencyInjectionSpringContextTest
|
||||
}
|
||||
|
||||
protected String[] getConfigLocations() {
|
||||
return new String[] { "/org/springframework/batch/jms/jms-context.xml" };
|
||||
return new String[] { ClassUtils.addResourcePathToPackagePath(ExternalRetryInBatchTests.class,
|
||||
"jms-context.xml") };
|
||||
}
|
||||
|
||||
protected void onSetUp() throws Exception {
|
||||
|
||||
@@ -0,0 +1,188 @@
|
||||
/*
|
||||
* Copyright 2006-2007 the original author or authors.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.springframework.batch.container.jms;
|
||||
|
||||
import javax.jms.JMSException;
|
||||
import javax.jms.Message;
|
||||
import javax.jms.MessageListener;
|
||||
import javax.jms.TextMessage;
|
||||
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import org.springframework.batch.item.AbstractItemWriter;
|
||||
import org.springframework.batch.item.ItemKeyGenerator;
|
||||
import org.springframework.batch.item.ItemRecoverer;
|
||||
import org.springframework.batch.jms.ExternalRetryInBatchTests;
|
||||
import org.springframework.batch.retry.callback.ItemWriterRetryCallback;
|
||||
import org.springframework.batch.retry.policy.ItemWriterRetryPolicy;
|
||||
import org.springframework.batch.retry.policy.NeverRetryPolicy;
|
||||
import org.springframework.batch.retry.support.RetryTemplate;
|
||||
import org.springframework.jms.core.JmsTemplate;
|
||||
import org.springframework.test.AbstractDependencyInjectionSpringContextTests;
|
||||
import org.springframework.util.ClassUtils;
|
||||
|
||||
/**
|
||||
* @author Dave Syer
|
||||
*
|
||||
*/
|
||||
public class BatchMessageListenerContainerIntegrationTests extends AbstractDependencyInjectionSpringContextTests {
|
||||
|
||||
private final Log logger = LogFactory.getLog(getClass());
|
||||
|
||||
private JmsTemplate jmsTemplate;
|
||||
|
||||
private BatchMessageListenerContainer container;
|
||||
|
||||
private int recovered;
|
||||
|
||||
private int count;
|
||||
|
||||
/**
|
||||
* Public setter for the {@link BatchMessageListenerContainer}.
|
||||
* @param container the container to set
|
||||
*/
|
||||
public void setContainer(BatchMessageListenerContainer container) {
|
||||
this.container = container;
|
||||
}
|
||||
|
||||
/**
|
||||
* Public setter for the JmsTemplate.
|
||||
* @param jmsTemplate the jmsTemplate to set
|
||||
*/
|
||||
public void setJmsTemplate(JmsTemplate jmsTemplate) {
|
||||
this.jmsTemplate = jmsTemplate;
|
||||
}
|
||||
|
||||
/*
|
||||
* (non-Javadoc)
|
||||
* @see org.springframework.test.AbstractSingleSpringContextTests#getConfigLocations()
|
||||
*/
|
||||
protected String[] getConfigLocations() {
|
||||
// Share config with other test so that ActiveMQ only starts up once.
|
||||
return new String[] { ClassUtils.addResourcePathToPackagePath(ExternalRetryInBatchTests.class, "jms-context.xml") };
|
||||
}
|
||||
|
||||
/* (non-Javadoc)
|
||||
* @see org.springframework.test.AbstractSingleSpringContextTests#onSetUp()
|
||||
*/
|
||||
protected void onSetUp() throws Exception {
|
||||
while(jmsTemplate.receiveAndConvert("queue")!=null) {
|
||||
// do nothing
|
||||
}
|
||||
}
|
||||
|
||||
/*
|
||||
* (non-Javadoc)
|
||||
* @see org.springframework.test.AbstractSingleSpringContextTests#onTearDown()
|
||||
*/
|
||||
protected void onTearDown() throws Exception {
|
||||
container.stop();
|
||||
while(jmsTemplate.receiveAndConvert("queue")!=null) {
|
||||
// do nothing
|
||||
}
|
||||
}
|
||||
|
||||
public void testConfiguration() throws Exception {
|
||||
assertNotNull(container);
|
||||
}
|
||||
|
||||
public void testSendAndReceive() throws Exception {
|
||||
container.setMessageListener(new MessageListener() {
|
||||
public void onMessage(Message msg) {
|
||||
count++;
|
||||
}
|
||||
});
|
||||
container.start();
|
||||
jmsTemplate.convertAndSend("queue", "foo");
|
||||
jmsTemplate.convertAndSend("queue", "bar");
|
||||
int waiting = 0;
|
||||
while (count < 2 && waiting++ < 10) {
|
||||
Thread.sleep(100L);
|
||||
}
|
||||
if (count < 2) {
|
||||
fail("Expected message to be processed.");
|
||||
}
|
||||
}
|
||||
|
||||
public void testFailureAndRepresent() throws Exception {
|
||||
container.setMessageListener(new MessageListener() {
|
||||
public void onMessage(Message msg) {
|
||||
logger.debug("Message: "+msg);
|
||||
count++;
|
||||
throw new RuntimeException("planned failure: " + msg);
|
||||
}
|
||||
});
|
||||
container.start();
|
||||
jmsTemplate.convertAndSend("queue", "foo");
|
||||
int waiting = 0;
|
||||
while (count < 2 && waiting++ < 10) {
|
||||
Thread.sleep(100L);
|
||||
}
|
||||
if (count < 2) {
|
||||
fail("Expected message to be processed twice.");
|
||||
}
|
||||
}
|
||||
|
||||
public void testFailureAndRecovery() throws Exception {
|
||||
final RetryTemplate retryTemplate = new RetryTemplate();
|
||||
retryTemplate.setRetryPolicy(new ItemWriterRetryPolicy(new NeverRetryPolicy()));
|
||||
container.setMessageListener(new MessageListener() {
|
||||
public void onMessage(final Message msg) {
|
||||
try {
|
||||
ItemWriterRetryCallback callback = new ItemWriterRetryCallback(msg, new AbstractItemWriter() {
|
||||
public void write(Object item) throws Exception {
|
||||
logger.debug("Message: "+item);
|
||||
count++;
|
||||
throw new RuntimeException("planned failure: " + msg);
|
||||
}
|
||||
});
|
||||
callback.setKeyGenerator(new ItemKeyGenerator() {
|
||||
public Object getKey(Object item) {
|
||||
String text;
|
||||
try {
|
||||
text = ((TextMessage)item).getJMSMessageID();
|
||||
}
|
||||
catch (JMSException e) {
|
||||
text = ""+item;
|
||||
}
|
||||
logger.debug("Key for message: "+text);
|
||||
return text;
|
||||
}
|
||||
});
|
||||
callback.setRecoverer(new ItemRecoverer() {
|
||||
public boolean recover(Object data, Throwable cause) {
|
||||
recovered++;
|
||||
logger.debug("Recovered: " + data);
|
||||
return true;
|
||||
}
|
||||
});
|
||||
retryTemplate.execute(callback);
|
||||
}
|
||||
catch (Exception e) {
|
||||
throw (RuntimeException) e;
|
||||
}
|
||||
}
|
||||
});
|
||||
container.start();
|
||||
jmsTemplate.convertAndSend("queue", "foo");
|
||||
int waiting = 0;
|
||||
while ((count < 1 || recovered < 1) && waiting++ < 1000) {
|
||||
Thread.sleep(100L);
|
||||
}
|
||||
assertEquals(1, count);
|
||||
assertEquals(1, recovered);
|
||||
}
|
||||
}
|
||||
@@ -125,8 +125,13 @@ public class BatchMessageListenerContainerTests extends TestCase {
|
||||
template.setCompletionPolicy(new SimpleCompletionPolicy(2));
|
||||
container = getContainer(template);
|
||||
container.setSessionTransacted(true);
|
||||
boolean received = doTestWithException(new IllegalStateException("No way!"), true, 2);
|
||||
assertFalse("Message received", received);
|
||||
try {
|
||||
boolean received = doTestWithException(new IllegalStateException("No way!"), true, 2);
|
||||
assertFalse("Message received", received);
|
||||
fail("Expected IllegalStateException");
|
||||
} catch (IllegalStateException e) {
|
||||
assertEquals("No way!", e.getMessage());
|
||||
}
|
||||
}
|
||||
|
||||
public void testNonTransactionalReceiveAndExecuteWithCallbackThrowingException() throws Exception {
|
||||
|
||||
@@ -40,6 +40,7 @@ import org.springframework.transaction.PlatformTransactionManager;
|
||||
import org.springframework.transaction.TransactionStatus;
|
||||
import org.springframework.transaction.support.TransactionCallback;
|
||||
import org.springframework.transaction.support.TransactionTemplate;
|
||||
import org.springframework.util.ClassUtils;
|
||||
|
||||
public class ExternalRetryInBatchTests extends AbstractDependencyInjectionSpringContextTests {
|
||||
private JmsTemplate jmsTemplate;
|
||||
@@ -71,7 +72,7 @@ public class ExternalRetryInBatchTests extends AbstractDependencyInjectionSpring
|
||||
}
|
||||
|
||||
protected String[] getConfigLocations() {
|
||||
return new String[] { "/org/springframework/batch/jms/jms-context.xml" };
|
||||
return new String[] { ClassUtils.addResourcePathToPackagePath(getClass(), "jms-context.xml" )};
|
||||
}
|
||||
|
||||
protected void onSetUp() throws Exception {
|
||||
|
||||
@@ -25,15 +25,18 @@ import javax.jms.Session;
|
||||
import javax.jms.TextMessage;
|
||||
|
||||
import org.springframework.batch.container.jms.BatchMessageListenerContainer;
|
||||
import org.springframework.batch.jms.ExternalRetryInBatchTests;
|
||||
import org.springframework.jdbc.core.JdbcTemplate;
|
||||
import org.springframework.jms.core.JmsTemplate;
|
||||
import org.springframework.jms.listener.SessionAwareMessageListener;
|
||||
import org.springframework.test.AbstractDependencyInjectionSpringContextTests;
|
||||
import org.springframework.util.ClassUtils;
|
||||
|
||||
public class AsynchronousTests extends AbstractDependencyInjectionSpringContextTests {
|
||||
|
||||
protected String[] getConfigLocations() {
|
||||
return new String[] { "/org/springframework/batch/jms/jms-context.xml" };
|
||||
return new String[] { ClassUtils.addResourcePathToPackagePath(ExternalRetryInBatchTests.class,
|
||||
"jms-context.xml") };
|
||||
}
|
||||
|
||||
private BatchMessageListenerContainer container;
|
||||
|
||||
@@ -23,6 +23,7 @@ import javax.jms.ConnectionFactory;
|
||||
import javax.jms.JMSException;
|
||||
import javax.jms.Session;
|
||||
|
||||
import org.springframework.batch.jms.ExternalRetryInBatchTests;
|
||||
import org.springframework.batch.repeat.ExitStatus;
|
||||
import org.springframework.batch.repeat.RepeatCallback;
|
||||
import org.springframework.batch.repeat.RepeatContext;
|
||||
@@ -33,6 +34,7 @@ import org.springframework.jms.core.SessionCallback;
|
||||
import org.springframework.test.AbstractTransactionalDataSourceSpringContextTests;
|
||||
import org.springframework.transaction.support.TransactionSynchronizationAdapter;
|
||||
import org.springframework.transaction.support.TransactionSynchronizationManager;
|
||||
import org.springframework.util.ClassUtils;
|
||||
|
||||
public class SynchronousTests extends AbstractTransactionalDataSourceSpringContextTests {
|
||||
|
||||
@@ -49,7 +51,8 @@ public class SynchronousTests extends AbstractTransactionalDataSourceSpringConte
|
||||
}
|
||||
|
||||
protected String[] getConfigLocations() {
|
||||
return new String[] { "/org/springframework/batch/jms/jms-context.xml" };
|
||||
return new String[] { ClassUtils.addResourcePathToPackagePath(ExternalRetryInBatchTests.class,
|
||||
"jms-context.xml") };
|
||||
}
|
||||
|
||||
protected void onSetUpBeforeTransaction() throws Exception {
|
||||
@@ -85,7 +88,7 @@ public class SynchronousTests extends AbstractTransactionalDataSourceSpringConte
|
||||
String text = (String) jmsTemplate.receiveAndConvert("queue");
|
||||
list.add(text);
|
||||
jdbcTemplate.update("INSERT into T_FOOS (id,name,foo_date) values (?,?,null)", new Object[] {
|
||||
new Integer(list.size()), text });
|
||||
new Integer(list.size()), text });
|
||||
return new ExitStatus(text != null);
|
||||
}
|
||||
});
|
||||
@@ -116,7 +119,7 @@ public class SynchronousTests extends AbstractTransactionalDataSourceSpringConte
|
||||
String text = (String) jmsTemplate.receiveAndConvert("queue");
|
||||
list.add(text);
|
||||
jdbcTemplate.update("INSERT into T_FOOS (id,name,foo_date) values (?,?,null)", new Object[] {
|
||||
new Integer(list.size()), text });
|
||||
new Integer(list.size()), text });
|
||||
return new ExitStatus(text != null);
|
||||
}
|
||||
});
|
||||
@@ -146,7 +149,7 @@ public class SynchronousTests extends AbstractTransactionalDataSourceSpringConte
|
||||
// The JmsTemplate is used elsewhere outside a transaction, so
|
||||
// we need to use one here that is transaction aware.
|
||||
final JmsTemplate jmsTemplate = new JmsTemplate((ConnectionFactory) applicationContext
|
||||
.getBean("txAwareConnectionFactory"));
|
||||
.getBean("txAwareConnectionFactory"));
|
||||
jmsTemplate.setReceiveTimeout(100L);
|
||||
jmsTemplate.setSessionTransacted(true);
|
||||
|
||||
@@ -156,7 +159,7 @@ public class SynchronousTests extends AbstractTransactionalDataSourceSpringConte
|
||||
String text = (String) jmsTemplate.receiveAndConvert("queue");
|
||||
list.add(text);
|
||||
jdbcTemplate.update("INSERT into T_FOOS (id,name,foo_date) values (?,?,null)", new Object[] {
|
||||
new Integer(list.size()), text });
|
||||
new Integer(list.size()), text });
|
||||
return new ExitStatus(text != null);
|
||||
}
|
||||
});
|
||||
@@ -170,9 +173,11 @@ public class SynchronousTests extends AbstractTransactionalDataSourceSpringConte
|
||||
try {
|
||||
assertTrue("Not a SessionProxy - wrong spring version?", session instanceof SessionProxy);
|
||||
((SessionProxy) session).getTargetSession().rollback();
|
||||
} catch (JMSException e) {
|
||||
}
|
||||
catch (JMSException e) {
|
||||
throw e;
|
||||
} catch (Exception e) {
|
||||
}
|
||||
catch (Exception e) {
|
||||
// swallow it
|
||||
e.printStackTrace();
|
||||
}
|
||||
|
||||
@@ -24,6 +24,7 @@ import javax.sql.DataSource;
|
||||
import org.springframework.batch.item.AbstractItemReader;
|
||||
import org.springframework.batch.item.AbstractItemWriter;
|
||||
import org.springframework.batch.item.ItemRecoverer;
|
||||
import org.springframework.batch.jms.ExternalRetryInBatchTests;
|
||||
import org.springframework.batch.retry.callback.ItemWriterRetryCallback;
|
||||
import org.springframework.batch.retry.policy.ItemWriterRetryPolicy;
|
||||
import org.springframework.batch.retry.support.RetryTemplate;
|
||||
@@ -34,6 +35,7 @@ import org.springframework.transaction.PlatformTransactionManager;
|
||||
import org.springframework.transaction.TransactionStatus;
|
||||
import org.springframework.transaction.support.TransactionCallback;
|
||||
import org.springframework.transaction.support.TransactionTemplate;
|
||||
import org.springframework.util.ClassUtils;
|
||||
|
||||
public class ExternalRetryTests extends AbstractDependencyInjectionSpringContextTests {
|
||||
|
||||
@@ -60,7 +62,8 @@ public class ExternalRetryTests extends AbstractDependencyInjectionSpringContext
|
||||
}
|
||||
|
||||
protected String[] getConfigLocations() {
|
||||
return new String[] { "/org/springframework/batch/jms/jms-context.xml" };
|
||||
return new String[] { ClassUtils.addResourcePathToPackagePath(ExternalRetryInBatchTests.class,
|
||||
"jms-context.xml") };
|
||||
}
|
||||
|
||||
protected void onSetUp() throws Exception {
|
||||
@@ -93,7 +96,8 @@ public class ExternalRetryTests extends AbstractDependencyInjectionSpringContext
|
||||
private List recovered = new ArrayList();
|
||||
|
||||
/**
|
||||
* Message processing is successful on the second attempt but must receive the message again.
|
||||
* Message processing is successful on the second attempt but must receive
|
||||
* the message again.
|
||||
*
|
||||
* @throws Exception
|
||||
*/
|
||||
@@ -106,7 +110,7 @@ public class ExternalRetryTests extends AbstractDependencyInjectionSpringContext
|
||||
final AbstractItemWriter writer = new AbstractItemWriter() {
|
||||
public void write(final Object text) {
|
||||
jdbcTemplate.update("INSERT into T_FOOS (id,name,foo_date) values (?,?,null)", new Object[] {
|
||||
new Integer(list.size()), text });
|
||||
new Integer(list.size()), text });
|
||||
if (list.size() == 1) {
|
||||
throw new RuntimeException("Rollback!");
|
||||
}
|
||||
@@ -120,13 +124,15 @@ public class ExternalRetryTests extends AbstractDependencyInjectionSpringContext
|
||||
try {
|
||||
ItemWriterRetryCallback callback = new ItemWriterRetryCallback(provider.read(), writer);
|
||||
return retryTemplate.execute(callback);
|
||||
} catch (Exception e) {
|
||||
}
|
||||
catch (Exception e) {
|
||||
throw new RuntimeException(e.getMessage(), e);
|
||||
}
|
||||
}
|
||||
});
|
||||
fail("Expected Exception");
|
||||
} catch (Exception e) {
|
||||
}
|
||||
catch (Exception e) {
|
||||
|
||||
assertEquals("Rollback!", e.getMessage());
|
||||
|
||||
@@ -140,7 +146,8 @@ public class ExternalRetryTests extends AbstractDependencyInjectionSpringContext
|
||||
try {
|
||||
ItemWriterRetryCallback callback = new ItemWriterRetryCallback(provider.read(), writer);
|
||||
return retryTemplate.execute(callback);
|
||||
} catch (Exception e) {
|
||||
}
|
||||
catch (Exception e) {
|
||||
throw new RuntimeException(e.getMessage(), e);
|
||||
}
|
||||
}
|
||||
@@ -170,7 +177,7 @@ public class ExternalRetryTests extends AbstractDependencyInjectionSpringContext
|
||||
final ItemWriterRetryCallback callback = new ItemWriterRetryCallback(provider.read(), new AbstractItemWriter() {
|
||||
public void write(final Object text) {
|
||||
jdbcTemplate.update("INSERT into T_FOOS (id,name,foo_date) values (?,?,null)", new Object[] {
|
||||
new Integer(list.size()), text });
|
||||
new Integer(list.size()), text });
|
||||
throw new RuntimeException("Rollback!");
|
||||
}
|
||||
});
|
||||
@@ -184,12 +191,14 @@ public class ExternalRetryTests extends AbstractDependencyInjectionSpringContext
|
||||
public Object doInTransaction(TransactionStatus status) {
|
||||
try {
|
||||
return retryTemplate.execute(callback);
|
||||
} catch (Exception e) {
|
||||
}
|
||||
catch (Exception e) {
|
||||
throw new RuntimeException(e.getMessage(), e);
|
||||
}
|
||||
}
|
||||
});
|
||||
} catch (Exception e) {
|
||||
}
|
||||
catch (Exception e) {
|
||||
|
||||
if (i < 3)
|
||||
assertEquals("Rollback!", e.getMessage());
|
||||
|
||||
@@ -21,6 +21,7 @@ import java.util.List;
|
||||
|
||||
import org.springframework.batch.item.AbstractItemWriter;
|
||||
import org.springframework.batch.item.jms.JmsItemReader;
|
||||
import org.springframework.batch.jms.ExternalRetryInBatchTests;
|
||||
import org.springframework.batch.retry.RetryCallback;
|
||||
import org.springframework.batch.retry.RetryContext;
|
||||
import org.springframework.batch.retry.callback.ItemWriterRetryCallback;
|
||||
@@ -30,6 +31,7 @@ import org.springframework.test.AbstractTransactionalDataSourceSpringContextTest
|
||||
import org.springframework.transaction.TransactionStatus;
|
||||
import org.springframework.transaction.support.TransactionCallback;
|
||||
import org.springframework.transaction.support.TransactionTemplate;
|
||||
import org.springframework.util.ClassUtils;
|
||||
|
||||
public class SynchronousTests extends AbstractTransactionalDataSourceSpringContextTests {
|
||||
|
||||
@@ -42,7 +44,8 @@ public class SynchronousTests extends AbstractTransactionalDataSourceSpringConte
|
||||
}
|
||||
|
||||
protected String[] getConfigLocations() {
|
||||
return new String[] { "/org/springframework/batch/jms/jms-context.xml" };
|
||||
return new String[] { ClassUtils.addResourcePathToPackagePath(ExternalRetryInBatchTests.class,
|
||||
"jms-context.xml") };
|
||||
}
|
||||
|
||||
protected void onSetUpBeforeTransaction() throws Exception {
|
||||
@@ -73,7 +76,8 @@ public class SynchronousTests extends AbstractTransactionalDataSourceSpringConte
|
||||
List list = new ArrayList();
|
||||
|
||||
/**
|
||||
* Message processing is successful on the second attempt without having to receive the message again.
|
||||
* Message processing is successful on the second attempt without having to
|
||||
* receive the message again.
|
||||
*
|
||||
* @throws Exception
|
||||
*/
|
||||
@@ -82,9 +86,11 @@ public class SynchronousTests extends AbstractTransactionalDataSourceSpringConte
|
||||
assertInitialState();
|
||||
|
||||
/*
|
||||
* We either want the JMS receive to be outside a transaction, or we need the database transaction in the retry
|
||||
* to be PROPAGATION_NESTED. Otherwise JMS will roll back when the retry callback is eventually successful
|
||||
* because of the previous exception. PROPAGATION_REQUIRES_NEW is wrong because it doesn't allow the outer
|
||||
* We either want the JMS receive to be outside a transaction, or we
|
||||
* need the database transaction in the retry to be PROPAGATION_NESTED.
|
||||
* Otherwise JMS will roll back when the retry callback is eventually
|
||||
* successful because of the previous exception.
|
||||
* PROPAGATION_REQUIRES_NEW is wrong because it doesn't allow the outer
|
||||
* transaction to fail and rollback the inner one.
|
||||
*/
|
||||
final String text = (String) jmsTemplate.receiveAndConvert("queue");
|
||||
@@ -101,7 +107,7 @@ public class SynchronousTests extends AbstractTransactionalDataSourceSpringConte
|
||||
list.add(text);
|
||||
System.err.println("Inserting: [" + list.size() + "," + text + "]");
|
||||
jdbcTemplate.update("INSERT into T_FOOS (id,name,foo_date) values (?,?,null)", new Object[] {
|
||||
new Integer(list.size()), text });
|
||||
new Integer(list.size()), text });
|
||||
if (list.size() == 1) {
|
||||
throw new RuntimeException("Rollback!");
|
||||
}
|
||||
@@ -130,8 +136,8 @@ public class SynchronousTests extends AbstractTransactionalDataSourceSpringConte
|
||||
}
|
||||
|
||||
/**
|
||||
* Message processing is successful on the second attempt without having to receive the message again - uses
|
||||
* JmsItemProvider internally.
|
||||
* Message processing is successful on the second attempt without having to
|
||||
* receive the message again - uses JmsItemProvider internally.
|
||||
*
|
||||
* @throws Exception
|
||||
*/
|
||||
@@ -155,7 +161,7 @@ public class SynchronousTests extends AbstractTransactionalDataSourceSpringConte
|
||||
list.add(text);
|
||||
System.err.println("Inserting: [" + list.size() + "," + text + "]");
|
||||
jdbcTemplate.update("INSERT into T_FOOS (id,name,foo_date) values (?,?,null)", new Object[] {
|
||||
new Integer(list.size()), text });
|
||||
new Integer(list.size()), text });
|
||||
if (list.size() == 1) {
|
||||
throw new RuntimeException("Rollback!");
|
||||
}
|
||||
@@ -185,7 +191,8 @@ public class SynchronousTests extends AbstractTransactionalDataSourceSpringConte
|
||||
}
|
||||
|
||||
/**
|
||||
* Message processing is successful on the second attempt without having to receive the message again.
|
||||
* Message processing is successful on the second attempt without having to
|
||||
* receive the message again.
|
||||
*
|
||||
* @throws Exception
|
||||
*/
|
||||
@@ -194,9 +201,11 @@ public class SynchronousTests extends AbstractTransactionalDataSourceSpringConte
|
||||
assertInitialState();
|
||||
|
||||
/*
|
||||
* We either want the JMS receive to be outside a transaction, or we need the database transaction in the retry
|
||||
* to be PROPAGATION_NESTED. Otherwise JMS will roll back when the retry callback is eventually successful
|
||||
* because of the previous exception. PROPAGATION_REQUIRES_NEW is wrong because it doesn't allow the outer
|
||||
* We either want the JMS receive to be outside a transaction, or we
|
||||
* need the database transaction in the retry to be PROPAGATION_NESTED.
|
||||
* Otherwise JMS will roll back when the retry callback is eventually
|
||||
* successful because of the previous exception.
|
||||
* PROPAGATION_REQUIRES_NEW is wrong because it doesn't allow the outer
|
||||
* transaction to fail and rollback the inner one.
|
||||
*/
|
||||
final String text = (String) jmsTemplate.receiveAndConvert("queue");
|
||||
@@ -212,7 +221,7 @@ public class SynchronousTests extends AbstractTransactionalDataSourceSpringConte
|
||||
list.add(text);
|
||||
System.err.println("Inserting: [" + list.size() + "," + text + "]");
|
||||
jdbcTemplate.update("INSERT into T_FOOS (id,name,foo_date) values (?,?,null)", new Object[] {
|
||||
new Integer(list.size()), text });
|
||||
new Integer(list.size()), text });
|
||||
return text;
|
||||
|
||||
}
|
||||
@@ -241,7 +250,8 @@ public class SynchronousTests extends AbstractTransactionalDataSourceSpringConte
|
||||
}
|
||||
|
||||
/**
|
||||
* Message processing is successful on the second attempt but must receive the message again.
|
||||
* Message processing is successful on the second attempt but must receive
|
||||
* the message again.
|
||||
*
|
||||
* @throws Exception
|
||||
*/
|
||||
@@ -266,7 +276,7 @@ public class SynchronousTests extends AbstractTransactionalDataSourceSpringConte
|
||||
final String text = (String) jmsTemplate.receiveAndConvert("queue");
|
||||
list.add(text);
|
||||
jdbcTemplate.update("INSERT into T_FOOS (id,name,foo_date) values (?,?,null)", new Object[] {
|
||||
new Integer(list.size()), text });
|
||||
new Integer(list.size()), text });
|
||||
if (list.size() == 1) {
|
||||
throw new RuntimeException("Rollback!");
|
||||
}
|
||||
@@ -318,7 +328,7 @@ public class SynchronousTests extends AbstractTransactionalDataSourceSpringConte
|
||||
final String text = (String) jmsTemplate.receiveAndConvert("queue");
|
||||
list.add(text);
|
||||
jdbcTemplate.update("INSERT into T_FOOS (id,name,foo_date) values (?,?,null)",
|
||||
new Object[] { new Integer(list.size()), text });
|
||||
new Object[] { new Integer(list.size()), text });
|
||||
throw new RuntimeException("Rollback!");
|
||||
|
||||
}
|
||||
@@ -328,12 +338,13 @@ public class SynchronousTests extends AbstractTransactionalDataSourceSpringConte
|
||||
});
|
||||
|
||||
/*
|
||||
* N.B. the message can be re-directed to an error queue by setting an error destination in a
|
||||
* JmsItemProvider.
|
||||
* N.B. the message can be re-directed to an error queue by setting
|
||||
* an error destination in a JmsItemProvider.
|
||||
*/
|
||||
fail("Expected RuntimeException");
|
||||
|
||||
} catch (RuntimeException e) {
|
||||
}
|
||||
catch (RuntimeException e) {
|
||||
assertEquals("Rollback!", e.getMessage());
|
||||
// expected
|
||||
}
|
||||
|
||||
@@ -21,6 +21,7 @@ import java.util.List;
|
||||
|
||||
import org.springframework.batch.item.AbstractItemWriter;
|
||||
import org.springframework.batch.item.jms.JmsItemReader;
|
||||
import org.springframework.batch.jms.ExternalRetryInBatchTests;
|
||||
import org.springframework.batch.retry.RetryCallback;
|
||||
import org.springframework.batch.retry.RetryContext;
|
||||
import org.springframework.batch.retry.callback.ItemWriterRetryCallback;
|
||||
@@ -30,6 +31,7 @@ import org.springframework.test.AbstractTransactionalDataSourceSpringContextTest
|
||||
import org.springframework.transaction.TransactionStatus;
|
||||
import org.springframework.transaction.support.TransactionCallback;
|
||||
import org.springframework.transaction.support.TransactionTemplate;
|
||||
import org.springframework.util.ClassUtils;
|
||||
|
||||
public class SynchronousTests extends AbstractTransactionalDataSourceSpringContextTests {
|
||||
|
||||
@@ -42,7 +44,8 @@ public class SynchronousTests extends AbstractTransactionalDataSourceSpringConte
|
||||
}
|
||||
|
||||
protected String[] getConfigLocations() {
|
||||
return new String[] { "/org/springframework/batch/jms/jms-context.xml" };
|
||||
return new String[] { ClassUtils.addResourcePathToPackagePath(ExternalRetryInBatchTests.class,
|
||||
"jms-context.xml") };
|
||||
}
|
||||
|
||||
protected void onSetUpBeforeTransaction() throws Exception {
|
||||
@@ -70,7 +73,8 @@ public class SynchronousTests extends AbstractTransactionalDataSourceSpringConte
|
||||
List list = new ArrayList();
|
||||
|
||||
/**
|
||||
* Message processing is successful on the second attempt without having to receive the message again.
|
||||
* Message processing is successful on the second attempt without having to
|
||||
* receive the message again.
|
||||
*
|
||||
* @throws Exception
|
||||
*/
|
||||
@@ -79,9 +83,11 @@ public class SynchronousTests extends AbstractTransactionalDataSourceSpringConte
|
||||
assertInitialState();
|
||||
|
||||
/*
|
||||
* We either want the JMS receive to be outside a transaction, or we need the database transaction in the retry
|
||||
* to be PROPAGATION_NESTED. Otherwise JMS will roll back when the retry callback is eventually successful
|
||||
* because of the previous exception. PROPAGATION_REQUIRES_NEW is wrong because it doesn't allow the outer
|
||||
* We either want the JMS receive to be outside a transaction, or we
|
||||
* need the database transaction in the retry to be PROPAGATION_NESTED.
|
||||
* Otherwise JMS will roll back when the retry callback is eventually
|
||||
* successful because of the previous exception.
|
||||
* PROPAGATION_REQUIRES_NEW is wrong because it doesn't allow the outer
|
||||
* transaction to fail and rollback the inner one.
|
||||
*/
|
||||
final String text = (String) jmsTemplate.receiveAndConvert("queue");
|
||||
@@ -97,7 +103,7 @@ public class SynchronousTests extends AbstractTransactionalDataSourceSpringConte
|
||||
list.add(text);
|
||||
System.err.println("Inserting: [" + list.size() + "," + text + "]");
|
||||
jdbcTemplate.update("INSERT into T_FOOS (id,name,foo_date) values (?,?,null)", new Object[] {
|
||||
new Integer(list.size()), text });
|
||||
new Integer(list.size()), text });
|
||||
if (list.size() == 1) {
|
||||
throw new RuntimeException("Rollback!");
|
||||
}
|
||||
@@ -126,8 +132,8 @@ public class SynchronousTests extends AbstractTransactionalDataSourceSpringConte
|
||||
}
|
||||
|
||||
/**
|
||||
* Message processing is successful on the second attempt without having to receive the message again - uses
|
||||
* JmsItemProvider internally.
|
||||
* Message processing is successful on the second attempt without having to
|
||||
* receive the message again - uses JmsItemProvider internally.
|
||||
*
|
||||
* @throws Exception
|
||||
*/
|
||||
@@ -151,7 +157,7 @@ public class SynchronousTests extends AbstractTransactionalDataSourceSpringConte
|
||||
list.add(text);
|
||||
System.err.println("Inserting: [" + list.size() + "," + text + "]");
|
||||
jdbcTemplate.update("INSERT into T_FOOS (id,name,foo_date) values (?,?,null)", new Object[] {
|
||||
new Integer(list.size()), text });
|
||||
new Integer(list.size()), text });
|
||||
if (list.size() == 1) {
|
||||
throw new RuntimeException("Rollback!");
|
||||
}
|
||||
@@ -181,7 +187,8 @@ public class SynchronousTests extends AbstractTransactionalDataSourceSpringConte
|
||||
}
|
||||
|
||||
/**
|
||||
* Message processing is successful on the second attempt without having to receive the message again.
|
||||
* Message processing is successful on the second attempt without having to
|
||||
* receive the message again.
|
||||
*
|
||||
* @throws Exception
|
||||
*/
|
||||
@@ -190,9 +197,11 @@ public class SynchronousTests extends AbstractTransactionalDataSourceSpringConte
|
||||
assertInitialState();
|
||||
|
||||
/*
|
||||
* We either want the JMS receive to be outside a transaction, or we need the database transaction in the retry
|
||||
* to be PROPAGATION_NESTED. Otherwise JMS will roll back when the retry callback is eventually successful
|
||||
* because of the previous exception. PROPAGATION_REQUIRES_NEW is wrong because it doesn't allow the outer
|
||||
* We either want the JMS receive to be outside a transaction, or we
|
||||
* need the database transaction in the retry to be PROPAGATION_NESTED.
|
||||
* Otherwise JMS will roll back when the retry callback is eventually
|
||||
* successful because of the previous exception.
|
||||
* PROPAGATION_REQUIRES_NEW is wrong because it doesn't allow the outer
|
||||
* transaction to fail and rollback the inner one.
|
||||
*/
|
||||
final String text = (String) jmsTemplate.receiveAndConvert("queue");
|
||||
@@ -208,7 +217,7 @@ public class SynchronousTests extends AbstractTransactionalDataSourceSpringConte
|
||||
list.add(text);
|
||||
System.err.println("Inserting: [" + list.size() + "," + text + "]");
|
||||
jdbcTemplate.update("INSERT into T_FOOS (id,name,foo_date) values (?,?,null)", new Object[] {
|
||||
new Integer(list.size()), text });
|
||||
new Integer(list.size()), text });
|
||||
return text;
|
||||
|
||||
}
|
||||
@@ -237,7 +246,8 @@ public class SynchronousTests extends AbstractTransactionalDataSourceSpringConte
|
||||
}
|
||||
|
||||
/**
|
||||
* Message processing is successful on the second attempt but must receive the message again.
|
||||
* Message processing is successful on the second attempt but must receive
|
||||
* the message again.
|
||||
*
|
||||
* @throws Exception
|
||||
*/
|
||||
@@ -262,7 +272,7 @@ public class SynchronousTests extends AbstractTransactionalDataSourceSpringConte
|
||||
final String text = (String) jmsTemplate.receiveAndConvert("queue");
|
||||
list.add(text);
|
||||
jdbcTemplate.update("INSERT into T_FOOS (id,name,foo_date) values (?,?,null)", new Object[] {
|
||||
new Integer(list.size()), text });
|
||||
new Integer(list.size()), text });
|
||||
if (list.size() == 1) {
|
||||
throw new RuntimeException("Rollback!");
|
||||
}
|
||||
@@ -314,7 +324,7 @@ public class SynchronousTests extends AbstractTransactionalDataSourceSpringConte
|
||||
final String text = (String) jmsTemplate.receiveAndConvert("queue");
|
||||
list.add(text);
|
||||
jdbcTemplate.update("INSERT into T_FOOS (id,name,foo_date) values (?,?,null)",
|
||||
new Object[] { new Integer(list.size()), text });
|
||||
new Object[] { new Integer(list.size()), text });
|
||||
throw new RuntimeException("Rollback!");
|
||||
|
||||
}
|
||||
@@ -325,7 +335,8 @@ public class SynchronousTests extends AbstractTransactionalDataSourceSpringConte
|
||||
|
||||
fail("Expected RuntimeException");
|
||||
|
||||
} catch (RuntimeException e) {
|
||||
}
|
||||
catch (RuntimeException e) {
|
||||
assertEquals("Rollback!", e.getMessage());
|
||||
// expected
|
||||
}
|
||||
|
||||
@@ -2,7 +2,8 @@ log4j.rootCategory=INFO, stdout
|
||||
|
||||
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
|
||||
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
|
||||
log4j.appender.stdout.layout.ConversionPattern=%d %p [%c] - <%m>%n
|
||||
log4j.appender.stdout.layout.ConversionPattern=%d{ABSOLUTE} %5p %t %c{1}:%L - %m%n
|
||||
|
||||
log4j.category.org.apache.activemq=ERROR
|
||||
# log4j.category.org.springframework=DEBUG
|
||||
log4j.category.org.springframework.batch.container.jms=DEBUG
|
||||
|
||||
@@ -52,6 +52,7 @@
|
||||
<bean id="container"
|
||||
class="org.springframework.batch.container.jms.BatchMessageListenerContainer">
|
||||
<property name="transactionManager" ref="transactionManager" />
|
||||
<property name="recoveryInterval" value="0" />
|
||||
<property name="connectionFactory"
|
||||
ref="txAwareConnectionFactory" />
|
||||
<property name="destinationName" value="queue" />
|
||||
|
||||
@@ -1,10 +1,11 @@
|
||||
<?xml version="1.0" encoding="UTF-8"?>
|
||||
<beansProjectDescription>
|
||||
<version>1</version>
|
||||
<pluginVersion><![CDATA[2.0.2.v200712142013]]></pluginVersion>
|
||||
<pluginVersion><![CDATA[2.0.3.v200802061800]]></pluginVersion>
|
||||
<configSuffixes>
|
||||
<configSuffix><![CDATA[xml]]></configSuffix>
|
||||
</configSuffixes>
|
||||
<enableImports><![CDATA[false]]></enableImports>
|
||||
<configs>
|
||||
<config>src/main/resources/jobs/fixedLengthImportJob.xml</config>
|
||||
<config>src/main/resources/jobs/multilineJob.xml</config>
|
||||
|
||||
Reference in New Issue
Block a user