AMQP-168: fix txSize efficiency (committing too frequently) and retry (not committing on recovery)

This commit is contained in:
Dave Syer
2011-05-11 09:41:13 +01:00
parent 15afb1d0c0
commit 1ad6b2aa5b
10 changed files with 264 additions and 40 deletions

View File

@@ -0,0 +1,37 @@
/*
* Copyright 2002-2010 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/
package org.springframework.amqp;
/**
* Special exception for listener implementations that want to signal that the current batch of messages should be
* acknowledged immediately (i.e. as soon as possible) without rollback, and without consuming any more messages.
*
* @author Dave Syer
*
*/
@SuppressWarnings("serial")
public class ImmediateAcknowledgeAmqpException extends AmqpException {
public ImmediateAcknowledgeAmqpException(String message) {
super(message);
}
public ImmediateAcknowledgeAmqpException(Throwable cause) {
super(cause);
}
public ImmediateAcknowledgeAmqpException(String message, Throwable cause) {
super(message, cause);
}
}

View File

@@ -14,6 +14,7 @@ package org.springframework.amqp.rabbit.config;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.amqp.ImmediateAcknowledgeAmqpException;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.listener.FatalListenerExecutionException;
import org.springframework.amqp.rabbit.retry.MessageKeyGenerator;
@@ -88,7 +89,10 @@ public class StatefulRetryOperationsInterceptorFactoryBean extends AbstractRetry
} else {
messageRecoverer.recover(message, cause);
}
return null;
// This is actually a normal outcome. It means the recovery was successful, but we don't want to consume
// any more messages until the acks and commits are sent for this (problematic) message...
throw new ImmediateAcknowledgeAmqpException("Recovered message forces ack (if ack mode requires it): "
+ message, cause);
}
});

View File

