AMQP-132: Add retry configuration conveniences and abstractions

This commit is contained in:
Dave Syer
2011-03-25 18:29:06 +00:00
parent 5013d8fec8
commit 735bdf4ff1
14 changed files with 545 additions and 71 deletions

View File

@@ -22,6 +22,7 @@ import java.io.InputStream;
import java.io.ObjectInputStream;
import java.io.Serializable;
import java.io.UnsupportedEncodingException;
import java.util.UUID;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageProperties;
@@ -48,11 +49,22 @@ public class SimpleMessageConverter implements MessageConverter, BeanClassLoader
private String codebaseUrl;
private ClassLoader beanClassLoader = ClassUtils.getDefaultClassLoader();
private boolean createMessageIds = false;
public void setBeanClassLoader(ClassLoader beanClassLoader) {
this.beanClassLoader = beanClassLoader;
}
/**
* Flag to indicate that new messages should have unique identifiers added to their properties before sending.
* Default false.
* @param createMessageIds the flag value to set
*/
public void setCreateMessageIds(boolean createMessageIds) {
this.createMessageIds = createMessageIds;
}
/**
* Set the codebase URL to download classes from if not found locally. Can consists of multiple URLs, separated by
* spaces.
@@ -146,6 +158,9 @@ public class SimpleMessageConverter implements MessageConverter, BeanClassLoader
if (bytes != null) {
messageProperties.setContentLength(bytes.length);
}
if (this.createMessageIds && messageProperties.getMessageId()==null) {
messageProperties.setMessageId(UUID.randomUUID().toString());
}
return new Message(bytes, messageProperties);
}

View File

@@ -41,9 +41,9 @@
<artifactId>spring-test</artifactId>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<groupId>org.springframework.retry</groupId>
<artifactId>spring-retry</artifactId>
<version>1.0.0.RC1</version>
<version>1.0.0.BUILD-SNAPSHOT</version>
<optional>true</optional>
</dependency>

View File

@@ -0,0 +1,52 @@
/*
* Copyright 2002-2010 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/
package org.springframework.amqp.rabbit.config;
import org.aopalliance.aop.Advice;
import org.springframework.amqp.rabbit.retry.MessageRecoverer;
import org.springframework.beans.factory.FactoryBean;
import org.springframework.retry.RetryOperations;
/**
* Convenient base class for intercptor factories.
*
* @author Dave Syer
*
*/
public abstract class AbstractRetryOperationsInterceptorFactoryBean implements FactoryBean<Advice> {
private MessageRecoverer messageRecoverer;
private RetryOperations retryTemplate;
public void setRetryOperations(RetryOperations retryTemplate) {
this.retryTemplate = retryTemplate;
}
public void setMessageRecoverer(MessageRecoverer messageRecoverer) {
this.messageRecoverer = messageRecoverer;
}
protected RetryOperations getRetryOperations() {
return retryTemplate;
}
protected MessageRecoverer getMessageRecoverer() {
return messageRecoverer;
}
public boolean isSingleton() {
return true;
}
}

View File

