Commit 520448cd authored by Stephane Nicoll's avatar Stephane Nicoll

Add support for publisher confirms/returns

Closes gh-3945
parent 9bfdad75
...@@ -118,6 +118,8 @@ public class RabbitAutoConfiguration { ...@@ -118,6 +118,8 @@ public class RabbitAutoConfiguration {
CachingConnectionFactory connectionFactory = new CachingConnectionFactory( CachingConnectionFactory connectionFactory = new CachingConnectionFactory(
factory.getObject()); factory.getObject());
connectionFactory.setAddresses(config.getAddresses()); connectionFactory.setAddresses(config.getAddresses());
connectionFactory.setPublisherConfirms(config.isPublisherConfirms());
connectionFactory.setPublisherReturns(config.isPublisherReturns());
if (config.getCache().getChannel().getSize() != null) { if (config.getCache().getChannel().getSize() != null) {
connectionFactory connectionFactory
.setChannelCacheSize(config.getCache().getChannel().getSize()); .setChannelCacheSize(config.getCache().getChannel().getSize());
...@@ -163,6 +165,7 @@ public class RabbitAutoConfiguration { ...@@ -163,6 +165,7 @@ public class RabbitAutoConfiguration {
if (messageConverter != null) { if (messageConverter != null) {
rabbitTemplate.setMessageConverter(messageConverter); rabbitTemplate.setMessageConverter(messageConverter);
} }
rabbitTemplate.setMandatory(determineMandatoryFlag());
RabbitProperties.Template templateProperties = this.properties.getTemplate(); RabbitProperties.Template templateProperties = this.properties.getTemplate();
RabbitProperties.Retry retryProperties = templateProperties.getRetry(); RabbitProperties.Retry retryProperties = templateProperties.getRetry();
if (retryProperties.isEnabled()) { if (retryProperties.isEnabled()) {
...@@ -177,6 +180,11 @@ public class RabbitAutoConfiguration { ...@@ -177,6 +180,11 @@ public class RabbitAutoConfiguration {
return rabbitTemplate; return rabbitTemplate;
} }
private boolean determineMandatoryFlag() {
Boolean flag = this.properties.getTemplate().getMandatory();
return (flag != null ? flag : this.properties.isPublisherReturns());
}
private RetryTemplate createRetryTemplate(RabbitProperties.Retry properties) { private RetryTemplate createRetryTemplate(RabbitProperties.Retry properties) {
RetryTemplate template = new RetryTemplate(); RetryTemplate template = new RetryTemplate();
SimpleRetryPolicy policy = new SimpleRetryPolicy(); SimpleRetryPolicy policy = new SimpleRetryPolicy();
......
...@@ -78,6 +78,16 @@ public class RabbitProperties { ...@@ -78,6 +78,16 @@ public class RabbitProperties {
*/ */
private Integer requestedHeartbeat; private Integer requestedHeartbeat;
/**
* Enable publisher confirms.
*/
private boolean publisherConfirms;
/**
* Enable publisher returns.
*/
private boolean publisherReturns;
/** /**
* Cache configuration. * Cache configuration.
*/ */
...@@ -196,6 +206,22 @@ public class RabbitProperties { ...@@ -196,6 +206,22 @@ public class RabbitProperties {
this.requestedHeartbeat = requestedHeartbeat; this.requestedHeartbeat = requestedHeartbeat;
} }
public boolean isPublisherConfirms() {
return this.publisherConfirms;
}
public void setPublisherConfirms(boolean publisherConfirms) {
this.publisherConfirms = publisherConfirms;
}
public boolean isPublisherReturns() {
return this.publisherReturns;
}
public void setPublisherReturns(boolean publisherReturns) {
this.publisherReturns = publisherReturns;
}
public Cache getCache() { public Cache getCache() {
return this.cache; return this.cache;
} }
...@@ -467,6 +493,12 @@ public class RabbitProperties { ...@@ -467,6 +493,12 @@ public class RabbitProperties {
@NestedConfigurationProperty @NestedConfigurationProperty
private final Retry retry = new Retry(); private final Retry retry = new Retry();
/**
* Enable mandatory messages. If a mandatory message cannot be routed to a queue
* by the server, it will return an unroutable message with a Return method.
*/
private Boolean mandatory;
/** /**
* Timeout for receive() operations. * Timeout for receive() operations.
*/ */
...@@ -481,6 +513,14 @@ public class RabbitProperties { ...@@ -481,6 +513,14 @@ public class RabbitProperties {
return this.retry; return this.retry;
} }
public Boolean getMandatory() {
return this.mandatory;
}
public void setMandatory(Boolean mandatory) {
this.mandatory = mandatory;
}
public Long getReceiveTimeout() { public Long getReceiveTimeout() {
return this.receiveTimeout; return this.receiveTimeout;
} }
......
...@@ -37,6 +37,7 @@ import org.springframework.amqp.rabbit.core.RabbitAdmin; ...@@ -37,6 +37,7 @@ import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.amqp.rabbit.core.RabbitMessagingTemplate; import org.springframework.amqp.rabbit.core.RabbitMessagingTemplate;
import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.listener.RabbitListenerContainerFactory; import org.springframework.amqp.rabbit.listener.RabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.support.ValueExpression;
import org.springframework.amqp.support.converter.MessageConverter; import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.beans.DirectFieldAccessor; import org.springframework.beans.DirectFieldAccessor;
import org.springframework.beans.factory.NoSuchBeanDefinitionException; import org.springframework.beans.factory.NoSuchBeanDefinitionException;
...@@ -59,6 +60,7 @@ import static org.mockito.Mockito.verify; ...@@ -59,6 +60,7 @@ import static org.mockito.Mockito.verify;
* @author Greg Turnquist * @author Greg Turnquist
* @author Stephane Nicoll * @author Stephane Nicoll
* @author Gary Russell * @author Gary Russell
* @author Stephane Nicoll
*/ */
public class RabbitAutoConfigurationTests { public class RabbitAutoConfigurationTests {
...@@ -82,11 +84,15 @@ public class RabbitAutoConfigurationTests { ...@@ -82,11 +84,15 @@ public class RabbitAutoConfigurationTests {
.getBean(RabbitMessagingTemplate.class); .getBean(RabbitMessagingTemplate.class);
CachingConnectionFactory connectionFactory = this.context CachingConnectionFactory connectionFactory = this.context
.getBean(CachingConnectionFactory.class); .getBean(CachingConnectionFactory.class);
DirectFieldAccessor dfa = new DirectFieldAccessor(connectionFactory);
RabbitAdmin amqpAdmin = this.context.getBean(RabbitAdmin.class); RabbitAdmin amqpAdmin = this.context.getBean(RabbitAdmin.class);
assertThat(rabbitTemplate.getConnectionFactory()).isEqualTo(connectionFactory); assertThat(rabbitTemplate.getConnectionFactory()).isEqualTo(connectionFactory);
assertThat(getMandatory(rabbitTemplate)).isFalse();
assertThat(messagingTemplate.getRabbitTemplate()).isEqualTo(rabbitTemplate); assertThat(messagingTemplate.getRabbitTemplate()).isEqualTo(rabbitTemplate);
assertThat(amqpAdmin).isNotNull(); assertThat(amqpAdmin).isNotNull();
assertThat(connectionFactory.getHost()).isEqualTo("localhost"); assertThat(connectionFactory.getHost()).isEqualTo("localhost");
assertThat(dfa.getPropertyValue("publisherConfirms")).isEqualTo(false);
assertThat(dfa.getPropertyValue("publisherReturns")).isEqualTo(false);
assertThat(this.context.containsBean("rabbitListenerContainerFactory")) assertThat(this.context.containsBean("rabbitListenerContainerFactory"))
.as("Listener container factory should be created by default").isTrue(); .as("Listener container factory should be created by default").isTrue();
} }
...@@ -135,6 +141,19 @@ public class RabbitAutoConfigurationTests { ...@@ -135,6 +141,19 @@ public class RabbitAutoConfigurationTests {
assertThat(connectionFactory.getVirtualHost()).isEqualTo("/"); assertThat(connectionFactory.getVirtualHost()).isEqualTo("/");
} }
@Test
public void testConnectionFactoryPublisherSettings() {
load(TestConfiguration.class, "spring.rabbitmq.publisher-confirms=true",
"spring.rabbitmq.publisher-returns=true");
CachingConnectionFactory connectionFactory = this.context
.getBean(CachingConnectionFactory.class);
RabbitTemplate rabbitTemplate = this.context.getBean(RabbitTemplate.class);
DirectFieldAccessor dfa = new DirectFieldAccessor(connectionFactory);
assertThat(dfa.getPropertyValue("publisherConfirms")).isEqualTo(true);
assertThat(dfa.getPropertyValue("publisherReturns")).isEqualTo(true);
assertThat(getMandatory(rabbitTemplate)).isTrue();
}
@Test @Test
public void testRabbitTemplateMessageConverters() { public void testRabbitTemplateMessageConverters() {
load(MessageConvertersConfiguration.class); load(MessageConvertersConfiguration.class);
...@@ -172,6 +191,21 @@ public class RabbitAutoConfigurationTests { ...@@ -172,6 +191,21 @@ public class RabbitAutoConfigurationTests {
assertThat(backOffPolicy.getMaxInterval()).isEqualTo(5000); assertThat(backOffPolicy.getMaxInterval()).isEqualTo(5000);
} }
@Test
public void testRabbitTemplateMandatory() {
load(TestConfiguration.class, "spring.rabbitmq.template.mandatory:true");
RabbitTemplate rabbitTemplate = this.context.getBean(RabbitTemplate.class);
assertThat(getMandatory(rabbitTemplate)).isTrue();
}
@Test
public void testRabbitTemplateMandatoryDisabledEvenIfPublisherReturnsIsSet() {
load(TestConfiguration.class, "spring.rabbitmq.template.mandatory:false",
"spring.rabbitmq.publisher-returns=true");
RabbitTemplate rabbitTemplate = this.context.getBean(RabbitTemplate.class);
assertThat(getMandatory(rabbitTemplate)).isFalse();
}
@Test @Test
public void testConnectionFactoryBackOff() { public void testConnectionFactoryBackOff() {
load(TestConfiguration2.class); load(TestConfiguration2.class);
...@@ -348,6 +382,13 @@ public class RabbitAutoConfigurationTests { ...@@ -348,6 +382,13 @@ public class RabbitAutoConfigurationTests {
connectionFactory).getPropertyValue("rabbitConnectionFactory"); connectionFactory).getPropertyValue("rabbitConnectionFactory");
} }
@SuppressWarnings("unchecked")
private boolean getMandatory(RabbitTemplate rabbitTemplate) {
ValueExpression<Boolean> expression = (ValueExpression<Boolean>)
new DirectFieldAccessor(rabbitTemplate).getPropertyValue("mandatoryExpression");
return expression.getValue();
}
private void load(Class<?> config, String... environment) { private void load(Class<?> config, String... environment) {
this.context = doLoad(new Class<?>[] { config }, environment); this.context = doLoad(new Class<?>[] { config }, environment);
} }
......
...@@ -812,12 +812,15 @@ content into your application; rather pick only the properties that you need. ...@@ -812,12 +812,15 @@ content into your application; rather pick only the properties that you need.
spring.rabbitmq.listener.transaction-size= # Number of messages to be processed in a transaction. For best results it should be less than or equal to the prefetch count. spring.rabbitmq.listener.transaction-size= # Number of messages to be processed in a transaction. For best results it should be less than or equal to the prefetch count.
spring.rabbitmq.password= # Login to authenticate against the broker. spring.rabbitmq.password= # Login to authenticate against the broker.
spring.rabbitmq.port=5672 # RabbitMQ port. spring.rabbitmq.port=5672 # RabbitMQ port.
spring.rabbitmq.publisher-confirms=false # Enable publisher confirms.
spring.rabbitmq.publisher-returns=false # Enable publisher returns.
spring.rabbitmq.requested-heartbeat= # Requested heartbeat timeout, in seconds; zero for none. spring.rabbitmq.requested-heartbeat= # Requested heartbeat timeout, in seconds; zero for none.
spring.rabbitmq.ssl.enabled=false # Enable SSL support. spring.rabbitmq.ssl.enabled=false # Enable SSL support.
spring.rabbitmq.ssl.key-store= # Path to the key store that holds the SSL certificate. spring.rabbitmq.ssl.key-store= # Path to the key store that holds the SSL certificate.
spring.rabbitmq.ssl.key-store-password= # Password used to access the key store. spring.rabbitmq.ssl.key-store-password= # Password used to access the key store.
spring.rabbitmq.ssl.trust-store= # Trust store that holds SSL certificates. spring.rabbitmq.ssl.trust-store= # Trust store that holds SSL certificates.
spring.rabbitmq.ssl.trust-store-password= # Password used to access the trust store. spring.rabbitmq.ssl.trust-store-password= # Password used to access the trust store.
spring.rabbitmq.template.mandatory=false # Enable mandatory messages.
spring.rabbitmq.template.receive-timeout=0 # Timeout for `receive()` methods. spring.rabbitmq.template.receive-timeout=0 # Timeout for `receive()` methods.
spring.rabbitmq.template.reply-timeout=5000 # Timeout for `sendAndReceive()` methods. spring.rabbitmq.template.reply-timeout=5000 # Timeout for `sendAndReceive()` methods.
spring.rabbitmq.template.retry.enabled= # Set to true to enable retries in the `RabbitTemplate`. spring.rabbitmq.template.retry.enabled= # Set to true to enable retries in the `RabbitTemplate`.
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment