diff --git a/spring-integration-amqp/src/main/java/org/springframework/integration/amqp/config/AbstractAmqpInboundAdapterParser.java b/spring-integration-amqp/src/main/java/org/springframework/integration/amqp/config/AbstractAmqpInboundAdapterParser.java
index 32833e8012..44a615b499 100644
--- a/spring-integration-amqp/src/main/java/org/springframework/integration/amqp/config/AbstractAmqpInboundAdapterParser.java
+++ b/spring-integration-amqp/src/main/java/org/springframework/integration/amqp/config/AbstractAmqpInboundAdapterParser.java
@@ -59,6 +59,7 @@ abstract class AbstractAmqpInboundAdapterParser extends AbstractSingleBeanDefini
"receive-timeout",
"shutdown-timeout",
"tx-size",
+ "batch-size",
"missing-queues-fatal"
};
@@ -154,7 +155,13 @@ abstract class AbstractAmqpInboundAdapterParser extends AbstractSingleBeanDefini
}
builder.addConstructorArgReference(connectionFactoryRef);
for (String attributeName : CONTAINER_VALUE_ATTRIBUTES) {
- IntegrationNamespaceUtils.setValueIfAttributeDefined(builder, element, attributeName);
+ // TODO remove 'tx-size' in 6.5
+ if ("tx-size".equals(attributeName)) {
+ IntegrationNamespaceUtils.setValueIfAttributeDefined(builder, element, attributeName, "batchSize");
+ }
+ else {
+ IntegrationNamespaceUtils.setValueIfAttributeDefined(builder, element, attributeName);
+ }
}
for (String attributeName : CONTAINER_REFERENCE_ATTRIBUTES) {
IntegrationNamespaceUtils.setReferenceIfAttributeDefined(builder, element, attributeName);
diff --git a/spring-integration-amqp/src/main/resources/org/springframework/integration/amqp/config/spring-integration-amqp.xsd b/spring-integration-amqp/src/main/resources/org/springframework/integration/amqp/config/spring-integration-amqp.xsd
index bf51462734..e6172835ef 100644
--- a/spring-integration-amqp/src/main/resources/org/springframework/integration/amqp/config/spring-integration-amqp.xsd
+++ b/spring-integration-amqp/src/main/resources/org/springframework/integration/amqp/config/spring-integration-amqp.xsd
@@ -969,7 +969,7 @@ standard headers to also be mapped. To map all non-standard headers the 'NON_STA
Acknowledge Mode for the MessageListenerContainer; default 'AUTO'
meaning the adapter automatically acknowledges the message(s)
- according to the tx-size.
+ according to the batch-size.
@@ -1039,7 +1039,7 @@ standard headers to also be mapped. To map all non-standard headers the 'NON_STA
Specifies how many messages to send to each consumer in a single request. Often this can be set
quite high
- to improve throughput. It should be greater than or equal to the tx-size value.
+ to improve throughput. It should be greater than or equal to the batch-size value.
@@ -1117,10 +1117,23 @@ standard headers to also be mapped. To map all non-standard headers the 'NON_STA
+ [DEPRECATED]
How many messages to process in a single transaction (if the channel is transactional). For best
results it should be
less than or equal to the prefetch count.
Not allowed when 'consumers-per-queue' is set.
+ Deprecated in favor of 'batch-size'.
+
+
+
+
+
+
+
+
+ How many messages to process in a single request.
+ For best results it should be less than or equal to the prefetch count.
+ Not allowed when 'consumers-per-queue' is set.
diff --git a/spring-integration-amqp/src/test/java/org/springframework/integration/amqp/config/AmqpInboundChannelAdapterParserTests-context.xml b/spring-integration-amqp/src/test/java/org/springframework/integration/amqp/config/AmqpInboundChannelAdapterParserTests-context.xml
index 8b105f34e6..85e4ee30bb 100644
--- a/spring-integration-amqp/src/test/java/org/springframework/integration/amqp/config/AmqpInboundChannelAdapterParserTests-context.xml
+++ b/spring-integration-amqp/src/test/java/org/springframework/integration/amqp/config/AmqpInboundChannelAdapterParserTests-context.xml
@@ -1,14 +1,12 @@
@@ -20,32 +18,34 @@
+ batch-mode="EXTRACT_PAYLOADS" tx-size="2"/>
+ auto-startup="false" phase="123" acknowledge-mode="${ackMode}"
+ missing-queues-fatal="false"
+ batch-size="3"/>
+ channel="requestChannel" queue-names="inboundchanneladapter.test.2"
+ auto-startup="false" phase="123"
+ mapped-request-headers="foo*, STANDARD_REQUEST_HEADERS"/>
+ channel="requestChannel" queue-names="inboundchanneladapter.test.2"
+ auto-startup="false" phase="123"
+ mapped-request-headers="foo*"/>
+ channel="requestChannel" queue-names="inboundchanneladapter.test.2"
+ auto-startup="false" phase="123"
+ mapped-request-headers=""/>
+ channel="requestChannel" queue-names="inboundchanneladapter.test.2"
+ auto-startup="false" phase="123"/>
+ auto-startup="false"/>
diff --git a/spring-integration-amqp/src/test/java/org/springframework/integration/amqp/config/AmqpInboundChannelAdapterParserTests.java b/spring-integration-amqp/src/test/java/org/springframework/integration/amqp/config/AmqpInboundChannelAdapterParserTests.java
index 1bc7a1703b..04eb584d5f 100644
--- a/spring-integration-amqp/src/test/java/org/springframework/integration/amqp/config/AmqpInboundChannelAdapterParserTests.java
+++ b/spring-integration-amqp/src/test/java/org/springframework/integration/amqp/config/AmqpInboundChannelAdapterParserTests.java
@@ -1,5 +1,5 @@
/*
- * Copyright 2002-2022 the original author or authors.
+ * Copyright 2002-2024 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -16,8 +16,7 @@
package org.springframework.integration.amqp.config;
-import org.junit.Test;
-import org.junit.runner.RunWith;
+import org.junit.jupiter.api.Test;
import org.springframework.amqp.core.AcknowledgeMode;
import org.springframework.amqp.core.Message;
@@ -37,10 +36,10 @@ import org.springframework.integration.channel.DirectChannel;
import org.springframework.integration.channel.QueueChannel;
import org.springframework.integration.test.util.TestUtils;
import org.springframework.test.annotation.DirtiesContext;
-import org.springframework.test.context.ContextConfiguration;
-import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
+import org.springframework.test.context.junit.jupiter.SpringJUnitConfig;
import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatExceptionOfType;
/**
* @author Mark Fisher
@@ -49,8 +48,7 @@ import static org.assertj.core.api.Assertions.assertThat;
*
* @since 2.1
*/
-@ContextConfiguration
-@RunWith(SpringJUnit4ClassRunner.class)
+@SpringJUnitConfig
@DirtiesContext
public class AmqpInboundChannelAdapterParserTests {
@@ -71,6 +69,8 @@ public class AmqpInboundChannelAdapterParserTests {
.isInstanceOf(SimpleMessageListenerContainer.class);
assertThat(TestUtils.getPropertyValue(adapter, "batchMode", BatchMode.class))
.isEqualTo(BatchMode.EXTRACT_PAYLOADS);
+ assertThat(TestUtils.getPropertyValue(adapter, "messageListenerContainer.batchSize", Integer.class))
+ .isEqualTo(2);
}
@Test
@@ -95,6 +95,8 @@ public class AmqpInboundChannelAdapterParserTests {
.isEqualTo(AcknowledgeMode.NONE);
assertThat(TestUtils.getPropertyValue(adapter, "messageListenerContainer.missingQueuesFatal", Boolean.class))
.isFalse();
+ assertThat(TestUtils.getPropertyValue(adapter, "messageListenerContainer.batchSize", Integer.class))
+ .isEqualTo(3);
}
@Test
@@ -216,14 +218,13 @@ public class AmqpInboundChannelAdapterParserTests {
@Test
public void testInt2971HeaderMapperAndMappedHeadersExclusivity() {
- try {
- new ClassPathXmlApplicationContext("AmqpInboundChannelAdapterParserTests-headerMapper-fail-context.xml",
- this.getClass()).close();
- }
- catch (BeanDefinitionParsingException e) {
- assertThat(e.getMessage().startsWith("Configuration problem: The 'header-mapper' attribute " +
- "is mutually exclusive with 'mapped-request-headers' or 'mapped-reply-headers'")).isTrue();
- }
+ assertThatExceptionOfType(BeanDefinitionParsingException.class)
+ .isThrownBy(() ->
+ new ClassPathXmlApplicationContext(
+ "AmqpInboundChannelAdapterParserTests-headerMapper-fail-context.xml",
+ getClass()))
+ .withMessageStartingWith("Configuration problem: The 'header-mapper' attribute " +
+ "is mutually exclusive with 'mapped-request-headers' or 'mapped-reply-headers'");
}
}
diff --git a/src/reference/antora/modules/ROOT/pages/amqp.adoc b/src/reference/antora/modules/ROOT/pages/amqp.adoc
index b6725a1706..827f4952ae 100644
--- a/src/reference/antora/modules/ROOT/pages/amqp.adoc
+++ b/src/reference/antora/modules/ROOT/pages/amqp.adoc
@@ -32,7 +32,7 @@ The following adapters are available:
* xref:amqp/inbound-gateway.adoc[Inbound Gateway]
* xref:amqp/outbound-channel-adapter.adoc[Outbound Channel Adapter]
* xref:amqp/outbound-gateway.adoc[Outbound Gateway]
-* xref:amqp-async-outbound-gateway[Async Outbound Gateway]
+* xref:amqp/async-outbound-gateway.adoc[Async Outbound Gateway]
* xref:amqp/rmq-streams.adoc#rmq-stream-inbound-channel-adapter[RabbitMQ Stream Queue Inbound Channel Adapter]
* xref:amqp/rmq-streams.adoc#rmq-stream-outbound-channel-adapter[RabbitMQ Stream Queue Outbound Channel Adapter]
diff --git a/src/reference/antora/modules/ROOT/pages/amqp/inbound-channel-adapter.adoc b/src/reference/antora/modules/ROOT/pages/amqp/inbound-channel-adapter.adoc
index e8ae8d91de..906bf071fa 100644
--- a/src/reference/antora/modules/ROOT/pages/amqp/inbound-channel-adapter.adoc
+++ b/src/reference/antora/modules/ROOT/pages/amqp/inbound-channel-adapter.adoc
@@ -87,7 +87,7 @@ XML::
task-executor="" <22>
transaction-attribute="" <23>
transaction-manager="" <24>
- tx-size="" <25>
+ batch-size="" <25>
consumers-per-queue <26>
batch-mode="MESSAGES"/> <27>
@@ -146,7 +146,7 @@ By default, this value is `Integer.MAX_VALUE`, meaning that this container start
Optional.
<17> Tells the AMQP broker how many messages to send to each consumer in a single request.
Often, you can set this value high to improve throughput.
-It should be greater than or equal to the transaction size (see the `tx-size` attribute, later in this list).
+It should be greater than or equal to the transaction size (see the `batch-size` attribute, later in this list).
Optional (defaults to `1`).
<18> Receive timeout in milliseconds.
Optional (defaults to `1000`).
@@ -173,7 +173,7 @@ If the `channelTransacted` flag is `false`, no transaction semantics apply to th
For further information, see
https://docs.spring.io/spring-amqp/reference/html/%255Freference.html#%5Ftransactions[Transactions with Spring AMQP].
Optional.
-<25> Tells the `SimpleMessageListenerContainer` how many messages to process in a single transaction (if the channel is transactional).
+<25> Tells the `SimpleMessageListenerContainer` how many messages to process in a single request.
For best results, it should be less than or equal to the value set in `prefetch-count`.
Not allowed when 'consumers-per-queue' is set.
Optional (defaults to `1`).
diff --git a/src/reference/antora/modules/ROOT/pages/amqp/inbound-gateway.adoc b/src/reference/antora/modules/ROOT/pages/amqp/inbound-gateway.adoc
index 7fcdf9617e..cc2762a88e 100644
--- a/src/reference/antora/modules/ROOT/pages/amqp/inbound-gateway.adoc
+++ b/src/reference/antora/modules/ROOT/pages/amqp/inbound-gateway.adoc
@@ -10,7 +10,7 @@ Java DSL::
+
[source, java, role="primary"]
----
-@Bean // return the upper cased payload
+@Bean // return the upper-cased payload
public IntegrationFlow amqpInboundGateway(ConnectionFactory connectionFactory) {
return IntegrationFlow.from(Amqp.inboundGateway(connectionFactory, "foo"))
.transform(String.class, String::toUpperCase)