@@ -0,0 +1,123 @@
/*
* Copyright 2002-2010 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/
package org.springframework.amqp.rabbit.config;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.listener.FatalListenerExecutionException;
import org.springframework.amqp.rabbit.retry.MessageKeyGenerator;
import org.springframework.amqp.rabbit.retry.MessageRecoverer;
import org.springframework.amqp.rabbit.retry.NewMessageIdentifier;
import org.springframework.retry.RetryOperations;
import org.springframework.retry.interceptor.MethodArgumentsKeyGenerator;
import org.springframework.retry.interceptor.MethodInvocationRecoverer;
import org.springframework.retry.interceptor.NewMethodArgumentsIdentifier;
import org.springframework.retry.interceptor.StatefulRetryOperationsInterceptor;
import org.springframework.retry.support.RetryTemplate;
/**
* Convenient factory bean for creating a stateful retry interceptor for use in a message listener container, giving you
* a large amount of control over the behaviour of a container when a listener fails. To control the number of retry
* attempt or the backoff in between attempts, supply a customized {@link RetryTemplate}. Stateful retry is appropriate
* if your listener is using a transactional resource that needs to be rollback on an exception (e.g. a stateful
* connection to a back end server). JPA is the canonical example. The semantics of stateful retry mean that a listener
* exception is propagated to the container, so that it can force a rollback. When the message is redelivered it has to
* be recognised (hence the {@link MessageKeyGenerator} strategy), and when the retry attempts are exhausted it will be
* processed using a {@link MessageRecoverer} if one is provided, in a new transaction. If a recoverer is not provided
* the message will be logged and dropped.
*
* @see RetryOperations#execute(org.springframework.retry.RetryCallback, org.springframework.retry.RecoveryCallback,
* org.springframework.retry.RetryState)
*
* @author Dave Syer
*
*/
public class StatefulRetryOperationsInterceptorFactoryBean extends AbstractRetryOperationsInterceptorFactoryBean {
private static Log logger = LogFactory.getLog(StatefulRetryOperationsInterceptorFactoryBean.class);
private MessageKeyGenerator messageKeyGenerator;
private NewMessageIdentifier newMessageIdentifier;
public void setMessageKeyGeneretor(MessageKeyGenerator messageKeyGeneretor) {
this.messageKeyGenerator = messageKeyGeneretor;
}
public void setNewMessageIdentifier(NewMessageIdentifier newMessageIdentifier) {
this.newMessageIdentifier = newMessageIdentifier;
}
public StatefulRetryOperationsInterceptor getObject() {
StatefulRetryOperationsInterceptor retryInterceptor = new StatefulRetryOperationsInterceptor();
RetryOperations retryTemplate = getRetryOperations();
if (retryTemplate == null) {
retryTemplate = new RetryTemplate();
}
retryInterceptor.setRetryOperations(retryTemplate);
retryInterceptor.setNewItemIdentifier(new NewMethodArgumentsIdentifier() {
public boolean isNew(Object[] args) {
Message message = (Message) args[1];
if (newMessageIdentifier == null) {
return !message.getMessageProperties().isRedelivered();
} else {
return newMessageIdentifier.isNew(message);
}
}
});
final MessageRecoverer messageRecoverer = getMessageRecoverer();
retryInterceptor.setRecoverer(new MethodInvocationRecoverer<Void>() {
public Void recover(Object[] args, Throwable cause) {
Message message = (Message) args[1];
if (messageRecoverer == null) {
logger.warn("Message dropped on recovery: " + message);
} else {
messageRecoverer.recover(message);
}
return null;
}
});
retryInterceptor.setKeyGenerator(new MethodArgumentsKeyGenerator() {
public Object getKey(Object[] args) {
Message message = (Message) args[1];
if (messageKeyGenerator == null) {
String messageId = message.getMessageProperties().getMessageId();
if (messageId == null) {
throw new FatalListenerExecutionException(
"Illegal null id in message. Failed to manage retry for message: " + message);
}
return messageId;
} else {
return messageKeyGenerator.getKey(message);
}
}
});
return retryInterceptor;
}
public Class<?> getObjectType() {
return StatefulRetryOperationsInterceptor.class;
}
public boolean isSingleton() {
return true;
}
}

View File

@@ -0,0 +1,71 @@
/*
* Copyright 2002-2010 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/
package org.springframework.amqp.rabbit.config;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.retry.MessageRecoverer;
import org.springframework.retry.RetryOperations;
import org.springframework.retry.interceptor.MethodInvocationRecoverer;
import org.springframework.retry.interceptor.RetryOperationsInterceptor;
import org.springframework.retry.support.RetryTemplate;
/**
* Convenient factory bean for creating a stateless retry interceptor for use in a message listener container, giving
* you a large amount of control over the behaviour of a container when a listener fails. To control the number of retry
* attempt or the backoff in between attempts, supply a customized {@link RetryTemplate}. Stateless retry is appropriate
* if your listener can be called repeatedly between failures with no side effects. The semantics of stateless retry
* mean that a listener exception is not propagated to the container until the retry attempts are exhausted. When the
* retry attempts are exhausted it can be processed using a {@link MessageRecoverer} if one is provided, in the same
* transaction (in which case no exception is propagated). If a recoverer is not provided the exception will be
* propagated and the message may be redelivered if the channel is transactional.
*
* @see RetryOperations#execute(org.springframework.retry.RetryCallback, org.springframework.retry.RecoveryCallback)
*
* @author Dave Syer
*
*/
public class StatelessRetryOperationsInterceptorFactoryBean extends AbstractRetryOperationsInterceptorFactoryBean {
public RetryOperationsInterceptor getObject() {
RetryOperationsInterceptor retryInterceptor = new RetryOperationsInterceptor();
RetryOperations retryTemplate = getRetryOperations();
if (retryTemplate == null) {
retryTemplate = new RetryTemplate();
}
retryInterceptor.setRetryOperations(retryTemplate);
final MessageRecoverer messageRecoverer = getMessageRecoverer();
if (messageRecoverer != null) {
retryInterceptor.setRecoverer(new MethodInvocationRecoverer<Void>() {
public Void recover(Object[] args, Throwable cause) {
Message message = (Message) args[1];
messageRecoverer.recover(message);
return null;
}
});
}
return retryInterceptor;
}
public Class<?> getObjectType() {
return RetryOperationsInterceptor.class;
}
public boolean isSingleton() {
return true;
}
}

