diff --git a/spring-batch-integration/src/main/java/org/springframework/batch/container/jms/BatchMessageListenerContainer.java b/spring-batch-integration/src/main/java/org/springframework/batch/container/jms/BatchMessageListenerContainer.java index 918d0333c..98a270322 100644 --- a/spring-batch-integration/src/main/java/org/springframework/batch/container/jms/BatchMessageListenerContainer.java +++ b/spring-batch-integration/src/main/java/org/springframework/batch/container/jms/BatchMessageListenerContainer.java @@ -17,14 +17,13 @@ package org.springframework.batch.container.jms; import javax.jms.JMSException; -import javax.jms.Message; import javax.jms.MessageConsumer; import javax.jms.Session; +import org.springframework.batch.repeat.ExitStatus; import org.springframework.batch.repeat.RepeatCallback; import org.springframework.batch.repeat.RepeatContext; import org.springframework.batch.repeat.RepeatOperations; -import org.springframework.batch.repeat.ExitStatus; import org.springframework.jms.connection.TransactionAwareConnectionFactoryProxy; import org.springframework.jms.listener.DefaultMessageListenerContainer; import org.springframework.jms.listener.adapter.MessageListenerAdapter; @@ -44,8 +43,6 @@ public class BatchMessageListenerContainer extends DefaultMessageListenerContain private RepeatOperations template; - private ThreadLocal messageHolder = new ThreadLocal(); - /** * Create a new {@link BatchMessageListenerContainer}. The container is set * with auto startup = false (not the default of the parent container). @@ -63,88 +60,6 @@ public class BatchMessageListenerContainer extends DefaultMessageListenerContain setMessageListener(new MessageListenerAdapter()); } - /** - * Override base class method to store message in a thread local for later - * use. - * - * @see org.springframework.jms.listener.AbstractPollingMessageListenerContainer#receiveMessage(javax.jms.MessageConsumer) - */ - protected Message receiveMessage(MessageConsumer consumer) throws JMSException { - Message message = super.receiveMessage(consumer); - if (message!=null) { - messageHolder.set(message); - } - return message; - } - - /** - * Override base class method to enable the message holder to be reset, - * signalling that a rollback has occurred. - * - * @see org.springframework.jms.listener.AbstractMessageListenerContainer#rollbackOnExceptionIfNecessary(javax.jms.Session, - * java.lang.Throwable) - */ - protected void rollbackOnExceptionIfNecessary(Session session, Throwable ex) throws JMSException { - super.rollbackOnExceptionIfNecessary(session, ex); - if (session.getTransacted() && isSessionTransacted()) { - messageHolder.set(null); - } - } - - /** - * Override base class to allow extra processing in the case of exception, - * with knowledge of the message. - * - * @see org.springframework.jms.listener.AbstractMessageListenerContainer#doExecuteListener(javax.jms.Session, - * javax.jms.Message) - */ - protected void doExecuteListener(Session session, Message message) throws JMSException { - try { - super.doExecuteListener(session, message); - } - catch (Throwable ex) { - handleListenerException(session, message, ex); - } - } - - /** - * Extension point for subclasses. Do anything necessary to recover from the - * exception, which was raised when the message was being processed. - * @param session the current JMS session. - * @param message the message just received and failed to process. - * @param ex the exception thrown during message processing. - */ - protected void recover(Session session, Message message, Throwable ex) throws JMSException { - // do nothing... - } - - /** - * Used to provide a recovery path - delegates to - * {@link #recover(Session, Message, Throwable)}. - * - * TODO: Could be merged into base class? - * - * @param session the JMS session - * @param message the last message - * @param ex the exception thrown by listener - * @see #doExecuteListener(Session, Message) - * @see #recover(Session, Message, Throwable) - */ - protected final void handleListenerException(Session session, Message message, Throwable ex) throws JMSException { - // Call out to recovery path... - recover(session, message, ex); - if (ex instanceof RuntimeException) { - // We need to rethrow so that an enclosing non-JMS transaction can - // rollback... - throw (RuntimeException) ex; - } - else if (ex instanceof Error) { - // Just re-throw Error instances because otherwise unit tests just - // swallow exceptions from EasyMock and JUnit. - throw (Error) ex; - } - } - /** * Override base class to prevent exceptions from being swallowed. * @@ -176,18 +91,13 @@ public class BatchMessageListenerContainer extends DefaultMessageListenerContain */ protected boolean receiveAndExecute(final Session session, final MessageConsumer consumer) throws JMSException { - template.iterate(new RepeatCallback() { + ExitStatus status = template.iterate(new RepeatCallback() { public ExitStatus doInIteration(RepeatContext context) throws Exception { return doBatchCallBack(session, consumer); } }); - if (messageHolder.get()==null) { - return false; - } - - messageHolder.set(null); - return true; + return status.isContinuable(); } /** @@ -219,8 +129,7 @@ public class BatchMessageListenerContainer extends DefaultMessageListenerContain * be rolled back when a batch fails. */ if (super.receiveAndExecute(session, consumer)) { - Object message = messageHolder.get(); - return new ExitStatus(message!=null); + return ExitStatus.CONTINUABLE; } return ExitStatus.FINISHED; } diff --git a/spring-batch-integration/src/test/java/org/springframework/batch/container/jms/BatchMessageListenerContainerIntegrationTests.java b/spring-batch-integration/src/test/java/org/springframework/batch/container/jms/BatchMessageListenerContainerIntegrationTests.java index 2908c5fdb..1b3f4c883 100644 --- a/spring-batch-integration/src/test/java/org/springframework/batch/container/jms/BatchMessageListenerContainerIntegrationTests.java +++ b/spring-batch-integration/src/test/java/org/springframework/batch/container/jms/BatchMessageListenerContainerIntegrationTests.java @@ -20,8 +20,6 @@ import javax.jms.Message; import javax.jms.MessageListener; import javax.jms.TextMessage; -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; import org.springframework.batch.item.AbstractItemWriter; import org.springframework.batch.item.ItemKeyGenerator; import org.springframework.batch.item.ItemRecoverer; @@ -40,8 +38,6 @@ import org.springframework.util.ClassUtils; */ public class BatchMessageListenerContainerIntegrationTests extends AbstractDependencyInjectionSpringContextTests { - private final Log logger = LogFactory.getLog(getClass()); - private JmsTemplate jmsTemplate; private BatchMessageListenerContainer container; diff --git a/spring-batch-integration/src/test/java/org/springframework/batch/container/jms/BatchMessageListenerContainerTests.java b/spring-batch-integration/src/test/java/org/springframework/batch/container/jms/BatchMessageListenerContainerTests.java index 83bf092d4..488999f41 100644 --- a/spring-batch-integration/src/test/java/org/springframework/batch/container/jms/BatchMessageListenerContainerTests.java +++ b/spring-batch-integration/src/test/java/org/springframework/batch/container/jms/BatchMessageListenerContainerTests.java @@ -52,7 +52,7 @@ public class BatchMessageListenerContainerTests extends TestCase { container = getContainer(template); boolean received = doExecute(null, null); assertEquals(1, count); - assertFalse("Message received", received); + assertTrue("Message received", received); } private BatchMessageListenerContainer getContainer(RepeatTemplate template) { @@ -139,8 +139,8 @@ public class BatchMessageListenerContainerTests extends TestCase { template.setCompletionPolicy(new SimpleCompletionPolicy(2)); container = getContainer(template); container.setSessionTransacted(false); - boolean received = doTestWithException(new IllegalStateException("No way!"), false, 2); - assertTrue("Message not received", received); + boolean received = doTestWithException(new IllegalStateException("No way!"), false, 1); + assertFalse("Message received successfully", received); } public void testNonTransactionalReceiveAndExecuteWithCallbackThrowingError() throws Exception { @@ -149,8 +149,8 @@ public class BatchMessageListenerContainerTests extends TestCase { container = getContainer(template); container.setSessionTransacted(false); try { - boolean received = doTestWithException(new RuntimeException("No way!"), false, 2); - assertTrue("Message not received", received); + boolean received = doTestWithException(new RuntimeException("No way!"), false, 1); + assertFalse("Message received successfully", received); } catch (RuntimeException e) { assertEquals("No way!", e.getMessage()); diff --git a/spring-batch-integration/src/test/java/org/springframework/batch/repeat/jms/AsynchronousTests.java b/spring-batch-integration/src/test/java/org/springframework/batch/repeat/jms/AsynchronousTests.java index 3f50db69b..3acdaeeb6 100644 --- a/spring-batch-integration/src/test/java/org/springframework/batch/repeat/jms/AsynchronousTests.java +++ b/spring-batch-integration/src/test/java/org/springframework/batch/repeat/jms/AsynchronousTests.java @@ -80,6 +80,12 @@ public class AsynchronousTests extends AbstractDependencyInjectionSpringContextT container.stop(); // Need to give the container time to shutdown Thread.sleep(1000L); + String foo = ""; + int count = 0; + while (foo != null && count < 100) { + foo = (String) jmsTemplate.receiveAndConvert("queue"); + count++; + } } List list = new ArrayList(); @@ -122,11 +128,15 @@ public class AsynchronousTests extends AbstractDependencyInjectionSpringContextT public void testRollback() throws Exception { assertInitialState(); + + // Prevent us from being overwhelmed after rollback + container.setRecoveryInterval(500); container.setMessageListener(new SessionAwareMessageListener() { public void onMessage(Message message, Session session) throws JMSException { list.add(message.toString()); final String text = ((TextMessage) message).getText(); + logger.debug("Processing message: " + message); jdbcTemplate.update("INSERT into T_FOOS (id,name,foo_date) values (?,?,null)", new Object[] { new Integer(list.size()), text }); // This causes the DB to rollback but not the message @@ -138,8 +148,10 @@ public class AsynchronousTests extends AbstractDependencyInjectionSpringContextT container.start(); - // Need to sleep for at least a second here... - Thread.sleep(3000L); + // Need to sleep here, but not too long or the + // container goes into its own recovery cycle and spits out the bad + // message... + Thread.sleep(500L); // We rolled back so the messages might come in many times... assertTrue(list.size() >= 1);