AMQP-171: move some of commit/rollback responsibility to the consumer

This commit is contained in:
Dave Syer
2011-06-01 14:07:13 +01:00
parent ee3338c9f7
commit 0042bdc6f7
7 changed files with 131 additions and 144 deletions

View File

@@ -18,7 +18,7 @@ import org.springframework.beans.factory.FactoryBean;
import org.springframework.retry.RetryOperations;
/**
* Convenient base class for intercptor factories.
* Convenient base class for interceptor factories.
*
* @author Dave Syer
*

View File

@@ -166,12 +166,12 @@ public class RabbitResourceHolder extends ResourceHolderSupport {
for (Long deliveryTag : deliveryTags.get(channel)) {
try {
channel.basicReject(deliveryTag, true);
// Need to commit the reject (=nack)
RabbitUtils.commitIfNecessary(channel);
} catch (IOException ex) {
throw new AmqpIOException(ex);
}
}
// Need to commit the reject (=nack)
RabbitUtils.commitIfNecessary(channel);
}
}
}

View File

@@ -13,9 +13,6 @@
package org.springframework.amqp.rabbit.listener;
import java.io.IOException;
import org.springframework.amqp.AmqpIOException;
import org.springframework.amqp.core.AcknowledgeMode;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageListener;
@@ -24,7 +21,6 @@ import org.springframework.amqp.rabbit.connection.Connection;
import org.springframework.amqp.rabbit.connection.ConnectionFactoryUtils;
import org.springframework.amqp.rabbit.connection.RabbitAccessor;
import org.springframework.amqp.rabbit.connection.RabbitResourceHolder;
import org.springframework.amqp.rabbit.connection.RabbitUtils;
import org.springframework.amqp.rabbit.core.ChannelAwareMessageListener;
import org.springframework.beans.factory.BeanNameAware;
import org.springframework.beans.factory.DisposableBean;
@@ -443,39 +439,21 @@ public abstract class AbstractMessageListenerContainer extends RabbitAccessor im
* @param channel the Rabbit Channel to operate on
* @param message the received Rabbit Message
* @see #invokeListener
* @see #commitIfNecessary
* @see #rollbackOnExceptionIfNecessary
* @see #handleListenerException
*/
protected void executeListener(Channel channel, Message message) throws Throwable {
try {
doExecuteListener(channel, message);
} catch (Throwable ex) {
handleListenerException(ex);
throw ex;
}
}
/**
* Execute the specified listener, committing or rolling back the transaction afterwards (if necessary).
* @param channel the Rabbit Channel to operate on
* @param message the received Rabbit Message
* @throws Throwable
* @throws Exception if thrown by Rabbit API methods
* @see #invokeListener
* @see #commitIfNecessary
* @see #rollbackOnExceptionIfNecessary
* @see #convertRabbitAccessException
*/
protected void doExecuteListener(Channel channel, Message message) throws Throwable {
if (!isRunning()) {
if (logger.isWarnEnabled()) {
logger.warn("Rejecting received message because the listener container has been stopped: " + message);
}
rollbackIfNecessary(channel);
throw new MessageRejectedWhileStoppingException();
}
invokeListener(channel, message);
try {
invokeListener(channel, message);
} catch (Throwable ex) {
handleListenerException(ex);
throw ex;
}
}
/**
@@ -552,92 +530,6 @@ public abstract class AbstractMessageListenerContainer extends RabbitAccessor im
}
}
/**
* Perform a commit or message acknowledgement, as appropriate.
* @param channel the Rabbit channel to commit
* @param message the Message to acknowledge
* @throws IOException
*/
protected void commitIfNecessary(Channel channel, Message message) throws IOException {
long deliveryTag = message.getMessageProperties().getDeliveryTag();
boolean ackRequired = !getAcknowledgeMode().isAutoAck() && !getAcknowledgeMode().isManual();
if (isChannelLocallyTransacted(channel)) {
if (ackRequired) {
channel.basicAck(deliveryTag, true);
}
// For manual acks we still need to commit
RabbitUtils.commitIfNecessary(channel);
} else if (isChannelTransacted() && ackRequired) {
// Not locally transacted but it is transacted so it
// could be synchronized with an external transaction
ConnectionFactoryUtils.registerDeliveryTag(getConnectionFactory(), channel, deliveryTag);
} else if (ackRequired) {
if (ackRequired) {
channel.basicAck(deliveryTag, true);
}
}
}
/**
* Perform a rollback, if appropriate.
* @param channel the Rabbit Channel to roll back
*/
protected void rollbackIfNecessary(Channel channel) {
boolean ackRequired = !getAcknowledgeMode().isAutoAck() && !getAcknowledgeMode().isManual();
if (ackRequired) {
/*
* Re-queue messages and don't get them re-delivered to the same consumer, otherwise the broker just spins
* trying to get us to accept the same message over and over
*/
try {
channel.basicRecover(true);
} catch (IOException e) {
throw new AmqpIOException(e);
}
}
if (this.isChannelLocallyTransacted(channel)) {
// Transacted channel enabled by this container -> rollback.
RabbitUtils.rollbackIfNecessary(channel);
}
}
/**
* Perform a rollback, handling rollback exceptions properly.
* @param channel the Rabbit Channel to roll back
* @param ex the thrown application exception or error
* @throws Exception in case of a rollback error
*/
protected void rollbackOnExceptionIfNecessary(Channel channel, Message message, Throwable ex) throws Exception {
boolean ackRequired = !getAcknowledgeMode().isAutoAck() && !getAcknowledgeMode().isManual();
try {
if (this.isChannelTransacted()) {
if (logger.isDebugEnabled()) {
logger.debug("Initiating transaction rollback on application exception: " + ex);
}
RabbitUtils.rollbackIfNecessary(channel);
}
if (message != null) {
if (ackRequired) {
if (logger.isDebugEnabled()) {
logger.debug("Rejecting message");
}
// channel.basicReject(message.getMessageProperties().getDeliveryTag(), true);
channel.basicNack(message.getMessageProperties().getDeliveryTag(), true, true);
}
if (this.isChannelTransacted()) {
// Need to commit the reject (=nack)
RabbitUtils.commitIfNecessary(channel);
}
}
} catch (Exception e) {
logger.error("Application exception overridden by rollback exception", ex);
throw e;
}
}
/**
* Check whether the given Channel is locally transacted, that is, whether its transaction is managed by this
* listener container's Channel handling and not by an external transaction coordinator.
@@ -660,10 +552,6 @@ public abstract class AbstractMessageListenerContainer extends RabbitAccessor im
* @param ex the exception to handle
*/
protected void handleListenerException(Throwable ex) {
if (ex instanceof MessageRejectedWhileStoppingException) {
// Internal exception - has been handled before.
return;
}
if (isActive()) {
// Regular case: failed while active.
// Invoke ErrorHandler if available.
@@ -675,15 +563,6 @@ public abstract class AbstractMessageListenerContainer extends RabbitAccessor im
}
}
/**
* Internal exception class that indicates a rejected message on shutdown. Used to trigger a rollback for an
* external transaction manager in that case.
*/
@SuppressWarnings("serial")
private static class MessageRejectedWhileStoppingException extends RuntimeException {
}
/**
* Exception that indicates that the initial setup of this container's shared Rabbit Connection failed. This is
* indicating to invokers that they need to establish the shared Connection themselves on first access.

View File

@@ -1,6 +1,8 @@
package org.springframework.amqp.rabbit.listener;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
@@ -59,12 +61,15 @@ public class BlockingQueueConsumer {
private final ActiveObjectCounter<BlockingQueueConsumer> activeObjectCounter;
private List<Long> deliveryTags = new ArrayList<Long>();
/**
* Create a consumer. The consumer must not attempt to use the connection factory or communicate with the broker
* until it is started.
*/
public BlockingQueueConsumer(ConnectionFactory connectionFactory, ActiveObjectCounter<BlockingQueueConsumer> activeObjectCounter,
AcknowledgeMode acknowledgeMode, boolean transactional, int prefetchCount, String... queues) {
public BlockingQueueConsumer(ConnectionFactory connectionFactory,
ActiveObjectCounter<BlockingQueueConsumer> activeObjectCounter, AcknowledgeMode acknowledgeMode,
boolean transactional, int prefetchCount, String... queues) {
this.connectionFactory = connectionFactory;
this.activeObjectCounter = activeObjectCounter;
this.acknowledgeMode = acknowledgeMode;
@@ -113,6 +118,7 @@ public class BlockingQueueConsumer {
if (logger.isDebugEnabled()) {
logger.debug("Received message: " + message);
}
deliveryTags.add(messageProperties.getDeliveryTag());
return message;
}
@@ -262,4 +268,89 @@ public class BlockingQueueConsumer {
+ ", acknowledgeMode=" + acknowledgeMode + " local queue size=" + queue.size();
}
/**
* Perform a rollback, handling rollback exceptions properly.
* @param ex the thrown application exception or error
* @throws Exception in case of a rollback error
*/
public void rollbackOnExceptionIfNecessary(Throwable ex) throws Exception {
boolean ackRequired = !acknowledgeMode.isAutoAck() && !acknowledgeMode.isManual();
try {
if (transactional) {
if (logger.isDebugEnabled()) {
logger.debug("Initiating transaction rollback on application exception: " + ex);
}
RabbitUtils.rollbackIfNecessary(channel);
}
if (ackRequired) {
if (logger.isDebugEnabled()) {
logger.debug("Rejecting messages");
}
for (Long deliveryTag : deliveryTags) {
// With newer RabbitMQ brokers could use basicNack here...
channel.basicReject(deliveryTag, true);
}
if (transactional) {
// Need to commit the reject (=nack)
RabbitUtils.commitIfNecessary(channel);
}
}
} catch (Exception e) {
logger.error("Application exception overridden by rollback exception", ex);
throw e;
} finally {
deliveryTags.clear();
}
}
/**
* Perform a commit or message acknowledgement, as appropriate.
* @param locallyTransacted
* @throws IOException
*/
public boolean commitIfNecessary(boolean locallyTransacted) throws IOException {
if (deliveryTags.isEmpty()) {
return false;
}
try {
boolean ackRequired = !acknowledgeMode.isAutoAck() && !acknowledgeMode.isManual();
if (ackRequired) {
if (transactional && !locallyTransacted) {
// Not locally transacted but it is transacted so it
// could be synchronized with an external transaction
for (Long deliveryTag : deliveryTags) {
ConnectionFactoryUtils.registerDeliveryTag(connectionFactory, channel, deliveryTag);
}
} else {
if (!deliveryTags.isEmpty()) {
long deliveryTag = deliveryTags.get(deliveryTags.size()-1);
channel.basicAck(deliveryTag, true);
}
}
}
if (locallyTransacted) {
// For manual acks we still need to commit
RabbitUtils.commitIfNecessary(channel);
}
} finally {
deliveryTags.clear();
}
return true;
}
}

View File

@@ -0,0 +1,16 @@
package org.springframework.amqp.rabbit.listener;
import org.springframework.amqp.AmqpException;
/**
* Exception class that indicates a rejected message on shutdown. Used to trigger a rollback for an
* external transaction manager in that case.
*/
@SuppressWarnings("serial")
public class MessageRejectedWhileStoppingException extends AmqpException {
public MessageRejectedWhileStoppingException() {
super("Message listener container was stopping when a message was received");
}
}

View File

@@ -126,9 +126,16 @@ public class SimpleMessageListenerContainer extends AbstractMessageListenerConta
this.recoveryInterval = recoveryInterval;
}
public SimpleMessageListenerContainer() {
}
/**
* Default constructor for convenient dependency injection via setters.
*/
public SimpleMessageListenerContainer() { }
/**
* Create a listener container from the connection factory (mandatory).
*
* @param connectionFactory the {@link ConnectionFactory}
*/
public SimpleMessageListenerContainer(ConnectionFactory connectionFactory) {
this.setConnectionFactory(connectionFactory);
}
@@ -405,7 +412,6 @@ public class SimpleMessageListenerContainer extends AbstractMessageListenerConta
Channel channel = consumer.getChannel();
Message lastMessage = null;
for (int i = 0; i < txSize; i++) {
logger.trace("Waiting for message from consumer.");
@@ -413,23 +419,18 @@ public class SimpleMessageListenerContainer extends AbstractMessageListenerConta
if (message == null) {
break;
}
lastMessage = message;
try {
executeListener(channel, message);
} catch (ImmediateAcknowledgeAmqpException e) {
break;
} catch (Throwable ex) {
rollbackOnExceptionIfNecessary(channel, message, ex);
consumer.rollbackOnExceptionIfNecessary(ex);
throw ex;
}
}
if (lastMessage != null) {
commitIfNecessary(channel, lastMessage);
return true;
}
return false;
return consumer.commitIfNecessary(isChannelLocallyTransacted(channel));
}

View File

@@ -203,7 +203,7 @@ public class MessageListenerContainerRetryIntegrationTests {
try {
int timeout = Math.min(1 + messageCount / concurrentConsumers, 30);
int timeout = Math.min(1 + 2 * messageCount / concurrentConsumers, 30);
final int count = messageCount;
logger.debug("Waiting for messages with timeout = " + timeout + " (s)");