View File

@@ -345,6 +345,7 @@ public class RabbitTemplate extends RabbitAccessor implements RabbitOperations {
// try to send to configured routing key
routingKey = this.routingKey;
}
channel.basicPublish(exchange, routingKey, false, false, RabbitUtils.extractBasicProperties(message, encoding),
message.getBody());
// Check commit - avoid commit call within a JTA transaction.

View File

@@ -158,7 +158,7 @@ public class BlockingQueueConsumer {
}
} catch (IOException e) {
this.activeObjectCounter.release(this);
throw new ListenerStartupFatalException("Cannot prepare queue for listener. "
throw new FatalListenerStartupException("Cannot prepare queue for listener. "
+ "Either the queue doesn't exist or the broker will not allow us to use it.", e);
}
try {

View File

@@ -0,0 +1,45 @@
/*
* Copyright 2002-2006 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/
package org.springframework.amqp.rabbit.listener;
import org.springframework.amqp.AmqpException;
import org.springframework.amqp.rabbit.listener.adapter.MessageListenerAdapter;
/**
* Exception to be thrown when the execution of a listener method failed unrecoverably.
*
* @author Dave Syer
* @see MessageListenerAdapter
*/
@SuppressWarnings("serial")
public class FatalListenerExecutionException extends AmqpException {
/**
* Constructor for ListenerExecutionFailedException.
* @param msg the detail message
* @param cause the exception thrown by the listener method
*/
public FatalListenerExecutionException(String msg, Throwable cause) {
super(msg, cause);
}
/**
* Constructor for ListenerExecutionFailedException.
* @param msg the detail message
*/
public FatalListenerExecutionException(String msg) {
super(msg);
}
}

View File

@@ -1,17 +1,14 @@
/*
* Copyright 2002-2006 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/
package org.springframework.amqp.rabbit.listener;
@@ -19,23 +16,21 @@ package org.springframework.amqp.rabbit.listener;
import org.springframework.amqp.AmqpException;
import org.springframework.amqp.rabbit.listener.adapter.MessageListenerAdapter;
/**
* Exception to be thrown when the execution of a listener method failed.
*
* @author Juergen Hoeller
* @since 2.0
* Exception to be thrown when the execution of a listener method failed on startup.
*
* @author Dave Syer
* @see MessageListenerAdapter
*/
@SuppressWarnings("serial")
public class ListenerStartupFatalException extends AmqpException {
public class FatalListenerStartupException extends AmqpException {
/**
* Constructor for ListenerExecutionFailedException.
* @param msg the detail message
* @param cause the exception thrown by the listener method
*/
public ListenerStartupFatalException(String msg, Throwable cause) {
public FatalListenerStartupException(String msg, Throwable cause) {
super(msg, cause);
}

View File

@@ -303,7 +303,7 @@ public class SimpleMessageListenerContainer extends AbstractMessageListenerConta
this.taskExecutor.execute(processor);
}
for (AsyncMessageProcessingConsumer processor : processors) {
ListenerStartupFatalException startupException = processor.getStartupException();
FatalListenerStartupException startupException = processor.getStartupException();
if (startupException != null) {
throw new AmqpIllegalStateException("Fatal exception on listener startup", startupException);
}
@@ -429,7 +429,7 @@ public class SimpleMessageListenerContainer extends AbstractMessageListenerConta
private final CountDownLatch start;
private volatile ListenerStartupFatalException startupException;
private volatile FatalListenerStartupException startupException;
public AsyncMessageProcessingConsumer(BlockingQueueConsumer consumer) {
this.consumer = consumer;
@@ -444,7 +444,7 @@ public class SimpleMessageListenerContainer extends AbstractMessageListenerConta
* @throws TimeoutException if the consumer hasn't started
* @throws InterruptedException if the consumer startup is interrupted
*/
public ListenerStartupFatalException getStartupException() throws TimeoutException, InterruptedException {
public FatalListenerStartupException getStartupException() throws TimeoutException, InterruptedException {
if (!start.await(60000L, TimeUnit.MILLISECONDS)) {
throw new TimeoutException("Timed out waiting for startup");
}
@@ -459,7 +459,7 @@ public class SimpleMessageListenerContainer extends AbstractMessageListenerConta
try {
consumer.start();
} catch (ListenerStartupFatalException ex) {
} catch (FatalListenerStartupException ex) {
throw ex;
} catch (Throwable t) {
handleStartupFailure(t);
@@ -483,11 +483,15 @@ public class SimpleMessageListenerContainer extends AbstractMessageListenerConta
logger.debug("Consumer thread interrupted, processing stopped.");
Thread.currentThread().interrupt();
aborted = true;
} catch (ListenerStartupFatalException ex) {
} catch (FatalListenerStartupException ex) {
logger.error("Consumer received fatal exception on startup", ex);
this.startupException = ex;
// Fatal, but no point re-throwing, so just abort.
aborted = true;
} catch (FatalListenerExecutionException ex) {
logger.error("Consumer received fatal exception during processing", ex);
// Fatal, but no point re-throwing, so just abort.
aborted = true;
} catch (Throwable t) {
if (logger.isDebugEnabled()) {
logger.warn(

View File

@@ -0,0 +1,34 @@
/*
* Copyright 2002-2010 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/
package org.springframework.amqp.rabbit.retry;
import org.springframework.amqp.core.Message;
/**
* @author Dave Syer
*
*/
public interface MessageKeyGenerator {
/**
* Generate a unique key for the message that is repeatable on redelivery. Implementations should be very careful
* about assuming uniqueness of any element of the message, especially considering the requirement that it be
* repeatable. A message id is ideal, but may not be present (AMQP does not mandate it), and the message body is a
* byte array whose contents might be repeatable, but its object value is not.
*
* @param message the message to generate a key for
* @return a unique key for this message
*/
Object getKey(Message message);
}

View File

@@ -0,0 +1,30 @@
/*
* Copyright 2002-2010 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/
package org.springframework.amqp.rabbit.retry;
import org.springframework.amqp.core.Message;
/**
* @author Dave Syer
*
*/
public interface MessageRecoverer {
/**
* Callback for message that was consumed but failed all retry attempts.
*
* @param message the message to recover
*/
void recover(Message message);
}

View File

@@ -0,0 +1,37 @@
/*
* Copyright 2002-2010 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/
package org.springframework.amqp.rabbit.retry;
import org.springframework.amqp.core.Message;
/**
* An optimization for stateful retry of message processing. If a message is known to be "new", i.e. never consumed
* before by this or any other client, then there are potential optimizations for managing the state associated with
* tracking the processing of a message (e.g. there is no need to check a cache for a hit).
*
* @author Dave Syer
*
*/
public interface NewMessageIdentifier {
/**
* Query a message to see if it has been seen before. Usually it is only possible to know if it has definitely not
* been seen before (e.g. through the redelivered flag, which would be used by default). Clients can customize the
* retry behaviour for failed messages by implementing this method.
*
* @param message the message to test
* @return true if the message is known to not have been consumed before
*/
boolean isNew(Message message);
}

View File

@@ -4,9 +4,8 @@ import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import java.util.Arrays;
import java.util.UUID;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
@@ -16,21 +15,23 @@ import org.apache.commons.logging.LogFactory;
import org.apache.log4j.Level;
import org.junit.Rule;
import org.junit.Test;
import org.springframework.amqp.AmqpException;
import org.junit.rules.ExpectedException;
import org.springframework.amqp.core.AcknowledgeMode;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessagePostProcessor;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.config.AbstractRetryOperationsInterceptorFactoryBean;
import org.springframework.amqp.rabbit.config.StatefulRetryOperationsInterceptorFactoryBean;
import org.springframework.amqp.rabbit.config.StatelessRetryOperationsInterceptorFactoryBean;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.listener.adapter.MessageListenerAdapter;
import org.springframework.amqp.rabbit.retry.MessageRecoverer;
import org.springframework.amqp.rabbit.test.BrokerRunning;
import org.springframework.amqp.rabbit.test.BrokerTestUtils;
import org.springframework.amqp.rabbit.test.Log4jLevelAdjuster;
import org.springframework.retry.interceptor.MethodArgumentsKeyGenerator;
import org.springframework.retry.interceptor.MethodInvocationRecoverer;
import org.springframework.retry.interceptor.NewMethodArgumentsIdentifier;
import org.springframework.retry.interceptor.StatefulRetryOperationsInterceptor;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.amqp.support.converter.SimpleMessageConverter;
import org.springframework.retry.policy.MapRetryContextCache;
import org.springframework.retry.support.RetryTemplate;
public class MessageListenerContainerRetryIntegrationTests {
@@ -46,12 +47,27 @@ public class MessageListenerContainerRetryIntegrationTests {
public Log4jLevelAdjuster logLevels = new Log4jLevelAdjuster(Level.INFO, RabbitTemplate.class,
SimpleMessageListenerContainer.class, BlockingQueueConsumer.class);
@Rule
public ExpectedException exception = ExpectedException.none();
private RabbitTemplate template;
private RetryTemplate retryTemplate;
private MessageConverter messageConverter;
private RabbitTemplate createTemplate(int concurrentConsumers) {
RabbitTemplate template = new RabbitTemplate();
CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
connectionFactory.setChannelCacheSize(concurrentConsumers);
connectionFactory.setPort(BrokerTestUtils.getPort());
template.setConnectionFactory(connectionFactory);
if (messageConverter == null) {
SimpleMessageConverter messageConverter = new SimpleMessageConverter();
messageConverter.setCreateMessageIds(true);
this.messageConverter = messageConverter;
}
template.setMessageConverter(messageConverter);
return template;
}
@@ -66,6 +82,37 @@ public class MessageListenerContainerRetryIntegrationTests {
}
@Test
public void testStatelessRetryWithAllMessagesFailing() throws Exception {
int messageCount = 10;
int txSize = 1;
int failFrequency = 1;
int concurrentConsumers = 3;
doTestStatelessRetry(messageCount, txSize, failFrequency, concurrentConsumers);
}
@Test
public void testStatefulRetryWithNoMessageIds() throws Exception {
int messageCount = 2;
int txSize = 1;
int failFrequency = 1;
int concurrentConsumers = 1;
SimpleMessageConverter messageConverter = new SimpleMessageConverter();
// There will be no key for these messages so they cannot be recovered...
messageConverter.setCreateMessageIds(false);
this.messageConverter = messageConverter;
// Beware of context cache busting if retry policy fails...
this.retryTemplate = new RetryTemplate();
this.retryTemplate.setRetryContextCache(new MapRetryContextCache(1));
// The container should have shutdown, so there are now no active consumers
exception.expectMessage("expected:<1> but was:<0>");
doTestStatefulRetry(messageCount, txSize, failFrequency, concurrentConsumers);
}
@Test
public void testStatefulRetryWithTxSizeAndIntermittentFailure() throws Exception {
@@ -88,22 +135,49 @@ public class MessageListenerContainerRetryIntegrationTests {
}
public void doTestStatefulRetry(int messageCount, int txSize, int failFrequency, int concurrentConsumers) throws Exception {
private Advice createRetryInterceptor(final CountDownLatch latch, boolean stateful) throws Exception {
AbstractRetryOperationsInterceptorFactoryBean factory;
if (stateful) {
factory = new StatefulRetryOperationsInterceptorFactoryBean();
} else {
factory = new StatelessRetryOperationsInterceptorFactoryBean();
}
factory.setMessageRecoverer(new MessageRecoverer() {
public void recover(Message message) {
logger.info("Recovered: " + message);
latch.countDown();
}
});
if (retryTemplate == null) {
retryTemplate = new RetryTemplate();
}
factory.setRetryOperations(retryTemplate);
Advice retryInterceptor = factory.getObject();
return retryInterceptor;
}
private void doTestStatefulRetry(int messageCount, int txSize, int failFrequency, int concurrentConsumers)
throws Exception {
doTestRetry(messageCount, txSize, failFrequency, concurrentConsumers, true);
}
private void doTestStatelessRetry(int messageCount, int txSize, int failFrequency, int concurrentConsumers)
throws Exception {
doTestRetry(messageCount, txSize, failFrequency, concurrentConsumers, false);
}
private void doTestRetry(int messageCount, int txSize, int failFrequency, int concurrentConsumers, boolean stateful)
throws Exception {
int failedMessageCount = messageCount / failFrequency + (messageCount % failFrequency == 0 ? 0 : 1);
RabbitTemplate template = createTemplate(concurrentConsumers);
template = createTemplate(concurrentConsumers);
for (int i = 0; i < messageCount; i++) {
template.convertAndSend(queue.getName(), new Integer(i), new MessagePostProcessor() {
// There is no message id by default
public Message postProcessMessage(Message message) throws AmqpException {
message.getMessageProperties().setMessageId(UUID.randomUUID().toString());
return message;
}
});
template.convertAndSend(queue.getName(), new Integer(i));
}
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(template.getConnectionFactory());
final SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(
template.getConnectionFactory());
PojoListener listener = new PojoListener(failFrequency);
container.setMessageListener(new MessageListenerAdapter(listener));
container.setAcknowledgeMode(AcknowledgeMode.AUTO);
@@ -111,30 +185,8 @@ public class MessageListenerContainerRetryIntegrationTests {
container.setTxSize(txSize);
container.setConcurrentConsumers(concurrentConsumers);
StatefulRetryOperationsInterceptor retryInterceptor = new StatefulRetryOperationsInterceptor();
retryInterceptor.setRetryOperations(new RetryTemplate());
retryInterceptor.setNewItemIdentifier(new NewMethodArgumentsIdentifier() {
public boolean isNew(Object[] args) {
Message message = (Message) args[1];
return !message.getMessageProperties().isRedelivered();
}
});
final CountDownLatch latch = new CountDownLatch(failedMessageCount);
retryInterceptor.setRecoverer(new MethodInvocationRecoverer<Object>() {
public Object recover(Object[] args, Throwable cause) {
logger.info("Recovered: " + Arrays.asList(args));
latch.countDown();
return null;
}
});
retryInterceptor.setKeyGenerator(new MethodArgumentsKeyGenerator() {
public Object getKey(Object[] args) {
Message message = (Message) args[1];
logger.info("Key: " + message.getMessageProperties().getMessageId());
return message.getMessageProperties().getMessageId();
}
});
container.setAdviceChain(new Advice[] { retryInterceptor });
container.setAdviceChain(new Advice[] { createRetryInterceptor(latch, stateful) });
container.setQueueNames(queue.getName());
container.afterPropertiesSet();
@@ -144,14 +196,29 @@ public class MessageListenerContainerRetryIntegrationTests {
int timeout = Math.min(1 + messageCount / concurrentConsumers, 30);
final int count = messageCount;
logger.debug("Waiting for messages with timeout = " + timeout + " (s)");
Executors.newSingleThreadExecutor().execute(new Runnable() {
public void run() {
while (container.getActiveConsumerCount() > 0) {
try {
Thread.sleep(100L);
} catch (InterruptedException e) {
latch.countDown();
Thread.currentThread().interrupt();
return;
}
}
for (int i = 0; i < count; i++) {
latch.countDown();
}
}
});
boolean waited = latch.await(timeout, TimeUnit.SECONDS);
logger.info("All messages recovered: " + waited);
assertEquals(concurrentConsumers, container.getActiveConsumerCount());
assertTrue("Timed out waiting for messages", waited);
assertEquals(concurrentConsumers, container.getActiveConsumerCount());
// Retried each failure 3 times (default retry policy)...
assertEquals(messageCount + 2 * failedMessageCount, listener.getCount());
@@ -174,7 +241,7 @@ public class MessageListenerContainerRetryIntegrationTests {
}
public void handleMessage(int value) throws Exception {
logger.debug(value + count.getAndIncrement());
logger.debug(value+ ":" + count.getAndIncrement());
if (value % failFrequency == 0) {
throw new RuntimeException("Planned");
}