Commit 3aa247f1 authored by Gary Russell's avatar Gary Russell Committed by Stephane Nicoll

Add configuration options for RabbitMQ's batch listener config

See gh-23766
parent fb251041
/* /*
* Copyright 2012-2019 the original author or authors. * Copyright 2012-2020 the original author or authors.
* *
* Licensed under the Apache License, Version 2.0 (the "License"); * Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License. * you may not use this file except in compliance with the License.
...@@ -116,6 +116,7 @@ public abstract class AbstractRabbitListenerContainerFactoryConfigurer<T extends ...@@ -116,6 +116,7 @@ public abstract class AbstractRabbitListenerContainerFactoryConfigurer<T extends
factory.setIdleEventInterval(configuration.getIdleEventInterval().toMillis()); factory.setIdleEventInterval(configuration.getIdleEventInterval().toMillis());
} }
factory.setMissingQueuesFatal(configuration.isMissingQueuesFatal()); factory.setMissingQueuesFatal(configuration.isMissingQueuesFatal());
factory.setDeBatchingEnabled(configuration.isDeBatchingEnabled());
ListenerRetry retryConfig = configuration.getRetry(); ListenerRetry retryConfig = configuration.getRetry();
if (retryConfig.isEnabled()) { if (retryConfig.isEnabled()) {
RetryInterceptorBuilder<?, ?> builder = (retryConfig.isStateless()) ? RetryInterceptorBuilder.stateless() RetryInterceptorBuilder<?, ?> builder = (retryConfig.isStateless()) ? RetryInterceptorBuilder.stateless()
......
...@@ -662,6 +662,11 @@ public class RabbitProperties { ...@@ -662,6 +662,11 @@ public class RabbitProperties {
*/ */
private Duration idleEventInterval; private Duration idleEventInterval;
/**
* Whether to present batched messages (created by a BatchingRabbitTemplate) as discrete messages.
*/
private boolean deBatchingEnabled = true;
/** /**
* Optional properties for a retry interceptor. * Optional properties for a retry interceptor.
*/ */
...@@ -709,6 +714,14 @@ public class RabbitProperties { ...@@ -709,6 +714,14 @@ public class RabbitProperties {
public abstract boolean isMissingQueuesFatal(); public abstract boolean isMissingQueuesFatal();
public boolean isDeBatchingEnabled() {
return deBatchingEnabled;
}
public void setDeBatchingEnabled(boolean deBatchingEnabled) {
this.deBatchingEnabled = deBatchingEnabled;
}
public ListenerRetry getRetry() { public ListenerRetry getRetry() {
return this.retry; return this.retry;
} }
...@@ -743,6 +756,13 @@ public class RabbitProperties { ...@@ -743,6 +756,13 @@ public class RabbitProperties {
*/ */
private boolean missingQueuesFatal = true; private boolean missingQueuesFatal = true;
/**
* When true, the container will create a batch of messages based on the 'receiveTimeout' and 'batchSize'.
* Coerces 'deBatchingEnabled' to true to include the contents of a producer created batch in the batch as
* discrete records.
*/
private boolean consumerBatchEnabled;
public Integer getConcurrency() { public Integer getConcurrency() {
return this.concurrency; return this.concurrency;
} }
...@@ -776,6 +796,14 @@ public class RabbitProperties { ...@@ -776,6 +796,14 @@ public class RabbitProperties {
this.missingQueuesFatal = missingQueuesFatal; this.missingQueuesFatal = missingQueuesFatal;
} }
public boolean isConsumerBatchEnabled() {
return consumerBatchEnabled;
}
public void setConsumerBatchEnabled(boolean consumerBatchEnabled) {
this.consumerBatchEnabled = consumerBatchEnabled;
}
} }
/** /**
......
/* /*
* Copyright 2012-2019 the original author or authors. * Copyright 2012-2020 the original author or authors.
* *
* Licensed under the Apache License, Version 2.0 (the "License"); * Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License. * you may not use this file except in compliance with the License.
...@@ -39,6 +39,10 @@ public final class SimpleRabbitListenerContainerFactoryConfigurer ...@@ -39,6 +39,10 @@ public final class SimpleRabbitListenerContainerFactoryConfigurer
map.from(config::getConcurrency).whenNonNull().to(factory::setConcurrentConsumers); map.from(config::getConcurrency).whenNonNull().to(factory::setConcurrentConsumers);
map.from(config::getMaxConcurrency).whenNonNull().to(factory::setMaxConcurrentConsumers); map.from(config::getMaxConcurrency).whenNonNull().to(factory::setMaxConcurrentConsumers);
map.from(config::getBatchSize).whenNonNull().to(factory::setBatchSize); map.from(config::getBatchSize).whenNonNull().to(factory::setBatchSize);
map.from(config::isConsumerBatchEnabled).to(factory::setConsumerBatchEnabled);
if (config.isConsumerBatchEnabled()) {
factory.setDeBatchingEnabled(true);
}
} }
} }
...@@ -34,6 +34,7 @@ import com.rabbitmq.client.impl.DefaultCredentialsProvider; ...@@ -34,6 +34,7 @@ import com.rabbitmq.client.impl.DefaultCredentialsProvider;
import org.aopalliance.aop.Advice; import org.aopalliance.aop.Advice;
import org.junit.jupiter.api.Test; import org.junit.jupiter.api.Test;
import org.mockito.InOrder;
import org.springframework.amqp.core.AcknowledgeMode; import org.springframework.amqp.core.AcknowledgeMode;
import org.springframework.amqp.core.AmqpAdmin; import org.springframework.amqp.core.AmqpAdmin;
import org.springframework.amqp.core.Message; import org.springframework.amqp.core.Message;
...@@ -75,8 +76,7 @@ import static org.mockito.ArgumentMatchers.anyString; ...@@ -75,8 +76,7 @@ import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.ArgumentMatchers.eq; import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.ArgumentMatchers.isNull; import static org.mockito.ArgumentMatchers.isNull;
import static org.mockito.BDDMockito.given; import static org.mockito.BDDMockito.given;
import static org.mockito.Mockito.mock; import static org.mockito.Mockito.*;
import static org.mockito.Mockito.verify;
/** /**
* Tests for {@link RabbitAutoConfiguration}. * Tests for {@link RabbitAutoConfiguration}.
...@@ -538,15 +538,21 @@ class RabbitAutoConfigurationTests { ...@@ -538,15 +538,21 @@ class RabbitAutoConfigurationTests {
.withPropertyValues("spring.rabbitmq.listener.type:direct", .withPropertyValues("spring.rabbitmq.listener.type:direct",
"spring.rabbitmq.listener.simple.concurrency:5", "spring.rabbitmq.listener.simple.concurrency:5",
"spring.rabbitmq.listener.simple.maxConcurrency:10", "spring.rabbitmq.listener.simple.maxConcurrency:10",
"spring.rabbitmq.listener.simple.prefetch:40") "spring.rabbitmq.listener.simple.prefetch:40",
"spring.rabbitmq.listener.simple.consumer-batch-enabled:true",
"spring.rabbitmq.listener.simple.de-batching-enabled:false")
.run((context) -> { .run((context) -> {
SimpleRabbitListenerContainerFactoryConfigurer configurer = context SimpleRabbitListenerContainerFactoryConfigurer configurer = context
.getBean(SimpleRabbitListenerContainerFactoryConfigurer.class); .getBean(SimpleRabbitListenerContainerFactoryConfigurer.class);
SimpleRabbitListenerContainerFactory factory = mock(SimpleRabbitListenerContainerFactory.class); SimpleRabbitListenerContainerFactory factory = mock(SimpleRabbitListenerContainerFactory.class);
configurer.configure(factory, mock(ConnectionFactory.class)); configurer.configure(factory, mock(ConnectionFactory.class));
InOrder inOrder = inOrder(factory);
verify(factory).setConcurrentConsumers(5); verify(factory).setConcurrentConsumers(5);
verify(factory).setMaxConcurrentConsumers(10); verify(factory).setMaxConcurrentConsumers(10);
verify(factory).setPrefetchCount(40); verify(factory).setPrefetchCount(40);
verify(factory).setConsumerBatchEnabled(true);
inOrder.verify(factory).setDeBatchingEnabled(false);
inOrder.verify(factory).setDeBatchingEnabled(true);
}); });
} }
...@@ -555,7 +561,8 @@ class RabbitAutoConfigurationTests { ...@@ -555,7 +561,8 @@ class RabbitAutoConfigurationTests {
this.contextRunner.withUserConfiguration(TestConfiguration.class) this.contextRunner.withUserConfiguration(TestConfiguration.class)
.withPropertyValues("spring.rabbitmq.listener.type:simple", .withPropertyValues("spring.rabbitmq.listener.type:simple",
"spring.rabbitmq.listener.direct.consumers-per-queue:5", "spring.rabbitmq.listener.direct.consumers-per-queue:5",
"spring.rabbitmq.listener.direct.prefetch:40") "spring.rabbitmq.listener.direct.prefetch:40",
"spring.rabbitmq.listener.direct.de-batching-enabled:false")
.run((context) -> { .run((context) -> {
DirectRabbitListenerContainerFactoryConfigurer configurer = context DirectRabbitListenerContainerFactoryConfigurer configurer = context
.getBean(DirectRabbitListenerContainerFactoryConfigurer.class); .getBean(DirectRabbitListenerContainerFactoryConfigurer.class);
...@@ -563,6 +570,7 @@ class RabbitAutoConfigurationTests { ...@@ -563,6 +570,7 @@ class RabbitAutoConfigurationTests {
configurer.configure(factory, mock(ConnectionFactory.class)); configurer.configure(factory, mock(ConnectionFactory.class));
verify(factory).setConsumersPerQueue(5); verify(factory).setConsumersPerQueue(5);
verify(factory).setPrefetchCount(40); verify(factory).setPrefetchCount(40);
verify(factory).setDeBatchingEnabled(false);
}); });
} }
......
...@@ -303,6 +303,8 @@ class RabbitPropertiesTests { ...@@ -303,6 +303,8 @@ class RabbitPropertiesTests {
RabbitProperties.SimpleContainer simple = this.properties.getListener().getSimple(); RabbitProperties.SimpleContainer simple = this.properties.getListener().getSimple();
assertThat(simple.isAutoStartup()).isEqualTo(container.isAutoStartup()); assertThat(simple.isAutoStartup()).isEqualTo(container.isAutoStartup());
assertThat(container).hasFieldOrPropertyWithValue("missingQueuesFatal", simple.isMissingQueuesFatal()); assertThat(container).hasFieldOrPropertyWithValue("missingQueuesFatal", simple.isMissingQueuesFatal());
assertThat(container).hasFieldOrPropertyWithValue("deBatchingEnabled", simple.isDeBatchingEnabled());
assertThat(container).hasFieldOrPropertyWithValue("consumerBatchEnabled", simple.isConsumerBatchEnabled());
} }
@Test @Test
...@@ -312,6 +314,7 @@ class RabbitPropertiesTests { ...@@ -312,6 +314,7 @@ class RabbitPropertiesTests {
RabbitProperties.DirectContainer direct = this.properties.getListener().getDirect(); RabbitProperties.DirectContainer direct = this.properties.getListener().getDirect();
assertThat(direct.isAutoStartup()).isEqualTo(container.isAutoStartup()); assertThat(direct.isAutoStartup()).isEqualTo(container.isAutoStartup());
assertThat(container).hasFieldOrPropertyWithValue("missingQueuesFatal", direct.isMissingQueuesFatal()); assertThat(container).hasFieldOrPropertyWithValue("missingQueuesFatal", direct.isMissingQueuesFatal());
assertThat(container).hasFieldOrPropertyWithValue("deBatchingEnabled", direct.isDeBatchingEnabled());
} }
} }
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment