Commit 064faf95 authored by Stephane Nicoll's avatar Stephane Nicoll

Merge branch '1.5.x'

parents c14a5506 aa494681
/* /*
* Copyright 2012-2016 the original author or authors. * Copyright 2012-2017 the original author or authors.
* *
* Licensed under the Apache License, Version 2.0 (the "License"); * Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License. * you may not use this file except in compliance with the License.
...@@ -20,6 +20,7 @@ import org.springframework.amqp.rabbit.annotation.EnableRabbit; ...@@ -20,6 +20,7 @@ import org.springframework.amqp.rabbit.annotation.EnableRabbit;
import org.springframework.amqp.rabbit.config.RabbitListenerConfigUtils; import org.springframework.amqp.rabbit.config.RabbitListenerConfigUtils;
import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory; import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory; import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.retry.MessageRecoverer;
import org.springframework.amqp.support.converter.MessageConverter; import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.beans.factory.ObjectProvider; import org.springframework.beans.factory.ObjectProvider;
import org.springframework.boot.autoconfigure.condition.ConditionalOnClass; import org.springframework.boot.autoconfigure.condition.ConditionalOnClass;
...@@ -40,11 +41,15 @@ class RabbitAnnotationDrivenConfiguration { ...@@ -40,11 +41,15 @@ class RabbitAnnotationDrivenConfiguration {
private final ObjectProvider<MessageConverter> messageConverter; private final ObjectProvider<MessageConverter> messageConverter;
private final ObjectProvider<MessageRecoverer> messageRecoverer;
private final RabbitProperties properties; private final RabbitProperties properties;
RabbitAnnotationDrivenConfiguration(ObjectProvider<MessageConverter> messageConverter, RabbitAnnotationDrivenConfiguration(ObjectProvider<MessageConverter> messageConverter,
ObjectProvider<MessageRecoverer> messageRecoverer,
RabbitProperties properties) { RabbitProperties properties) {
this.messageConverter = messageConverter; this.messageConverter = messageConverter;
this.messageRecoverer = messageRecoverer;
this.properties = properties; this.properties = properties;
} }
...@@ -53,6 +58,7 @@ class RabbitAnnotationDrivenConfiguration { ...@@ -53,6 +58,7 @@ class RabbitAnnotationDrivenConfiguration {
public SimpleRabbitListenerContainerFactoryConfigurer rabbitListenerContainerFactoryConfigurer() { public SimpleRabbitListenerContainerFactoryConfigurer rabbitListenerContainerFactoryConfigurer() {
SimpleRabbitListenerContainerFactoryConfigurer configurer = new SimpleRabbitListenerContainerFactoryConfigurer(); SimpleRabbitListenerContainerFactoryConfigurer configurer = new SimpleRabbitListenerContainerFactoryConfigurer();
configurer.setMessageConverter(this.messageConverter.getIfUnique()); configurer.setMessageConverter(this.messageConverter.getIfUnique());
configurer.setMessageRecoverer(this.messageRecoverer.getIfUnique());
configurer.setRabbitProperties(this.properties); configurer.setRabbitProperties(this.properties);
return configurer; return configurer;
} }
......
/* /*
* Copyright 2012-2016 the original author or authors. * Copyright 2012-2017 the original author or authors.
* *
* Licensed under the Apache License, Version 2.0 (the "License"); * Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License. * you may not use this file except in compliance with the License.
...@@ -20,6 +20,7 @@ import org.springframework.amqp.rabbit.config.RetryInterceptorBuilder; ...@@ -20,6 +20,7 @@ import org.springframework.amqp.rabbit.config.RetryInterceptorBuilder;
import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory; import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory; import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.listener.RabbitListenerContainerFactory; import org.springframework.amqp.rabbit.listener.RabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.retry.MessageRecoverer;
import org.springframework.amqp.rabbit.retry.RejectAndDontRequeueRecoverer; import org.springframework.amqp.rabbit.retry.RejectAndDontRequeueRecoverer;
import org.springframework.amqp.support.converter.MessageConverter; import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.boot.autoconfigure.amqp.RabbitProperties.ListenerRetry; import org.springframework.boot.autoconfigure.amqp.RabbitProperties.ListenerRetry;
...@@ -36,6 +37,8 @@ public final class SimpleRabbitListenerContainerFactoryConfigurer { ...@@ -36,6 +37,8 @@ public final class SimpleRabbitListenerContainerFactoryConfigurer {
private MessageConverter messageConverter; private MessageConverter messageConverter;
private MessageRecoverer messageRecoverer;
private RabbitProperties rabbitProperties; private RabbitProperties rabbitProperties;
/** /**
...@@ -47,6 +50,14 @@ public final class SimpleRabbitListenerContainerFactoryConfigurer { ...@@ -47,6 +50,14 @@ public final class SimpleRabbitListenerContainerFactoryConfigurer {
this.messageConverter = messageConverter; this.messageConverter = messageConverter;
} }
/**
* Set the {@link MessageRecoverer} to use or {@code null} to rely on the default.
* @param messageRecoverer the {@link MessageRecoverer}
*/
void setMessageRecoverer(MessageRecoverer messageRecoverer) {
this.messageRecoverer = messageRecoverer;
}
/** /**
* Set the {@link RabbitProperties} to use. * Set the {@link RabbitProperties} to use.
* @param rabbitProperties the {@link RabbitProperties} * @param rabbitProperties the {@link RabbitProperties}
...@@ -101,7 +112,9 @@ public final class SimpleRabbitListenerContainerFactoryConfigurer { ...@@ -101,7 +112,9 @@ public final class SimpleRabbitListenerContainerFactoryConfigurer {
builder.maxAttempts(retryConfig.getMaxAttempts()); builder.maxAttempts(retryConfig.getMaxAttempts());
builder.backOffOptions(retryConfig.getInitialInterval(), builder.backOffOptions(retryConfig.getInitialInterval(),
retryConfig.getMultiplier(), retryConfig.getMaxInterval()); retryConfig.getMultiplier(), retryConfig.getMaxInterval());
builder.recoverer(new RejectAndDontRequeueRecoverer()); MessageRecoverer recoverer = (this.messageRecoverer != null
? this.messageRecoverer : new RejectAndDontRequeueRecoverer());
builder.recoverer(recoverer);
factory.setAdviceChain(builder.build()); factory.setAdviceChain(builder.build());
} }
......
/* /*
* Copyright 2012-2016 the original author or authors. * Copyright 2012-2017 the original author or authors.
* *
* Licensed under the Apache License, Version 2.0 (the "License"); * Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License. * you may not use this file except in compliance with the License.
...@@ -28,6 +28,7 @@ import org.junit.rules.ExpectedException; ...@@ -28,6 +28,7 @@ import org.junit.rules.ExpectedException;
import org.springframework.amqp.core.AcknowledgeMode; import org.springframework.amqp.core.AcknowledgeMode;
import org.springframework.amqp.core.AmqpAdmin; import org.springframework.amqp.core.AmqpAdmin;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.EnableRabbit; import org.springframework.amqp.rabbit.annotation.EnableRabbit;
import org.springframework.amqp.rabbit.config.RabbitListenerConfigUtils; import org.springframework.amqp.rabbit.config.RabbitListenerConfigUtils;
import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory; import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
...@@ -38,6 +39,7 @@ import org.springframework.amqp.rabbit.core.RabbitAdmin; ...@@ -38,6 +39,7 @@ import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.amqp.rabbit.core.RabbitMessagingTemplate; import org.springframework.amqp.rabbit.core.RabbitMessagingTemplate;
import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.listener.RabbitListenerContainerFactory; import org.springframework.amqp.rabbit.listener.RabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.retry.MessageRecoverer;
import org.springframework.amqp.rabbit.support.ValueExpression; import org.springframework.amqp.rabbit.support.ValueExpression;
import org.springframework.amqp.support.converter.MessageConverter; import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.beans.DirectFieldAccessor; import org.springframework.beans.DirectFieldAccessor;
...@@ -48,6 +50,7 @@ import org.springframework.context.annotation.Bean; ...@@ -48,6 +50,7 @@ import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration; import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Primary; import org.springframework.context.annotation.Primary;
import org.springframework.retry.backoff.ExponentialBackOffPolicy; import org.springframework.retry.backoff.ExponentialBackOffPolicy;
import org.springframework.retry.interceptor.MethodInvocationRecoverer;
import org.springframework.retry.policy.SimpleRetryPolicy; import org.springframework.retry.policy.SimpleRetryPolicy;
import org.springframework.retry.support.RetryTemplate; import org.springframework.retry.support.RetryTemplate;
...@@ -291,7 +294,8 @@ public class RabbitAutoConfigurationTests { ...@@ -291,7 +294,8 @@ public class RabbitAutoConfigurationTests {
@Test @Test
public void testRabbitListenerContainerFactoryWithCustomSettings() { public void testRabbitListenerContainerFactoryWithCustomSettings() {
load(MessageConvertersConfiguration.class, load(new Class<?>[] { MessageConvertersConfiguration.class,
MessageRecoverersConfiguration.class },
"spring.rabbitmq.listener.retry.enabled:true", "spring.rabbitmq.listener.retry.enabled:true",
"spring.rabbitmq.listener.retry.maxAttempts:4", "spring.rabbitmq.listener.retry.maxAttempts:4",
"spring.rabbitmq.listener.retry.initialInterval:2000", "spring.rabbitmq.listener.retry.initialInterval:2000",
...@@ -325,6 +329,14 @@ public class RabbitAutoConfigurationTests { ...@@ -325,6 +329,14 @@ public class RabbitAutoConfigurationTests {
assertThat(adviceChain).isNotNull(); assertThat(adviceChain).isNotNull();
assertThat(adviceChain.length).isEqualTo(1); assertThat(adviceChain.length).isEqualTo(1);
dfa = new DirectFieldAccessor(adviceChain[0]); dfa = new DirectFieldAccessor(adviceChain[0]);
MessageRecoverer messageRecoverer = this.context.getBean("myMessageRecoverer",
MessageRecoverer.class);
MethodInvocationRecoverer mir = (MethodInvocationRecoverer) dfa
.getPropertyValue("recoverer");
Message message = mock(Message.class);
Exception ex = new Exception("test");
mir.recover(new Object[]{"foo", message}, ex);
verify(messageRecoverer).recover(message, ex);
RetryTemplate retryTemplate = (RetryTemplate) dfa RetryTemplate retryTemplate = (RetryTemplate) dfa
.getPropertyValue("retryOperations"); .getPropertyValue("retryOperations");
assertThat(retryTemplate).isNotNull(); assertThat(retryTemplate).isNotNull();
...@@ -400,17 +412,17 @@ public class RabbitAutoConfigurationTests { ...@@ -400,17 +412,17 @@ public class RabbitAutoConfigurationTests {
} }
private void load(Class<?> config, String... environment) { private void load(Class<?> config, String... environment) {
this.context = doLoad(new Class<?>[] { config }, environment); load(new Class<?>[] { config }, environment);
} }
private AnnotationConfigApplicationContext doLoad(Class<?>[] configs, private void load(Class<?>[] configs,
String... environment) { String... environment) {
AnnotationConfigApplicationContext applicationContext = new AnnotationConfigApplicationContext(); AnnotationConfigApplicationContext applicationContext = new AnnotationConfigApplicationContext();
applicationContext.register(configs); applicationContext.register(configs);
applicationContext.register(RabbitAutoConfiguration.class); applicationContext.register(RabbitAutoConfiguration.class);
EnvironmentTestUtils.addEnvironment(applicationContext, environment); EnvironmentTestUtils.addEnvironment(applicationContext, environment);
applicationContext.refresh(); applicationContext.refresh();
return applicationContext; this.context = applicationContext;
} }
@Configuration @Configuration
...@@ -484,6 +496,22 @@ public class RabbitAutoConfigurationTests { ...@@ -484,6 +496,22 @@ public class RabbitAutoConfigurationTests {
} }
@Configuration
protected static class MessageRecoverersConfiguration {
@Bean
@Primary
public MessageRecoverer myMessageRecoverer() {
return mock(MessageRecoverer.class);
}
@Bean
public MessageRecoverer anotherMessageRecoverer() {
return mock(MessageRecoverer.class);
}
}
@Configuration @Configuration
@EnableRabbit @EnableRabbit
protected static class EnableRabbitConfiguration { protected static class EnableRabbitConfiguration {
......
...@@ -4596,8 +4596,9 @@ the broker connection is lost. Retries are disabled by default. ...@@ -4596,8 +4596,9 @@ the broker connection is lost. Retries are disabled by default.
==== Receiving a message ==== Receiving a message
When the Rabbit infrastructure is present, any bean can be annotated with When the Rabbit infrastructure is present, any bean can be annotated with
`@RabbitListener` to create a listener endpoint. If no `RabbitListenerContainerFactory` `@RabbitListener` to create a listener endpoint. If no `RabbitListenerContainerFactory`
has been defined, a default one is configured automatically. If a `MessageConverter` has been defined, a default one is configured automatically. If a `MessageConverter` or
beans is defined, it is associated automatically to the default factory. `MessageRecoverer` beans are defined, they are associated automatically to the default
factory.
The following component creates a listener endpoint on the `someQueue` queue: The following component creates a listener endpoint on the `someQueue` queue:
...@@ -4660,9 +4661,11 @@ Then you can use in any `@RabbitListener`-annotated method as follows: ...@@ -4660,9 +4661,11 @@ Then you can use in any `@RabbitListener`-annotated method as follows:
} }
---- ----
You can enable retries to handle situations where your listener throws an exception. You can enable retries to handle situations where your listener throws an exception. By
When retries are exhausted, the message will be rejected and either dropped or routed to a default `RejectAndDontRequeueRecoverer` is used but you can define a `MessageRecoverer`
dead-letter exchange if the broker is configured so. Retries are disabled by default. of your own. When retries are exhausted, the message will be rejected and either dropped
or routed to a dead-letter exchange if the broker is configured so. Retries are disabled
by default.
IMPORTANT: If retries are not enabled and the listener throws an exception, by default the IMPORTANT: If retries are not enabled and the listener throws an exception, by default the
delivery will be retried indefinitely. You can modify this behavior in two ways; set the delivery will be retried indefinitely. You can modify this behavior in two ways; set the
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment