GH-9695: Replace AMQP tx-size with batch-size
Fixes: #9695
Issue link: https://github.com/spring-projects/spring-integration/issues/9695
The `txSize` in the `SimpleMessageListenerContainer` has been replaced with more reasonable `batchSize`.
Spring Integration XML support for AMQP module has missed to fix this change: we didn't have a respective test.
* Deprecate `tx-size` (will be removed in `6.5`) XML attribute for the `<amqp:inbound-channel-adapter>`
and introduce `batch-size`
* Cover with the tests
* Fix docs from `tx-size` to `batch-size`
(cherry picked from commit 2cf83b5311)
This commit is contained in:
committed by
Spring Builds
parent
857f703877
commit
f56f5cf76b
@@ -59,6 +59,7 @@ abstract class AbstractAmqpInboundAdapterParser extends AbstractSingleBeanDefini
|
||||
"receive-timeout",
|
||||
"shutdown-timeout",
|
||||
"tx-size",
|
||||
"batch-size",
|
||||
"missing-queues-fatal"
|
||||
};
|
||||
|
||||
@@ -154,7 +155,13 @@ abstract class AbstractAmqpInboundAdapterParser extends AbstractSingleBeanDefini
|
||||
}
|
||||
builder.addConstructorArgReference(connectionFactoryRef);
|
||||
for (String attributeName : CONTAINER_VALUE_ATTRIBUTES) {
|
||||
IntegrationNamespaceUtils.setValueIfAttributeDefined(builder, element, attributeName);
|
||||
// TODO remove 'tx-size' in 6.5
|
||||
if ("tx-size".equals(attributeName)) {
|
||||
IntegrationNamespaceUtils.setValueIfAttributeDefined(builder, element, attributeName, "batchSize");
|
||||
}
|
||||
else {
|
||||
IntegrationNamespaceUtils.setValueIfAttributeDefined(builder, element, attributeName);
|
||||
}
|
||||
}
|
||||
for (String attributeName : CONTAINER_REFERENCE_ATTRIBUTES) {
|
||||
IntegrationNamespaceUtils.setReferenceIfAttributeDefined(builder, element, attributeName);
|
||||
|
||||
@@ -969,7 +969,7 @@ standard headers to also be mapped. To map all non-standard headers the 'NON_STA
|
||||
<xsd:documentation>
|
||||
Acknowledge Mode for the MessageListenerContainer; default 'AUTO'
|
||||
meaning the adapter automatically acknowledges the message(s)
|
||||
according to the tx-size.
|
||||
according to the batch-size.
|
||||
</xsd:documentation>
|
||||
</xsd:annotation>
|
||||
<xsd:simpleType>
|
||||
@@ -1039,7 +1039,7 @@ standard headers to also be mapped. To map all non-standard headers the 'NON_STA
|
||||
<xsd:documentation>
|
||||
Specifies how many messages to send to each consumer in a single request. Often this can be set
|
||||
quite high
|
||||
to improve throughput. It should be greater than or equal to the tx-size value.
|
||||
to improve throughput. It should be greater than or equal to the batch-size value.
|
||||
</xsd:documentation>
|
||||
</xsd:appinfo>
|
||||
</xsd:annotation>
|
||||
@@ -1117,10 +1117,23 @@ standard headers to also be mapped. To map all non-standard headers the 'NON_STA
|
||||
<xsd:annotation>
|
||||
<xsd:appinfo>
|
||||
<xsd:documentation>
|
||||
[DEPRECATED]
|
||||
How many messages to process in a single transaction (if the channel is transactional). For best
|
||||
results it should be
|
||||
less than or equal to the prefetch count.
|
||||
Not allowed when 'consumers-per-queue' is set.
|
||||
Deprecated in favor of 'batch-size'.
|
||||
</xsd:documentation>
|
||||
</xsd:appinfo>
|
||||
</xsd:annotation>
|
||||
</xsd:attribute>
|
||||
<xsd:attribute name="batch-size" type="xsd:string">
|
||||
<xsd:annotation>
|
||||
<xsd:appinfo>
|
||||
<xsd:documentation>
|
||||
How many messages to process in a single request.
|
||||
For best results it should be less than or equal to the prefetch count.
|
||||
Not allowed when 'consumers-per-queue' is set.
|
||||
</xsd:documentation>
|
||||
</xsd:appinfo>
|
||||
</xsd:annotation>
|
||||
|
||||
@@ -1,14 +1,12 @@
|
||||
<?xml version="1.0" encoding="UTF-8"?>
|
||||
<beans xmlns="http://www.springframework.org/schema/beans"
|
||||
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
|
||||
xmlns:amqp="http://www.springframework.org/schema/integration/amqp"
|
||||
xmlns:int="http://www.springframework.org/schema/integration"
|
||||
xmlns:rabbit="http://www.springframework.org/schema/rabbit"
|
||||
xmlns:context="http://www.springframework.org/schema/context"
|
||||
xmlns:util="http://www.springframework.org/schema/util"
|
||||
xsi:schemaLocation="http://www.springframework.org/schema/integration/amqp https://www.springframework.org/schema/integration/amqp/spring-integration-amqp.xsd
|
||||
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
|
||||
xmlns:amqp="http://www.springframework.org/schema/integration/amqp"
|
||||
xmlns:int="http://www.springframework.org/schema/integration"
|
||||
xmlns:context="http://www.springframework.org/schema/context"
|
||||
xmlns:util="http://www.springframework.org/schema/util"
|
||||
xsi:schemaLocation="http://www.springframework.org/schema/integration/amqp https://www.springframework.org/schema/integration/amqp/spring-integration-amqp.xsd
|
||||
http://www.springframework.org/schema/integration https://www.springframework.org/schema/integration/spring-integration.xsd
|
||||
http://www.springframework.org/schema/rabbit https://www.springframework.org/schema/rabbit/spring-rabbit.xsd
|
||||
http://www.springframework.org/schema/beans https://www.springframework.org/schema/beans/spring-beans.xsd
|
||||
http://www.springframework.org/schema/util https://www.springframework.org/schema/util/spring-util.xsd
|
||||
http://www.springframework.org/schema/context https://www.springframework.org/schema/context/spring-context.xsd">
|
||||
@@ -20,32 +18,34 @@
|
||||
</util:properties>
|
||||
|
||||
<amqp:inbound-channel-adapter id="rabbitInbound" queue-names="inboundchanneladapter.test.1"
|
||||
batch-mode="EXTRACT_PAYLOADS"/>
|
||||
batch-mode="EXTRACT_PAYLOADS" tx-size="2"/>
|
||||
|
||||
<amqp:inbound-channel-adapter id="autoStartFalse" queue-names="inboundchanneladapter.test.2"
|
||||
auto-startup="false" phase="123" acknowledge-mode="${ackMode}" missing-queues-fatal="false" />
|
||||
auto-startup="false" phase="123" acknowledge-mode="${ackMode}"
|
||||
missing-queues-fatal="false"
|
||||
batch-size="3"/>
|
||||
|
||||
<amqp:inbound-channel-adapter id="withHeaderMapperStandardAndCustomHeaders"
|
||||
channel="requestChannel" queue-names="inboundchanneladapter.test.2"
|
||||
auto-startup="false" phase="123"
|
||||
mapped-request-headers="foo*, STANDARD_REQUEST_HEADERS"/>
|
||||
channel="requestChannel" queue-names="inboundchanneladapter.test.2"
|
||||
auto-startup="false" phase="123"
|
||||
mapped-request-headers="foo*, STANDARD_REQUEST_HEADERS"/>
|
||||
|
||||
<amqp:inbound-channel-adapter id="withHeaderMapperOnlyCustomHeaders"
|
||||
channel="requestChannel" queue-names="inboundchanneladapter.test.2"
|
||||
auto-startup="false" phase="123"
|
||||
mapped-request-headers="foo*"/>
|
||||
channel="requestChannel" queue-names="inboundchanneladapter.test.2"
|
||||
auto-startup="false" phase="123"
|
||||
mapped-request-headers="foo*"/>
|
||||
|
||||
<amqp:inbound-channel-adapter id="withHeaderMapperNothingToMap"
|
||||
channel="requestChannel" queue-names="inboundchanneladapter.test.2"
|
||||
auto-startup="false" phase="123"
|
||||
mapped-request-headers=""/>
|
||||
channel="requestChannel" queue-names="inboundchanneladapter.test.2"
|
||||
auto-startup="false" phase="123"
|
||||
mapped-request-headers=""/>
|
||||
|
||||
<amqp:inbound-channel-adapter id="withHeaderMapperDefaultMapping"
|
||||
channel="requestChannel" queue-names="inboundchanneladapter.test.2"
|
||||
auto-startup="false" phase="123"/>
|
||||
channel="requestChannel" queue-names="inboundchanneladapter.test.2"
|
||||
auto-startup="false" phase="123"/>
|
||||
|
||||
<amqp:inbound-channel-adapter id="dmlc" queue-names="inboundchanneladapter.test.2" consumers-per-queue="2"
|
||||
auto-startup="false" />
|
||||
auto-startup="false"/>
|
||||
|
||||
<int:channel id="requestChannel">
|
||||
<int:queue/>
|
||||
|
||||
@@ -1,5 +1,5 @@
|
||||
/*
|
||||
* Copyright 2002-2022 the original author or authors.
|
||||
* Copyright 2002-2024 the original author or authors.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
@@ -16,8 +16,7 @@
|
||||
|
||||
package org.springframework.integration.amqp.config;
|
||||
|
||||
import org.junit.Test;
|
||||
import org.junit.runner.RunWith;
|
||||
import org.junit.jupiter.api.Test;
|
||||
|
||||
import org.springframework.amqp.core.AcknowledgeMode;
|
||||
import org.springframework.amqp.core.Message;
|
||||
@@ -37,10 +36,10 @@ import org.springframework.integration.channel.DirectChannel;
|
||||
import org.springframework.integration.channel.QueueChannel;
|
||||
import org.springframework.integration.test.util.TestUtils;
|
||||
import org.springframework.test.annotation.DirtiesContext;
|
||||
import org.springframework.test.context.ContextConfiguration;
|
||||
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
|
||||
import org.springframework.test.context.junit.jupiter.SpringJUnitConfig;
|
||||
|
||||
import static org.assertj.core.api.Assertions.assertThat;
|
||||
import static org.assertj.core.api.Assertions.assertThatExceptionOfType;
|
||||
|
||||
/**
|
||||
* @author Mark Fisher
|
||||
@@ -49,8 +48,7 @@ import static org.assertj.core.api.Assertions.assertThat;
|
||||
*
|
||||
* @since 2.1
|
||||
*/
|
||||
@ContextConfiguration
|
||||
@RunWith(SpringJUnit4ClassRunner.class)
|
||||
@SpringJUnitConfig
|
||||
@DirtiesContext
|
||||
public class AmqpInboundChannelAdapterParserTests {
|
||||
|
||||
@@ -71,6 +69,8 @@ public class AmqpInboundChannelAdapterParserTests {
|
||||
.isInstanceOf(SimpleMessageListenerContainer.class);
|
||||
assertThat(TestUtils.getPropertyValue(adapter, "batchMode", BatchMode.class))
|
||||
.isEqualTo(BatchMode.EXTRACT_PAYLOADS);
|
||||
assertThat(TestUtils.getPropertyValue(adapter, "messageListenerContainer.batchSize", Integer.class))
|
||||
.isEqualTo(2);
|
||||
}
|
||||
|
||||
@Test
|
||||
@@ -95,6 +95,8 @@ public class AmqpInboundChannelAdapterParserTests {
|
||||
.isEqualTo(AcknowledgeMode.NONE);
|
||||
assertThat(TestUtils.getPropertyValue(adapter, "messageListenerContainer.missingQueuesFatal", Boolean.class))
|
||||
.isFalse();
|
||||
assertThat(TestUtils.getPropertyValue(adapter, "messageListenerContainer.batchSize", Integer.class))
|
||||
.isEqualTo(3);
|
||||
}
|
||||
|
||||
@Test
|
||||
@@ -216,14 +218,13 @@ public class AmqpInboundChannelAdapterParserTests {
|
||||
|
||||
@Test
|
||||
public void testInt2971HeaderMapperAndMappedHeadersExclusivity() {
|
||||
try {
|
||||
new ClassPathXmlApplicationContext("AmqpInboundChannelAdapterParserTests-headerMapper-fail-context.xml",
|
||||
this.getClass()).close();
|
||||
}
|
||||
catch (BeanDefinitionParsingException e) {
|
||||
assertThat(e.getMessage().startsWith("Configuration problem: The 'header-mapper' attribute " +
|
||||
"is mutually exclusive with 'mapped-request-headers' or 'mapped-reply-headers'")).isTrue();
|
||||
}
|
||||
assertThatExceptionOfType(BeanDefinitionParsingException.class)
|
||||
.isThrownBy(() ->
|
||||
new ClassPathXmlApplicationContext(
|
||||
"AmqpInboundChannelAdapterParserTests-headerMapper-fail-context.xml",
|
||||
getClass()))
|
||||
.withMessageStartingWith("Configuration problem: The 'header-mapper' attribute " +
|
||||
"is mutually exclusive with 'mapped-request-headers' or 'mapped-reply-headers'");
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@@ -32,7 +32,7 @@ The following adapters are available:
|
||||
* xref:amqp/inbound-gateway.adoc[Inbound Gateway]
|
||||
* xref:amqp/outbound-channel-adapter.adoc[Outbound Channel Adapter]
|
||||
* xref:amqp/outbound-gateway.adoc[Outbound Gateway]
|
||||
* xref:amqp-async-outbound-gateway[Async Outbound Gateway]
|
||||
* xref:amqp/async-outbound-gateway.adoc[Async Outbound Gateway]
|
||||
* xref:amqp/rmq-streams.adoc#rmq-stream-inbound-channel-adapter[RabbitMQ Stream Queue Inbound Channel Adapter]
|
||||
* xref:amqp/rmq-streams.adoc#rmq-stream-outbound-channel-adapter[RabbitMQ Stream Queue Outbound Channel Adapter]
|
||||
|
||||
|
||||
@@ -87,7 +87,7 @@ XML::
|
||||
task-executor="" <22>
|
||||
transaction-attribute="" <23>
|
||||
transaction-manager="" <24>
|
||||
tx-size="" <25>
|
||||
batch-size="" <25>
|
||||
consumers-per-queue <26>
|
||||
batch-mode="MESSAGES"/> <27>
|
||||
|
||||
@@ -146,7 +146,7 @@ By default, this value is `Integer.MAX_VALUE`, meaning that this container start
|
||||
Optional.
|
||||
<17> Tells the AMQP broker how many messages to send to each consumer in a single request.
|
||||
Often, you can set this value high to improve throughput.
|
||||
It should be greater than or equal to the transaction size (see the `tx-size` attribute, later in this list).
|
||||
It should be greater than or equal to the transaction size (see the `batch-size` attribute, later in this list).
|
||||
Optional (defaults to `1`).
|
||||
<18> Receive timeout in milliseconds.
|
||||
Optional (defaults to `1000`).
|
||||
@@ -173,7 +173,7 @@ If the `channelTransacted` flag is `false`, no transaction semantics apply to th
|
||||
For further information, see
|
||||
https://docs.spring.io/spring-amqp/reference/html/%255Freference.html#%5Ftransactions[Transactions with Spring AMQP].
|
||||
Optional.
|
||||
<25> Tells the `SimpleMessageListenerContainer` how many messages to process in a single transaction (if the channel is transactional).
|
||||
<25> Tells the `SimpleMessageListenerContainer` how many messages to process in a single request.
|
||||
For best results, it should be less than or equal to the value set in `prefetch-count`.
|
||||
Not allowed when 'consumers-per-queue' is set.
|
||||
Optional (defaults to `1`).
|
||||
|
||||
@@ -10,7 +10,7 @@ Java DSL::
|
||||
+
|
||||
[source, java, role="primary"]
|
||||
----
|
||||
@Bean // return the upper cased payload
|
||||
@Bean // return the upper-cased payload
|
||||
public IntegrationFlow amqpInboundGateway(ConnectionFactory connectionFactory) {
|
||||
return IntegrationFlow.from(Amqp.inboundGateway(connectionFactory, "foo"))
|
||||
.transform(String.class, String::toUpperCase)
|
||||
|
||||
Reference in New Issue
Block a user