@@ -475,13 +475,7 @@ public abstract class AbstractMessageListenerContainer extends RabbitAccessor im
rollbackIfNecessary(channel);
throw new MessageRejectedWhileStoppingException();
}
try {
invokeListener(channel, message);
} catch (Throwable ex) {
rollbackOnExceptionIfNecessary(channel, message, ex);
throw ex;
}
commitIfNecessary(channel, message);
invokeListener(channel, message);
}
/**
@@ -570,8 +564,9 @@ public abstract class AbstractMessageListenerContainer extends RabbitAccessor im
boolean ackRequired = !getAcknowledgeMode().isAutoAck() && !getAcknowledgeMode().isManual();
if (isChannelLocallyTransacted(channel)) {
if (ackRequired) {
channel.basicAck(deliveryTag, false);
channel.basicAck(deliveryTag, true);
}
// For manual acks we still need to commit
RabbitUtils.commitIfNecessary(channel);
} else if (isChannelTransacted() && ackRequired) {
// Not locally transacted but it is transacted so it
@@ -579,7 +574,7 @@ public abstract class AbstractMessageListenerContainer extends RabbitAccessor im
ConnectionFactoryUtils.registerDeliveryTag(getConnectionFactory(), channel, deliveryTag);
} else if (ackRequired) {
if (ackRequired) {
channel.basicAck(deliveryTag, false);
channel.basicAck(deliveryTag, true);
}
}

View File

@@ -23,6 +23,7 @@ import java.util.concurrent.TimeoutException;
import org.aopalliance.aop.Advice;
import org.springframework.amqp.AmqpException;
import org.springframework.amqp.AmqpIllegalStateException;
import org.springframework.amqp.ImmediateAcknowledgeAmqpException;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
@@ -341,8 +342,11 @@ public class SimpleMessageListenerContainer extends AbstractMessageListenerConta
protected BlockingQueueConsumer createBlockingQueueConsumer() {
BlockingQueueConsumer consumer;
String[] queues = getRequiredQueueNames();
// There's no point prefetching less than the tx size, otherwise the consumer will stall because the broker
// didn't get an ack for delivered messages
int actualPrefetchCount = prefetchCount > txSize ? prefetchCount : txSize;
consumer = new BlockingQueueConsumer(getConnectionFactory(), cancellationLock, getAcknowledgeMode(),
isChannelTransacted(), prefetchCount, queues);
isChannelTransacted(), actualPrefetchCount, queues);
return consumer;
}
@@ -401,18 +405,31 @@ public class SimpleMessageListenerContainer extends AbstractMessageListenerConta
Channel channel = consumer.getChannel();
Message lastMessage = null;
for (int i = 0; i < txSize; i++) {
logger.trace("Waiting for message from consumer.");
Message message = consumer.nextMessage(receiveTimeout);
if (message == null) {
return false;
break;
}
lastMessage = message;
try {
executeListener(channel, message);
} catch (ImmediateAcknowledgeAmqpException e) {
break;
} catch (Throwable ex) {
rollbackOnExceptionIfNecessary(channel, message, ex);
throw ex;
}
executeListener(channel, message);
}
if (lastMessage != null) {
commitIfNecessary(channel, lastMessage);
return true;
}
return true;
return false;
}

View File

@@ -1,7 +1,6 @@
package org.springframework.amqp.rabbit.listener;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import java.util.concurrent.CountDownLatch;
@@ -29,10 +28,13 @@ import org.springframework.amqp.rabbit.retry.MessageRecoverer;
import org.springframework.amqp.rabbit.test.BrokerRunning;
import org.springframework.amqp.rabbit.test.BrokerTestUtils;
import org.springframework.amqp.rabbit.test.Log4jLevelAdjuster;
import org.springframework.amqp.rabbit.test.RepeatProcessor;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.amqp.support.converter.SimpleMessageConverter;
import org.springframework.amqp.utils.SerializationUtils;
import org.springframework.retry.policy.MapRetryContextCache;
import org.springframework.retry.support.RetryTemplate;
import org.springframework.test.annotation.Repeat;
public class MessageListenerContainerRetryIntegrationTests {
@@ -47,9 +49,15 @@ public class MessageListenerContainerRetryIntegrationTests {
public Log4jLevelAdjuster logLevels = new Log4jLevelAdjuster(Level.INFO, RabbitTemplate.class,
SimpleMessageListenerContainer.class, BlockingQueueConsumer.class);
@Rule
public Log4jLevelAdjuster traceLevels = new Log4jLevelAdjuster(Level.TRACE, StatefulRetryOperationsInterceptorFactoryBean.class, MessageListenerContainerRetryIntegrationTests.class);
@Rule
public ExpectedException exception = ExpectedException.none();
@Rule
public RepeatProcessor repeats = new RepeatProcessor();
private RabbitTemplate template;
private RetryTemplate retryTemplate;
@@ -114,6 +122,7 @@ public class MessageListenerContainerRetryIntegrationTests {
}
@Test
@Repeat(10)
public void testStatefulRetryWithTxSizeAndIntermittentFailure() throws Exception {
int messageCount = 10;
@@ -144,7 +153,7 @@ public class MessageListenerContainerRetryIntegrationTests {
}
factory.setMessageRecoverer(new MessageRecoverer() {
public void recover(Message message, Throwable cause) {
logger.info("Recovered: " + message);
logger.info("Recovered: [" + SerializationUtils.deserialize(message.getBody()).toString()+"], message: " +message);
latch.countDown();
}
});
@@ -220,16 +229,16 @@ public class MessageListenerContainerRetryIntegrationTests {
assertTrue("Timed out waiting for messages", waited);
// Retried each failure 3 times (default retry policy)...
assertEquals(messageCount + 2 * failedMessageCount, listener.getCount());
assertEquals(3 * failedMessageCount, listener.getCount());
// All failed messages recovered
assertEquals(null, template.receiveAndConvert(queue.getName()));
} finally {
container.shutdown();
assertEquals(0, container.getActiveConsumerCount());
}
// All failed messages recovered
assertNull(template.receiveAndConvert(queue.getName()));
}
public static class PojoListener {
@@ -241,8 +250,10 @@ public class MessageListenerContainerRetryIntegrationTests {
}
public void handleMessage(int value) throws Exception {
logger.debug(value+ ":" + count.getAndIncrement());
logger.debug("Handling: ["+value+ "], fails:" + count);
if (value % failFrequency == 0) {
count.getAndIncrement();
logger.debug("Failing: ["+value+ "], fails:" + count);
throw new RuntimeException("Planned");
}
}

View File

@@ -46,8 +46,6 @@ public class MessageListenerRecoveryCachingConnectionIntegrationTests {
private int messageCount = 10;
private int txSize = 1;
private boolean transactional = false;
private AcknowledgeMode acknowledgeMode = AcknowledgeMode.AUTO;
@@ -79,7 +77,7 @@ public class MessageListenerRecoveryCachingConnectionIntegrationTests {
}
@Test
public void testListenerSendsMessageAndThenCommit() throws Exception {
public void testListenerSendsMessageAndThenContainerCommits() throws Exception {
ConnectionFactory connectionFactory = createConnectionFactory();
RabbitTemplate template = new RabbitTemplate(connectionFactory);
@@ -102,7 +100,7 @@ public class MessageListenerRecoveryCachingConnectionIntegrationTests {
byte[] bytes = (byte[]) template.receiveAndConvert(sendQueue.getName());
assertNotNull(bytes);
assertEquals("bar", new String(bytes));
assertNull(template.receiveAndConvert(queue.getName()));
assertEquals(null, template.receiveAndConvert(queue.getName()));
}
@@ -278,8 +276,6 @@ public class MessageListenerRecoveryCachingConnectionIntegrationTests {
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory);
container.setMessageListener(new MessageListenerAdapter(listener));
container.setQueueNames(queueName);
container.setTxSize(txSize);
container.setPrefetchCount(txSize);
container.setConcurrentConsumers(concurrentConsumers);
container.setChannelTransacted(transactional);
container.setAcknowledgeMode(acknowledgeMode);

View File

@@ -0,0 +1,152 @@
package org.springframework.amqp.rabbit.listener;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.log4j.Level;
import org.junit.After;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.springframework.amqp.core.AcknowledgeMode;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.core.ChannelAwareMessageListener;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.listener.adapter.MessageListenerAdapter;
import org.springframework.amqp.rabbit.test.BrokerRunning;
import org.springframework.amqp.rabbit.test.BrokerTestUtils;
import org.springframework.amqp.rabbit.test.Log4jLevelAdjuster;
import com.rabbitmq.client.Channel;
public class MessageListenerTxSizeIntegrationTests {
private static Log logger = LogFactory.getLog(MessageListenerTxSizeIntegrationTests.class);
private Queue queue = new Queue("test.queue");
private RabbitTemplate template = new RabbitTemplate();
private int concurrentConsumers = 1;
private int messageCount = 12;
private int txSize = 4;
private boolean transactional = true;
private SimpleMessageListenerContainer container;
@Rule
public Log4jLevelAdjuster logLevels = new Log4jLevelAdjuster(Level.DEBUG, RabbitTemplate.class,
SimpleMessageListenerContainer.class, BlockingQueueConsumer.class);
@Rule
public BrokerRunning brokerIsRunning = BrokerRunning.isRunningWithEmptyQueues(queue);
@Before
public void createConnectionFactory() {
CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
connectionFactory.setChannelCacheSize(concurrentConsumers);
connectionFactory.setPort(BrokerTestUtils.getPort());
template.setConnectionFactory(connectionFactory);
}
@After
public void clear() throws Exception {
// Wait for broker communication to finish before trying to stop container
Thread.sleep(300L);
logger.debug("Shutting down at end of test");
if (container != null) {
container.shutdown();
}
}
@Test
public void testListenerTransactionalSunnyDay() throws Exception {
transactional = true;
CountDownLatch latch = new CountDownLatch(messageCount);
container = createContainer(new TestListener(latch, false));
for (int i = 0; i < messageCount; i++) {
template.convertAndSend(queue.getName(), i + "foo");
}
int timeout = Math.min(1 + messageCount / (4 * concurrentConsumers), 30);
logger.debug("Waiting for messages with timeout = " + timeout + " (s)");
boolean waited = latch.await(timeout, TimeUnit.SECONDS);
assertTrue("Timed out waiting for message", waited);
assertNull(template.receiveAndConvert(queue.getName()));
}
@Test
public void testListenerTransactionalFails() throws Exception {
transactional = true;
CountDownLatch latch = new CountDownLatch(messageCount);
container = createContainer(new TestListener(latch, true));
for (int i = 0; i < txSize; i++) {
template.convertAndSend(queue.getName(), i + "foo");
}
int timeout = Math.min(1 + messageCount / (4 * concurrentConsumers), 30);
logger.debug("Waiting for messages with timeout = " + timeout + " (s)");
boolean waited = latch.await(timeout, TimeUnit.SECONDS);
assertTrue("Timed out waiting for message", waited);
assertNull(template.receiveAndConvert(queue.getName()));
}
private SimpleMessageListenerContainer createContainer(Object listener) {
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(template.getConnectionFactory());
container.setMessageListener(new MessageListenerAdapter(listener));
container.setQueueNames(queue.getName());
container.setTxSize(txSize);
container.setPrefetchCount(txSize);
container.setConcurrentConsumers(concurrentConsumers);
container.setChannelTransacted(transactional);
container.setAcknowledgeMode(AcknowledgeMode.AUTO);
container.afterPropertiesSet();
container.start();
return container;
}
public class TestListener implements ChannelAwareMessageListener {
private ThreadLocal<Integer> count = new ThreadLocal<Integer>();
private final CountDownLatch latch;
private final boolean fail;
public TestListener(CountDownLatch latch, boolean fail) {
this.latch = latch;
this.fail = fail;
}
public void handleMessage(String value) {
}
public void onMessage(Message message, Channel channel) throws Exception {
String value = new String(message.getBody());
try {
logger.debug("Received: " + value);
if (count.get()==null) {
count.set(1);
} else {
count.set(count.get()+1);
}
if (count.get()==txSize && fail) {
logger.debug("Failing: " + value);
count.set(0);
throw new RuntimeException("Planned");
}
} finally {
latch.countDown();
}
}
}
}

View File

@@ -53,7 +53,7 @@ public class SimpleMessageListenerContainerIntegrationTests {
@Rule
public Log4jLevelAdjuster logLevels = new Log4jLevelAdjuster(Level.OFF, RabbitTemplate.class,
SimpleMessageListenerContainer.class, BlockingQueueConsumer.class);
SimpleMessageListenerContainer.class, BlockingQueueConsumer.class, CachingConnectionFactory.class);
@Rule
public Log4jLevelAdjuster testLogLevels = new Log4jLevelAdjuster(Level.DEBUG,
@@ -94,9 +94,9 @@ public class SimpleMessageListenerContainerIntegrationTests {
params(6, 2, 2, AcknowledgeMode.NONE), //
params(7, 20, 4, AcknowledgeMode.AUTO), //
params(8, 20, 4, AcknowledgeMode.NONE), //
params(9, 1000, 4, AcknowledgeMode.AUTO), //
params(10, 1000, 4, AcknowledgeMode.NONE), //
params(11, 1000, 4, AcknowledgeMode.AUTO, 10) //
params(9, 300, 4, AcknowledgeMode.AUTO), //
params(10, 300, 4, AcknowledgeMode.NONE), //
params(11, 300, 4, AcknowledgeMode.AUTO, 10) //
);
}

View File

@@ -1,5 +1,8 @@
package org.springframework.amqp.rabbit.test;
import java.util.HashMap;
import java.util.Map;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.junit.Assume;
@@ -46,18 +49,20 @@ public class BrokerRunning extends TestWatchman {
private static Log logger = LogFactory.getLog(BrokerRunning.class);
// Static so that we only test once on failure: speeds up test suite
private static boolean brokerOnline = true;
private static Map<Integer,Boolean> brokerOnline = new HashMap<Integer, Boolean>();
// Static so that we only test once on failure
private static boolean brokerOffline = true;
private static Map<Integer,Boolean> brokerOffline = new HashMap<Integer, Boolean>();
private final boolean assumeOnline;
private final boolean purge;
private Queue[] queues;
private int DEFAULT_PORT = BrokerTestUtils.getPort();
private int port = BrokerTestUtils.getPort();
private int port;
private String hostName = null;
@@ -101,6 +106,7 @@ public class BrokerRunning extends TestWatchman {
this.assumeOnline = assumeOnline;
this.queues = queues;
this.purge = purge;
setPort(DEFAULT_PORT);
}
private BrokerRunning(boolean assumeOnline, Queue... queues) {
@@ -116,6 +122,12 @@ public class BrokerRunning extends TestWatchman {
*/
public void setPort(int port) {
this.port = port;
if (!brokerOffline.containsKey(port)) {
brokerOffline.put(port, true);
}
if (!brokerOnline.containsKey(port)) {
brokerOnline.put(port, true);
}
}
/**
@@ -130,9 +142,9 @@ public class BrokerRunning extends TestWatchman {
// Check at the beginning, so this can be used as a static field
if (assumeOnline) {
Assume.assumeTrue(brokerOnline);
Assume.assumeTrue(brokerOnline.get(port));
} else {
Assume.assumeTrue(brokerOffline);
Assume.assumeTrue(brokerOffline.get(port));
}
CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
@@ -161,14 +173,14 @@ public class BrokerRunning extends TestWatchman {
admin.declareQueue(queue);
}
}
brokerOffline = false;
brokerOffline.put(port, false);
if (!assumeOnline) {
Assume.assumeTrue(brokerOffline);
Assume.assumeTrue(brokerOffline.get(port));
}
} catch (Exception e) {
logger.warn("Not executing tests because basic connectivity test failed", e);
brokerOnline = false;
brokerOnline.put(port, false);
if (assumeOnline) {
Assume.assumeNoException(e);
}

View File

@@ -89,7 +89,7 @@ public class RepeatProcessor implements MethodRule {
try {
base.evaluate();
} catch (Throwable t) {
throw new IllegalStateException("Failed on iteration: " + i, t);
throw new IllegalStateException("Failed on iteration: " + i + " of " + repeats + " (started at 0)", t);
}
}
} finally {