OPEN - issue BATCH-574: BatchMessageListernerContainer needs to have transaction boundaries defined around batching
Remove unnecessary clutter from BatchMessageListernerContainer (it's only really necessary to override two methods)
This commit is contained in:
@@ -17,14 +17,13 @@
|
||||
package org.springframework.batch.container.jms;
|
||||
|
||||
import javax.jms.JMSException;
|
||||
import javax.jms.Message;
|
||||
import javax.jms.MessageConsumer;
|
||||
import javax.jms.Session;
|
||||
|
||||
import org.springframework.batch.repeat.ExitStatus;
|
||||
import org.springframework.batch.repeat.RepeatCallback;
|
||||
import org.springframework.batch.repeat.RepeatContext;
|
||||
import org.springframework.batch.repeat.RepeatOperations;
|
||||
import org.springframework.batch.repeat.ExitStatus;
|
||||
import org.springframework.jms.connection.TransactionAwareConnectionFactoryProxy;
|
||||
import org.springframework.jms.listener.DefaultMessageListenerContainer;
|
||||
import org.springframework.jms.listener.adapter.MessageListenerAdapter;
|
||||
@@ -44,8 +43,6 @@ public class BatchMessageListenerContainer extends DefaultMessageListenerContain
|
||||
|
||||
private RepeatOperations template;
|
||||
|
||||
private ThreadLocal messageHolder = new ThreadLocal();
|
||||
|
||||
/**
|
||||
* Create a new {@link BatchMessageListenerContainer}. The container is set
|
||||
* with auto startup = false (not the default of the parent container).
|
||||
@@ -63,88 +60,6 @@ public class BatchMessageListenerContainer extends DefaultMessageListenerContain
|
||||
setMessageListener(new MessageListenerAdapter());
|
||||
}
|
||||
|
||||
/**
|
||||
* Override base class method to store message in a thread local for later
|
||||
* use.
|
||||
*
|
||||
* @see org.springframework.jms.listener.AbstractPollingMessageListenerContainer#receiveMessage(javax.jms.MessageConsumer)
|
||||
*/
|
||||
protected Message receiveMessage(MessageConsumer consumer) throws JMSException {
|
||||
Message message = super.receiveMessage(consumer);
|
||||
if (message!=null) {
|
||||
messageHolder.set(message);
|
||||
}
|
||||
return message;
|
||||
}
|
||||
|
||||
/**
|
||||
* Override base class method to enable the message holder to be reset,
|
||||
* signalling that a rollback has occurred.
|
||||
*
|
||||
* @see org.springframework.jms.listener.AbstractMessageListenerContainer#rollbackOnExceptionIfNecessary(javax.jms.Session,
|
||||
* java.lang.Throwable)
|
||||
*/
|
||||
protected void rollbackOnExceptionIfNecessary(Session session, Throwable ex) throws JMSException {
|
||||
super.rollbackOnExceptionIfNecessary(session, ex);
|
||||
if (session.getTransacted() && isSessionTransacted()) {
|
||||
messageHolder.set(null);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Override base class to allow extra processing in the case of exception,
|
||||
* with knowledge of the message.
|
||||
*
|
||||
* @see org.springframework.jms.listener.AbstractMessageListenerContainer#doExecuteListener(javax.jms.Session,
|
||||
* javax.jms.Message)
|
||||
*/
|
||||
protected void doExecuteListener(Session session, Message message) throws JMSException {
|
||||
try {
|
||||
super.doExecuteListener(session, message);
|
||||
}
|
||||
catch (Throwable ex) {
|
||||
handleListenerException(session, message, ex);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Extension point for subclasses. Do anything necessary to recover from the
|
||||
* exception, which was raised when the message was being processed.
|
||||
* @param session the current JMS session.
|
||||
* @param message the message just received and failed to process.
|
||||
* @param ex the exception thrown during message processing.
|
||||
*/
|
||||
protected void recover(Session session, Message message, Throwable ex) throws JMSException {
|
||||
// do nothing...
|
||||
}
|
||||
|
||||
/**
|
||||
* Used to provide a recovery path - delegates to
|
||||
* {@link #recover(Session, Message, Throwable)}.
|
||||
*
|
||||
* TODO: Could be merged into base class?
|
||||
*
|
||||
* @param session the JMS session
|
||||
* @param message the last message
|
||||
* @param ex the exception thrown by listener
|
||||
* @see #doExecuteListener(Session, Message)
|
||||
* @see #recover(Session, Message, Throwable)
|
||||
*/
|
||||
protected final void handleListenerException(Session session, Message message, Throwable ex) throws JMSException {
|
||||
// Call out to recovery path...
|
||||
recover(session, message, ex);
|
||||
if (ex instanceof RuntimeException) {
|
||||
// We need to rethrow so that an enclosing non-JMS transaction can
|
||||
// rollback...
|
||||
throw (RuntimeException) ex;
|
||||
}
|
||||
else if (ex instanceof Error) {
|
||||
// Just re-throw Error instances because otherwise unit tests just
|
||||
// swallow exceptions from EasyMock and JUnit.
|
||||
throw (Error) ex;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Override base class to prevent exceptions from being swallowed.
|
||||
*
|
||||
@@ -176,18 +91,13 @@ public class BatchMessageListenerContainer extends DefaultMessageListenerContain
|
||||
*/
|
||||
protected boolean receiveAndExecute(final Session session, final MessageConsumer consumer) throws JMSException {
|
||||
|
||||
template.iterate(new RepeatCallback() {
|
||||
ExitStatus status = template.iterate(new RepeatCallback() {
|
||||
public ExitStatus doInIteration(RepeatContext context) throws Exception {
|
||||
return doBatchCallBack(session, consumer);
|
||||
}
|
||||
});
|
||||
|
||||
if (messageHolder.get()==null) {
|
||||
return false;
|
||||
}
|
||||
|
||||
messageHolder.set(null);
|
||||
return true;
|
||||
return status.isContinuable();
|
||||
}
|
||||
|
||||
/**
|
||||
@@ -219,8 +129,7 @@ public class BatchMessageListenerContainer extends DefaultMessageListenerContain
|
||||
* be rolled back when a batch fails.
|
||||
*/
|
||||
if (super.receiveAndExecute(session, consumer)) {
|
||||
Object message = messageHolder.get();
|
||||
return new ExitStatus(message!=null);
|
||||
return ExitStatus.CONTINUABLE;
|
||||
}
|
||||
return ExitStatus.FINISHED;
|
||||
}
|
||||
|
||||
@@ -20,8 +20,6 @@ import javax.jms.Message;
|
||||
import javax.jms.MessageListener;
|
||||
import javax.jms.TextMessage;
|
||||
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import org.springframework.batch.item.AbstractItemWriter;
|
||||
import org.springframework.batch.item.ItemKeyGenerator;
|
||||
import org.springframework.batch.item.ItemRecoverer;
|
||||
@@ -40,8 +38,6 @@ import org.springframework.util.ClassUtils;
|
||||
*/
|
||||
public class BatchMessageListenerContainerIntegrationTests extends AbstractDependencyInjectionSpringContextTests {
|
||||
|
||||
private final Log logger = LogFactory.getLog(getClass());
|
||||
|
||||
private JmsTemplate jmsTemplate;
|
||||
|
||||
private BatchMessageListenerContainer container;
|
||||
|
||||
@@ -52,7 +52,7 @@ public class BatchMessageListenerContainerTests extends TestCase {
|
||||
container = getContainer(template);
|
||||
boolean received = doExecute(null, null);
|
||||
assertEquals(1, count);
|
||||
assertFalse("Message received", received);
|
||||
assertTrue("Message received", received);
|
||||
}
|
||||
|
||||
private BatchMessageListenerContainer getContainer(RepeatTemplate template) {
|
||||
@@ -139,8 +139,8 @@ public class BatchMessageListenerContainerTests extends TestCase {
|
||||
template.setCompletionPolicy(new SimpleCompletionPolicy(2));
|
||||
container = getContainer(template);
|
||||
container.setSessionTransacted(false);
|
||||
boolean received = doTestWithException(new IllegalStateException("No way!"), false, 2);
|
||||
assertTrue("Message not received", received);
|
||||
boolean received = doTestWithException(new IllegalStateException("No way!"), false, 1);
|
||||
assertFalse("Message received successfully", received);
|
||||
}
|
||||
|
||||
public void testNonTransactionalReceiveAndExecuteWithCallbackThrowingError() throws Exception {
|
||||
@@ -149,8 +149,8 @@ public class BatchMessageListenerContainerTests extends TestCase {
|
||||
container = getContainer(template);
|
||||
container.setSessionTransacted(false);
|
||||
try {
|
||||
boolean received = doTestWithException(new RuntimeException("No way!"), false, 2);
|
||||
assertTrue("Message not received", received);
|
||||
boolean received = doTestWithException(new RuntimeException("No way!"), false, 1);
|
||||
assertFalse("Message received successfully", received);
|
||||
}
|
||||
catch (RuntimeException e) {
|
||||
assertEquals("No way!", e.getMessage());
|
||||
|
||||
@@ -80,6 +80,12 @@ public class AsynchronousTests extends AbstractDependencyInjectionSpringContextT
|
||||
container.stop();
|
||||
// Need to give the container time to shutdown
|
||||
Thread.sleep(1000L);
|
||||
String foo = "";
|
||||
int count = 0;
|
||||
while (foo != null && count < 100) {
|
||||
foo = (String) jmsTemplate.receiveAndConvert("queue");
|
||||
count++;
|
||||
}
|
||||
}
|
||||
|
||||
List list = new ArrayList();
|
||||
@@ -122,11 +128,15 @@ public class AsynchronousTests extends AbstractDependencyInjectionSpringContextT
|
||||
public void testRollback() throws Exception {
|
||||
|
||||
assertInitialState();
|
||||
|
||||
// Prevent us from being overwhelmed after rollback
|
||||
container.setRecoveryInterval(500);
|
||||
|
||||
container.setMessageListener(new SessionAwareMessageListener() {
|
||||
public void onMessage(Message message, Session session) throws JMSException {
|
||||
list.add(message.toString());
|
||||
final String text = ((TextMessage) message).getText();
|
||||
logger.debug("Processing message: " + message);
|
||||
jdbcTemplate.update("INSERT into T_FOOS (id,name,foo_date) values (?,?,null)", new Object[] {
|
||||
new Integer(list.size()), text });
|
||||
// This causes the DB to rollback but not the message
|
||||
@@ -138,8 +148,10 @@ public class AsynchronousTests extends AbstractDependencyInjectionSpringContextT
|
||||
|
||||
container.start();
|
||||
|
||||
// Need to sleep for at least a second here...
|
||||
Thread.sleep(3000L);
|
||||
// Need to sleep here, but not too long or the
|
||||
// container goes into its own recovery cycle and spits out the bad
|
||||
// message...
|
||||
Thread.sleep(500L);
|
||||
|
||||
// We rolled back so the messages might come in many times...
|
||||
assertTrue(list.size() >= 1);
|
||||
|
||||
Reference in New Issue
Block a user