diff --git a/spring-amqp-core/src/main/java/org/springframework/amqp/support/converter/SimpleMessageConverter.java b/spring-amqp-core/src/main/java/org/springframework/amqp/support/converter/SimpleMessageConverter.java index cb23b258..7a9ad851 100644 --- a/spring-amqp-core/src/main/java/org/springframework/amqp/support/converter/SimpleMessageConverter.java +++ b/spring-amqp-core/src/main/java/org/springframework/amqp/support/converter/SimpleMessageConverter.java @@ -22,6 +22,7 @@ import java.io.InputStream; import java.io.ObjectInputStream; import java.io.Serializable; import java.io.UnsupportedEncodingException; +import java.util.UUID; import org.springframework.amqp.core.Message; import org.springframework.amqp.core.MessageProperties; @@ -48,11 +49,22 @@ public class SimpleMessageConverter implements MessageConverter, BeanClassLoader private String codebaseUrl; private ClassLoader beanClassLoader = ClassUtils.getDefaultClassLoader(); + + private boolean createMessageIds = false; public void setBeanClassLoader(ClassLoader beanClassLoader) { this.beanClassLoader = beanClassLoader; } + /** + * Flag to indicate that new messages should have unique identifiers added to their properties before sending. + * Default false. + * @param createMessageIds the flag value to set + */ + public void setCreateMessageIds(boolean createMessageIds) { + this.createMessageIds = createMessageIds; + } + /** * Set the codebase URL to download classes from if not found locally. Can consists of multiple URLs, separated by * spaces. @@ -146,6 +158,9 @@ public class SimpleMessageConverter implements MessageConverter, BeanClassLoader if (bytes != null) { messageProperties.setContentLength(bytes.length); } + if (this.createMessageIds && messageProperties.getMessageId()==null) { + messageProperties.setMessageId(UUID.randomUUID().toString()); + } return new Message(bytes, messageProperties); } diff --git a/spring-rabbit/pom.xml b/spring-rabbit/pom.xml index 59a53d96..99ca198d 100644 --- a/spring-rabbit/pom.xml +++ b/spring-rabbit/pom.xml @@ -41,9 +41,9 @@ spring-test - org.springframework + org.springframework.retry spring-retry - 1.0.0.RC1 + 1.0.0.BUILD-SNAPSHOT true diff --git a/spring-rabbit/src/main/java/org/springframework/amqp/rabbit/config/AbstractRetryOperationsInterceptorFactoryBean.java b/spring-rabbit/src/main/java/org/springframework/amqp/rabbit/config/AbstractRetryOperationsInterceptorFactoryBean.java new file mode 100644 index 00000000..0435ea33 --- /dev/null +++ b/spring-rabbit/src/main/java/org/springframework/amqp/rabbit/config/AbstractRetryOperationsInterceptorFactoryBean.java @@ -0,0 +1,52 @@ +/* + * Copyright 2002-2010 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ +package org.springframework.amqp.rabbit.config; + +import org.aopalliance.aop.Advice; +import org.springframework.amqp.rabbit.retry.MessageRecoverer; +import org.springframework.beans.factory.FactoryBean; +import org.springframework.retry.RetryOperations; + +/** + * Convenient base class for intercptor factories. + * + * @author Dave Syer + * + */ +public abstract class AbstractRetryOperationsInterceptorFactoryBean implements FactoryBean { + + private MessageRecoverer messageRecoverer; + + private RetryOperations retryTemplate; + + public void setRetryOperations(RetryOperations retryTemplate) { + this.retryTemplate = retryTemplate; + } + + public void setMessageRecoverer(MessageRecoverer messageRecoverer) { + this.messageRecoverer = messageRecoverer; + } + + protected RetryOperations getRetryOperations() { + return retryTemplate; + } + + protected MessageRecoverer getMessageRecoverer() { + return messageRecoverer; + } + + public boolean isSingleton() { + return true; + } + +} diff --git a/spring-rabbit/src/main/java/org/springframework/amqp/rabbit/config/StatefulRetryOperationsInterceptorFactoryBean.java b/spring-rabbit/src/main/java/org/springframework/amqp/rabbit/config/StatefulRetryOperationsInterceptorFactoryBean.java new file mode 100644 index 00000000..efed12c4 --- /dev/null +++ b/spring-rabbit/src/main/java/org/springframework/amqp/rabbit/config/StatefulRetryOperationsInterceptorFactoryBean.java @@ -0,0 +1,123 @@ +/* + * Copyright 2002-2010 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ +package org.springframework.amqp.rabbit.config; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.springframework.amqp.core.Message; +import org.springframework.amqp.rabbit.listener.FatalListenerExecutionException; +import org.springframework.amqp.rabbit.retry.MessageKeyGenerator; +import org.springframework.amqp.rabbit.retry.MessageRecoverer; +import org.springframework.amqp.rabbit.retry.NewMessageIdentifier; +import org.springframework.retry.RetryOperations; +import org.springframework.retry.interceptor.MethodArgumentsKeyGenerator; +import org.springframework.retry.interceptor.MethodInvocationRecoverer; +import org.springframework.retry.interceptor.NewMethodArgumentsIdentifier; +import org.springframework.retry.interceptor.StatefulRetryOperationsInterceptor; +import org.springframework.retry.support.RetryTemplate; + +/** + * Convenient factory bean for creating a stateful retry interceptor for use in a message listener container, giving you + * a large amount of control over the behaviour of a container when a listener fails. To control the number of retry + * attempt or the backoff in between attempts, supply a customized {@link RetryTemplate}. Stateful retry is appropriate + * if your listener is using a transactional resource that needs to be rollback on an exception (e.g. a stateful + * connection to a back end server). JPA is the canonical example. The semantics of stateful retry mean that a listener + * exception is propagated to the container, so that it can force a rollback. When the message is redelivered it has to + * be recognised (hence the {@link MessageKeyGenerator} strategy), and when the retry attempts are exhausted it will be + * processed using a {@link MessageRecoverer} if one is provided, in a new transaction. If a recoverer is not provided + * the message will be logged and dropped. + * + * @see RetryOperations#execute(org.springframework.retry.RetryCallback, org.springframework.retry.RecoveryCallback, + * org.springframework.retry.RetryState) + * + * @author Dave Syer + * + */ +public class StatefulRetryOperationsInterceptorFactoryBean extends AbstractRetryOperationsInterceptorFactoryBean { + + private static Log logger = LogFactory.getLog(StatefulRetryOperationsInterceptorFactoryBean.class); + + private MessageKeyGenerator messageKeyGenerator; + + private NewMessageIdentifier newMessageIdentifier; + + public void setMessageKeyGeneretor(MessageKeyGenerator messageKeyGeneretor) { + this.messageKeyGenerator = messageKeyGeneretor; + } + + public void setNewMessageIdentifier(NewMessageIdentifier newMessageIdentifier) { + this.newMessageIdentifier = newMessageIdentifier; + } + + public StatefulRetryOperationsInterceptor getObject() { + + StatefulRetryOperationsInterceptor retryInterceptor = new StatefulRetryOperationsInterceptor(); + RetryOperations retryTemplate = getRetryOperations(); + if (retryTemplate == null) { + retryTemplate = new RetryTemplate(); + } + retryInterceptor.setRetryOperations(retryTemplate); + + retryInterceptor.setNewItemIdentifier(new NewMethodArgumentsIdentifier() { + public boolean isNew(Object[] args) { + Message message = (Message) args[1]; + if (newMessageIdentifier == null) { + return !message.getMessageProperties().isRedelivered(); + } else { + return newMessageIdentifier.isNew(message); + } + } + }); + + final MessageRecoverer messageRecoverer = getMessageRecoverer(); + retryInterceptor.setRecoverer(new MethodInvocationRecoverer() { + public Void recover(Object[] args, Throwable cause) { + Message message = (Message) args[1]; + if (messageRecoverer == null) { + logger.warn("Message dropped on recovery: " + message); + } else { + messageRecoverer.recover(message); + } + return null; + } + }); + + retryInterceptor.setKeyGenerator(new MethodArgumentsKeyGenerator() { + public Object getKey(Object[] args) { + Message message = (Message) args[1]; + if (messageKeyGenerator == null) { + String messageId = message.getMessageProperties().getMessageId(); + if (messageId == null) { + throw new FatalListenerExecutionException( + "Illegal null id in message. Failed to manage retry for message: " + message); + } + return messageId; + } else { + return messageKeyGenerator.getKey(message); + } + } + }); + + return retryInterceptor; + + } + + public Class getObjectType() { + return StatefulRetryOperationsInterceptor.class; + } + + public boolean isSingleton() { + return true; + } + +} diff --git a/spring-rabbit/src/main/java/org/springframework/amqp/rabbit/config/StatelessRetryOperationsInterceptorFactoryBean.java b/spring-rabbit/src/main/java/org/springframework/amqp/rabbit/config/StatelessRetryOperationsInterceptorFactoryBean.java new file mode 100644 index 00000000..a2aa7890 --- /dev/null +++ b/spring-rabbit/src/main/java/org/springframework/amqp/rabbit/config/StatelessRetryOperationsInterceptorFactoryBean.java @@ -0,0 +1,71 @@ +/* + * Copyright 2002-2010 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ +package org.springframework.amqp.rabbit.config; + +import org.springframework.amqp.core.Message; +import org.springframework.amqp.rabbit.retry.MessageRecoverer; +import org.springframework.retry.RetryOperations; +import org.springframework.retry.interceptor.MethodInvocationRecoverer; +import org.springframework.retry.interceptor.RetryOperationsInterceptor; +import org.springframework.retry.support.RetryTemplate; + +/** + * Convenient factory bean for creating a stateless retry interceptor for use in a message listener container, giving + * you a large amount of control over the behaviour of a container when a listener fails. To control the number of retry + * attempt or the backoff in between attempts, supply a customized {@link RetryTemplate}. Stateless retry is appropriate + * if your listener can be called repeatedly between failures with no side effects. The semantics of stateless retry + * mean that a listener exception is not propagated to the container until the retry attempts are exhausted. When the + * retry attempts are exhausted it can be processed using a {@link MessageRecoverer} if one is provided, in the same + * transaction (in which case no exception is propagated). If a recoverer is not provided the exception will be + * propagated and the message may be redelivered if the channel is transactional. + * + * @see RetryOperations#execute(org.springframework.retry.RetryCallback, org.springframework.retry.RecoveryCallback) + * + * @author Dave Syer + * + */ +public class StatelessRetryOperationsInterceptorFactoryBean extends AbstractRetryOperationsInterceptorFactoryBean { + + public RetryOperationsInterceptor getObject() { + + RetryOperationsInterceptor retryInterceptor = new RetryOperationsInterceptor(); + RetryOperations retryTemplate = getRetryOperations(); + if (retryTemplate == null) { + retryTemplate = new RetryTemplate(); + } + retryInterceptor.setRetryOperations(retryTemplate); + + final MessageRecoverer messageRecoverer = getMessageRecoverer(); + if (messageRecoverer != null) { + retryInterceptor.setRecoverer(new MethodInvocationRecoverer() { + public Void recover(Object[] args, Throwable cause) { + Message message = (Message) args[1]; + messageRecoverer.recover(message); + return null; + } + }); + } + + return retryInterceptor; + + } + + public Class getObjectType() { + return RetryOperationsInterceptor.class; + } + + public boolean isSingleton() { + return true; + } + +} diff --git a/spring-rabbit/src/main/java/org/springframework/amqp/rabbit/core/RabbitTemplate.java b/spring-rabbit/src/main/java/org/springframework/amqp/rabbit/core/RabbitTemplate.java index e51be80f..288b5f66 100644 --- a/spring-rabbit/src/main/java/org/springframework/amqp/rabbit/core/RabbitTemplate.java +++ b/spring-rabbit/src/main/java/org/springframework/amqp/rabbit/core/RabbitTemplate.java @@ -345,6 +345,7 @@ public class RabbitTemplate extends RabbitAccessor implements RabbitOperations { // try to send to configured routing key routingKey = this.routingKey; } + channel.basicPublish(exchange, routingKey, false, false, RabbitUtils.extractBasicProperties(message, encoding), message.getBody()); // Check commit - avoid commit call within a JTA transaction. diff --git a/spring-rabbit/src/main/java/org/springframework/amqp/rabbit/listener/BlockingQueueConsumer.java b/spring-rabbit/src/main/java/org/springframework/amqp/rabbit/listener/BlockingQueueConsumer.java index f2f2007a..d99bbf4f 100644 --- a/spring-rabbit/src/main/java/org/springframework/amqp/rabbit/listener/BlockingQueueConsumer.java +++ b/spring-rabbit/src/main/java/org/springframework/amqp/rabbit/listener/BlockingQueueConsumer.java @@ -158,7 +158,7 @@ public class BlockingQueueConsumer { } } catch (IOException e) { this.activeObjectCounter.release(this); - throw new ListenerStartupFatalException("Cannot prepare queue for listener. " + throw new FatalListenerStartupException("Cannot prepare queue for listener. " + "Either the queue doesn't exist or the broker will not allow us to use it.", e); } try { diff --git a/spring-rabbit/src/main/java/org/springframework/amqp/rabbit/listener/FatalListenerExecutionException.java b/spring-rabbit/src/main/java/org/springframework/amqp/rabbit/listener/FatalListenerExecutionException.java new file mode 100644 index 00000000..356a1c1d --- /dev/null +++ b/spring-rabbit/src/main/java/org/springframework/amqp/rabbit/listener/FatalListenerExecutionException.java @@ -0,0 +1,45 @@ +/* + * Copyright 2002-2006 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ + +package org.springframework.amqp.rabbit.listener; + +import org.springframework.amqp.AmqpException; +import org.springframework.amqp.rabbit.listener.adapter.MessageListenerAdapter; + +/** + * Exception to be thrown when the execution of a listener method failed unrecoverably. + * + * @author Dave Syer + * @see MessageListenerAdapter + */ +@SuppressWarnings("serial") +public class FatalListenerExecutionException extends AmqpException { + + /** + * Constructor for ListenerExecutionFailedException. + * @param msg the detail message + * @param cause the exception thrown by the listener method + */ + public FatalListenerExecutionException(String msg, Throwable cause) { + super(msg, cause); + } + + /** + * Constructor for ListenerExecutionFailedException. + * @param msg the detail message + */ + public FatalListenerExecutionException(String msg) { + super(msg); + } + +} diff --git a/spring-rabbit/src/main/java/org/springframework/amqp/rabbit/listener/ListenerStartupFatalException.java b/spring-rabbit/src/main/java/org/springframework/amqp/rabbit/listener/FatalListenerStartupException.java similarity index 53% rename from spring-rabbit/src/main/java/org/springframework/amqp/rabbit/listener/ListenerStartupFatalException.java rename to spring-rabbit/src/main/java/org/springframework/amqp/rabbit/listener/FatalListenerStartupException.java index dbd87892..7fded23d 100644 --- a/spring-rabbit/src/main/java/org/springframework/amqp/rabbit/listener/ListenerStartupFatalException.java +++ b/spring-rabbit/src/main/java/org/springframework/amqp/rabbit/listener/FatalListenerStartupException.java @@ -1,17 +1,14 @@ /* * Copyright 2002-2006 the original author or authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. */ package org.springframework.amqp.rabbit.listener; @@ -19,23 +16,21 @@ package org.springframework.amqp.rabbit.listener; import org.springframework.amqp.AmqpException; import org.springframework.amqp.rabbit.listener.adapter.MessageListenerAdapter; - /** - * Exception to be thrown when the execution of a listener method failed. - * - * @author Juergen Hoeller - * @since 2.0 + * Exception to be thrown when the execution of a listener method failed on startup. + * + * @author Dave Syer * @see MessageListenerAdapter */ @SuppressWarnings("serial") -public class ListenerStartupFatalException extends AmqpException { +public class FatalListenerStartupException extends AmqpException { /** * Constructor for ListenerExecutionFailedException. * @param msg the detail message * @param cause the exception thrown by the listener method */ - public ListenerStartupFatalException(String msg, Throwable cause) { + public FatalListenerStartupException(String msg, Throwable cause) { super(msg, cause); } diff --git a/spring-rabbit/src/main/java/org/springframework/amqp/rabbit/listener/SimpleMessageListenerContainer.java b/spring-rabbit/src/main/java/org/springframework/amqp/rabbit/listener/SimpleMessageListenerContainer.java index f9f583fe..6312a6cf 100644 --- a/spring-rabbit/src/main/java/org/springframework/amqp/rabbit/listener/SimpleMessageListenerContainer.java +++ b/spring-rabbit/src/main/java/org/springframework/amqp/rabbit/listener/SimpleMessageListenerContainer.java @@ -303,7 +303,7 @@ public class SimpleMessageListenerContainer extends AbstractMessageListenerConta this.taskExecutor.execute(processor); } for (AsyncMessageProcessingConsumer processor : processors) { - ListenerStartupFatalException startupException = processor.getStartupException(); + FatalListenerStartupException startupException = processor.getStartupException(); if (startupException != null) { throw new AmqpIllegalStateException("Fatal exception on listener startup", startupException); } @@ -429,7 +429,7 @@ public class SimpleMessageListenerContainer extends AbstractMessageListenerConta private final CountDownLatch start; - private volatile ListenerStartupFatalException startupException; + private volatile FatalListenerStartupException startupException; public AsyncMessageProcessingConsumer(BlockingQueueConsumer consumer) { this.consumer = consumer; @@ -444,7 +444,7 @@ public class SimpleMessageListenerContainer extends AbstractMessageListenerConta * @throws TimeoutException if the consumer hasn't started * @throws InterruptedException if the consumer startup is interrupted */ - public ListenerStartupFatalException getStartupException() throws TimeoutException, InterruptedException { + public FatalListenerStartupException getStartupException() throws TimeoutException, InterruptedException { if (!start.await(60000L, TimeUnit.MILLISECONDS)) { throw new TimeoutException("Timed out waiting for startup"); } @@ -459,7 +459,7 @@ public class SimpleMessageListenerContainer extends AbstractMessageListenerConta try { consumer.start(); - } catch (ListenerStartupFatalException ex) { + } catch (FatalListenerStartupException ex) { throw ex; } catch (Throwable t) { handleStartupFailure(t); @@ -483,11 +483,15 @@ public class SimpleMessageListenerContainer extends AbstractMessageListenerConta logger.debug("Consumer thread interrupted, processing stopped."); Thread.currentThread().interrupt(); aborted = true; - } catch (ListenerStartupFatalException ex) { + } catch (FatalListenerStartupException ex) { logger.error("Consumer received fatal exception on startup", ex); this.startupException = ex; // Fatal, but no point re-throwing, so just abort. aborted = true; + } catch (FatalListenerExecutionException ex) { + logger.error("Consumer received fatal exception during processing", ex); + // Fatal, but no point re-throwing, so just abort. + aborted = true; } catch (Throwable t) { if (logger.isDebugEnabled()) { logger.warn( diff --git a/spring-rabbit/src/main/java/org/springframework/amqp/rabbit/retry/MessageKeyGenerator.java b/spring-rabbit/src/main/java/org/springframework/amqp/rabbit/retry/MessageKeyGenerator.java new file mode 100644 index 00000000..dbad4ec4 --- /dev/null +++ b/spring-rabbit/src/main/java/org/springframework/amqp/rabbit/retry/MessageKeyGenerator.java @@ -0,0 +1,34 @@ +/* + * Copyright 2002-2010 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ +package org.springframework.amqp.rabbit.retry; + +import org.springframework.amqp.core.Message; + +/** + * @author Dave Syer + * + */ +public interface MessageKeyGenerator { + + /** + * Generate a unique key for the message that is repeatable on redelivery. Implementations should be very careful + * about assuming uniqueness of any element of the message, especially considering the requirement that it be + * repeatable. A message id is ideal, but may not be present (AMQP does not mandate it), and the message body is a + * byte array whose contents might be repeatable, but its object value is not. + * + * @param message the message to generate a key for + * @return a unique key for this message + */ + Object getKey(Message message); + +} diff --git a/spring-rabbit/src/main/java/org/springframework/amqp/rabbit/retry/MessageRecoverer.java b/spring-rabbit/src/main/java/org/springframework/amqp/rabbit/retry/MessageRecoverer.java new file mode 100644 index 00000000..672307b8 --- /dev/null +++ b/spring-rabbit/src/main/java/org/springframework/amqp/rabbit/retry/MessageRecoverer.java @@ -0,0 +1,30 @@ +/* + * Copyright 2002-2010 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ +package org.springframework.amqp.rabbit.retry; + +import org.springframework.amqp.core.Message; + +/** + * @author Dave Syer + * + */ +public interface MessageRecoverer { + + /** + * Callback for message that was consumed but failed all retry attempts. + * + * @param message the message to recover + */ + void recover(Message message); + +} diff --git a/spring-rabbit/src/main/java/org/springframework/amqp/rabbit/retry/NewMessageIdentifier.java b/spring-rabbit/src/main/java/org/springframework/amqp/rabbit/retry/NewMessageIdentifier.java new file mode 100644 index 00000000..b3f968ac --- /dev/null +++ b/spring-rabbit/src/main/java/org/springframework/amqp/rabbit/retry/NewMessageIdentifier.java @@ -0,0 +1,37 @@ +/* + * Copyright 2002-2010 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ +package org.springframework.amqp.rabbit.retry; + +import org.springframework.amqp.core.Message; + +/** + * An optimization for stateful retry of message processing. If a message is known to be "new", i.e. never consumed + * before by this or any other client, then there are potential optimizations for managing the state associated with + * tracking the processing of a message (e.g. there is no need to check a cache for a hit). + * + * @author Dave Syer + * + */ +public interface NewMessageIdentifier { + + /** + * Query a message to see if it has been seen before. Usually it is only possible to know if it has definitely not + * been seen before (e.g. through the redelivered flag, which would be used by default). Clients can customize the + * retry behaviour for failed messages by implementing this method. + * + * @param message the message to test + * @return true if the message is known to not have been consumed before + */ + boolean isNew(Message message); + +} diff --git a/spring-rabbit/src/test/java/org/springframework/amqp/rabbit/listener/MessageListenerContainerRetryIntegrationTests.java b/spring-rabbit/src/test/java/org/springframework/amqp/rabbit/listener/MessageListenerContainerRetryIntegrationTests.java index 8f2316f9..171ef66a 100644 --- a/spring-rabbit/src/test/java/org/springframework/amqp/rabbit/listener/MessageListenerContainerRetryIntegrationTests.java +++ b/spring-rabbit/src/test/java/org/springframework/amqp/rabbit/listener/MessageListenerContainerRetryIntegrationTests.java @@ -4,9 +4,8 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; -import java.util.Arrays; -import java.util.UUID; import java.util.concurrent.CountDownLatch; +import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; @@ -16,21 +15,23 @@ import org.apache.commons.logging.LogFactory; import org.apache.log4j.Level; import org.junit.Rule; import org.junit.Test; -import org.springframework.amqp.AmqpException; +import org.junit.rules.ExpectedException; import org.springframework.amqp.core.AcknowledgeMode; import org.springframework.amqp.core.Message; -import org.springframework.amqp.core.MessagePostProcessor; import org.springframework.amqp.core.Queue; +import org.springframework.amqp.rabbit.config.AbstractRetryOperationsInterceptorFactoryBean; +import org.springframework.amqp.rabbit.config.StatefulRetryOperationsInterceptorFactoryBean; +import org.springframework.amqp.rabbit.config.StatelessRetryOperationsInterceptorFactoryBean; import org.springframework.amqp.rabbit.connection.CachingConnectionFactory; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.amqp.rabbit.listener.adapter.MessageListenerAdapter; +import org.springframework.amqp.rabbit.retry.MessageRecoverer; import org.springframework.amqp.rabbit.test.BrokerRunning; import org.springframework.amqp.rabbit.test.BrokerTestUtils; import org.springframework.amqp.rabbit.test.Log4jLevelAdjuster; -import org.springframework.retry.interceptor.MethodArgumentsKeyGenerator; -import org.springframework.retry.interceptor.MethodInvocationRecoverer; -import org.springframework.retry.interceptor.NewMethodArgumentsIdentifier; -import org.springframework.retry.interceptor.StatefulRetryOperationsInterceptor; +import org.springframework.amqp.support.converter.MessageConverter; +import org.springframework.amqp.support.converter.SimpleMessageConverter; +import org.springframework.retry.policy.MapRetryContextCache; import org.springframework.retry.support.RetryTemplate; public class MessageListenerContainerRetryIntegrationTests { @@ -46,12 +47,27 @@ public class MessageListenerContainerRetryIntegrationTests { public Log4jLevelAdjuster logLevels = new Log4jLevelAdjuster(Level.INFO, RabbitTemplate.class, SimpleMessageListenerContainer.class, BlockingQueueConsumer.class); + @Rule + public ExpectedException exception = ExpectedException.none(); + + private RabbitTemplate template; + + private RetryTemplate retryTemplate; + + private MessageConverter messageConverter; + private RabbitTemplate createTemplate(int concurrentConsumers) { RabbitTemplate template = new RabbitTemplate(); CachingConnectionFactory connectionFactory = new CachingConnectionFactory(); connectionFactory.setChannelCacheSize(concurrentConsumers); connectionFactory.setPort(BrokerTestUtils.getPort()); template.setConnectionFactory(connectionFactory); + if (messageConverter == null) { + SimpleMessageConverter messageConverter = new SimpleMessageConverter(); + messageConverter.setCreateMessageIds(true); + this.messageConverter = messageConverter; + } + template.setMessageConverter(messageConverter); return template; } @@ -66,6 +82,37 @@ public class MessageListenerContainerRetryIntegrationTests { } + @Test + public void testStatelessRetryWithAllMessagesFailing() throws Exception { + + int messageCount = 10; + int txSize = 1; + int failFrequency = 1; + int concurrentConsumers = 3; + doTestStatelessRetry(messageCount, txSize, failFrequency, concurrentConsumers); + + } + + @Test + public void testStatefulRetryWithNoMessageIds() throws Exception { + + int messageCount = 2; + int txSize = 1; + int failFrequency = 1; + int concurrentConsumers = 1; + SimpleMessageConverter messageConverter = new SimpleMessageConverter(); + // There will be no key for these messages so they cannot be recovered... + messageConverter.setCreateMessageIds(false); + this.messageConverter = messageConverter; + // Beware of context cache busting if retry policy fails... + this.retryTemplate = new RetryTemplate(); + this.retryTemplate.setRetryContextCache(new MapRetryContextCache(1)); + // The container should have shutdown, so there are now no active consumers + exception.expectMessage("expected:<1> but was:<0>"); + doTestStatefulRetry(messageCount, txSize, failFrequency, concurrentConsumers); + + } + @Test public void testStatefulRetryWithTxSizeAndIntermittentFailure() throws Exception { @@ -88,22 +135,49 @@ public class MessageListenerContainerRetryIntegrationTests { } - public void doTestStatefulRetry(int messageCount, int txSize, int failFrequency, int concurrentConsumers) throws Exception { + private Advice createRetryInterceptor(final CountDownLatch latch, boolean stateful) throws Exception { + AbstractRetryOperationsInterceptorFactoryBean factory; + if (stateful) { + factory = new StatefulRetryOperationsInterceptorFactoryBean(); + } else { + factory = new StatelessRetryOperationsInterceptorFactoryBean(); + } + factory.setMessageRecoverer(new MessageRecoverer() { + public void recover(Message message) { + logger.info("Recovered: " + message); + latch.countDown(); + } + }); + if (retryTemplate == null) { + retryTemplate = new RetryTemplate(); + } + factory.setRetryOperations(retryTemplate); + Advice retryInterceptor = factory.getObject(); + return retryInterceptor; + } + + private void doTestStatefulRetry(int messageCount, int txSize, int failFrequency, int concurrentConsumers) + throws Exception { + doTestRetry(messageCount, txSize, failFrequency, concurrentConsumers, true); + } + + private void doTestStatelessRetry(int messageCount, int txSize, int failFrequency, int concurrentConsumers) + throws Exception { + doTestRetry(messageCount, txSize, failFrequency, concurrentConsumers, false); + } + + private void doTestRetry(int messageCount, int txSize, int failFrequency, int concurrentConsumers, boolean stateful) + throws Exception { int failedMessageCount = messageCount / failFrequency + (messageCount % failFrequency == 0 ? 0 : 1); - RabbitTemplate template = createTemplate(concurrentConsumers); + template = createTemplate(concurrentConsumers); for (int i = 0; i < messageCount; i++) { - template.convertAndSend(queue.getName(), new Integer(i), new MessagePostProcessor() { - // There is no message id by default - public Message postProcessMessage(Message message) throws AmqpException { - message.getMessageProperties().setMessageId(UUID.randomUUID().toString()); - return message; - } - }); + template.convertAndSend(queue.getName(), new Integer(i)); } - SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(template.getConnectionFactory()); + final SimpleMessageListenerContainer container = new SimpleMessageListenerContainer( + template.getConnectionFactory()); PojoListener listener = new PojoListener(failFrequency); container.setMessageListener(new MessageListenerAdapter(listener)); container.setAcknowledgeMode(AcknowledgeMode.AUTO); @@ -111,30 +185,8 @@ public class MessageListenerContainerRetryIntegrationTests { container.setTxSize(txSize); container.setConcurrentConsumers(concurrentConsumers); - StatefulRetryOperationsInterceptor retryInterceptor = new StatefulRetryOperationsInterceptor(); - retryInterceptor.setRetryOperations(new RetryTemplate()); - retryInterceptor.setNewItemIdentifier(new NewMethodArgumentsIdentifier() { - public boolean isNew(Object[] args) { - Message message = (Message) args[1]; - return !message.getMessageProperties().isRedelivered(); - } - }); final CountDownLatch latch = new CountDownLatch(failedMessageCount); - retryInterceptor.setRecoverer(new MethodInvocationRecoverer() { - public Object recover(Object[] args, Throwable cause) { - logger.info("Recovered: " + Arrays.asList(args)); - latch.countDown(); - return null; - } - }); - retryInterceptor.setKeyGenerator(new MethodArgumentsKeyGenerator() { - public Object getKey(Object[] args) { - Message message = (Message) args[1]; - logger.info("Key: " + message.getMessageProperties().getMessageId()); - return message.getMessageProperties().getMessageId(); - } - }); - container.setAdviceChain(new Advice[] { retryInterceptor }); + container.setAdviceChain(new Advice[] { createRetryInterceptor(latch, stateful) }); container.setQueueNames(queue.getName()); container.afterPropertiesSet(); @@ -144,14 +196,29 @@ public class MessageListenerContainerRetryIntegrationTests { int timeout = Math.min(1 + messageCount / concurrentConsumers, 30); + final int count = messageCount; logger.debug("Waiting for messages with timeout = " + timeout + " (s)"); + Executors.newSingleThreadExecutor().execute(new Runnable() { + public void run() { + while (container.getActiveConsumerCount() > 0) { + try { + Thread.sleep(100L); + } catch (InterruptedException e) { + latch.countDown(); + Thread.currentThread().interrupt(); + return; + } + } + for (int i = 0; i < count; i++) { + latch.countDown(); + } + } + }); boolean waited = latch.await(timeout, TimeUnit.SECONDS); logger.info("All messages recovered: " + waited); assertEquals(concurrentConsumers, container.getActiveConsumerCount()); assertTrue("Timed out waiting for messages", waited); - assertEquals(concurrentConsumers, container.getActiveConsumerCount()); - // Retried each failure 3 times (default retry policy)... assertEquals(messageCount + 2 * failedMessageCount, listener.getCount()); @@ -174,7 +241,7 @@ public class MessageListenerContainerRetryIntegrationTests { } public void handleMessage(int value) throws Exception { - logger.debug(value + count.getAndIncrement()); + logger.debug(value+ ":" + count.getAndIncrement()); if (value % failFrequency == 0) { throw new RuntimeException("Planned"); }