AMQP-256 Add channel-transacted to SLC Namespace
Namespace support for SimpleListenerContainer was missing channel-transacted attribute. AMQP-256 Polishing Add cross check - disallow a transacted channel when acknowlege='NONE' (autoack in Rabbit-speak).
This commit is contained in:
committed by
Oleg Zhurakousky
parent
cc0f692d22
commit
0a6ba19b75
@@ -52,6 +52,8 @@ public class RabbitNamespaceUtils {
|
||||
|
||||
private static final String PREFETCH_ATTRIBUTE = "prefetch";
|
||||
|
||||
private static final String CHANNEL_TRANSACTED_ATTRIBUTE = "channel-transacted";
|
||||
|
||||
private static final String TRANSACTION_SIZE_ATTRIBUTE = "transaction-size";
|
||||
|
||||
private static final String PHASE_ATTRIBUTE = "phase";
|
||||
@@ -110,6 +112,16 @@ public class RabbitNamespaceUtils {
|
||||
containerDef.getPropertyValues().add("prefetchCount", new TypedStringValue(prefetch));
|
||||
}
|
||||
|
||||
String channelTransacted = containerEle.getAttribute(CHANNEL_TRANSACTED_ATTRIBUTE);
|
||||
if (StringUtils.hasText(channelTransacted)) {
|
||||
// Note: a placeholder will pass this test, but if it resolves to true, it will be caught during container initialization
|
||||
if (acknowledgeMode.isAutoAck() && channelTransacted.equalsIgnoreCase("true")) {
|
||||
parserContext.getReaderContext().error(
|
||||
"Listener Container - cannot set channel-transacted with acknowledge='NONE'", containerEle);
|
||||
}
|
||||
containerDef.getPropertyValues().add("channelTransacted", new TypedStringValue(channelTransacted));
|
||||
}
|
||||
|
||||
String transactionSize = containerEle.getAttribute(TRANSACTION_SIZE_ATTRIBUTE);
|
||||
if (StringUtils.hasText(transactionSize)) {
|
||||
containerDef.getPropertyValues().add("txSize", new TypedStringValue(transactionSize));
|
||||
|
||||
@@ -605,6 +605,14 @@
|
||||
</xsd:appinfo>
|
||||
</xsd:annotation>
|
||||
</xsd:attribute>
|
||||
<xsd:attribute name="channel-transacted" type="xsd:string" use="optional">
|
||||
<xsd:annotation>
|
||||
<xsd:documentation><![CDATA[
|
||||
Flag to indicate that the channel should be used transactionally.
|
||||
Cannot be 'true' if acknowledge is 'none'. Default is false.
|
||||
]]></xsd:documentation>
|
||||
</xsd:annotation>
|
||||
</xsd:attribute>
|
||||
<xsd:attribute name="concurrency" type="xsd:string">
|
||||
<xsd:annotation>
|
||||
<xsd:documentation><![CDATA[
|
||||
|
||||
@@ -17,7 +17,10 @@
|
||||
package org.springframework.amqp.rabbit.config;
|
||||
|
||||
import static org.junit.Assert.assertEquals;
|
||||
import static org.junit.Assert.assertFalse;
|
||||
import static org.junit.Assert.assertNotNull;
|
||||
import static org.junit.Assert.assertTrue;
|
||||
import static org.junit.Assert.fail;
|
||||
|
||||
import java.lang.reflect.Method;
|
||||
import java.util.Arrays;
|
||||
@@ -34,8 +37,10 @@ import org.springframework.aop.MethodBeforeAdvice;
|
||||
import org.springframework.beans.DirectFieldAccessor;
|
||||
import org.springframework.beans.factory.BeanFactory;
|
||||
import org.springframework.beans.factory.config.ConfigurableBeanFactory;
|
||||
import org.springframework.beans.factory.parsing.BeanDefinitionParsingException;
|
||||
import org.springframework.beans.factory.xml.XmlBeanFactory;
|
||||
import org.springframework.context.expression.StandardBeanExpressionResolver;
|
||||
import org.springframework.context.support.ClassPathXmlApplicationContext;
|
||||
import org.springframework.core.io.ClassPathResource;
|
||||
import org.springframework.test.util.ReflectionTestUtils;
|
||||
|
||||
@@ -93,6 +98,26 @@ public class ListenerContainerParserTests {
|
||||
SimpleMessageListenerContainer container = beanFactory.getBean("container5", SimpleMessageListenerContainer.class);
|
||||
assertEquals(1, ReflectionTestUtils.getField(container, "concurrentConsumers"));
|
||||
assertEquals(false, ReflectionTestUtils.getField(container, "defaultRequeueRejected"));
|
||||
assertFalse(container.isChannelTransacted());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testParseWithTx() throws Exception {
|
||||
SimpleMessageListenerContainer container = beanFactory.getBean("container6", SimpleMessageListenerContainer.class);
|
||||
assertTrue(container.isChannelTransacted());
|
||||
assertEquals(5, ReflectionTestUtils.getField(container, "txSize"));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testIncompatibleTxAtts() {
|
||||
try {
|
||||
new ClassPathXmlApplicationContext(getClass().getSimpleName() + "-fail-context.xml", getClass());
|
||||
fail("Parse exception exptected");
|
||||
}
|
||||
catch (BeanDefinitionParsingException e) {
|
||||
assertTrue(e.getMessage().startsWith(
|
||||
"Configuration problem: Listener Container - cannot set channel-transacted with acknowledge='NONE'"));
|
||||
}
|
||||
}
|
||||
|
||||
static class TestBean {
|
||||
|
||||
@@ -32,6 +32,10 @@
|
||||
<rabbit:listener id="testListener" queues="foo" ref="testBean" method="handle"/>
|
||||
</rabbit:listener-container>
|
||||
|
||||
<rabbit:listener-container id="container6" connection-factory="connectionFactory" channel-transacted="true" transaction-size="5" >
|
||||
<rabbit:listener id="testListener" queues="foo" ref="testBean" method="handle"/>
|
||||
</rabbit:listener-container>
|
||||
|
||||
<util:list id="adviceChain">
|
||||
<bean class="org.springframework.amqp.rabbit.config.ListenerContainerParserTests$TestAdvice"/>
|
||||
<bean class="org.springframework.amqp.rabbit.config.ListenerContainerParserTests$TestAdvice"/>
|
||||
|
||||
@@ -0,0 +1,33 @@
|
||||
<?xml version="1.0" encoding="UTF-8"?>
|
||||
<beans xmlns="http://www.springframework.org/schema/beans"
|
||||
xmlns:beans="http://www.springframework.org/schema/beans"
|
||||
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
|
||||
xmlns:rabbit="http://www.springframework.org/schema/rabbit"
|
||||
xmlns:util="http://www.springframework.org/schema/util"
|
||||
xsi:schemaLocation="http://www.springframework.org/schema/rabbit http://www.springframework.org/schema/rabbit/spring-rabbit.xsd
|
||||
http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
|
||||
http://www.springframework.org/schema/util http://www.springframework.org/schema/util/spring-util-3.0.xsd">
|
||||
|
||||
<rabbit:queue name="foo" />
|
||||
|
||||
<rabbit:queue id="bar" />
|
||||
|
||||
<rabbit:listener-container id="container6" connection-factory="connectionFactory" channel-transacted="true" acknowledge="none" >
|
||||
<rabbit:listener id="testListener" queues="foo" ref="testBean" method="handle"/>
|
||||
</rabbit:listener-container>
|
||||
|
||||
<util:list id="adviceChain">
|
||||
<bean class="org.springframework.amqp.rabbit.config.ListenerContainerParserTests$TestAdvice"/>
|
||||
<bean class="org.springframework.amqp.rabbit.config.ListenerContainerParserTests$TestAdvice"/>
|
||||
<bean class="org.springframework.amqp.rabbit.config.ListenerContainerParserTests$TestAdvice"/>
|
||||
</util:list>
|
||||
|
||||
<bean class="org.springframework.amqp.rabbit.core.RabbitAdmin">
|
||||
<constructor-arg name="connectionFactory" ref="connectionFactory"/>
|
||||
</bean>
|
||||
|
||||
<bean id="connectionFactory" class="org.springframework.amqp.rabbit.connection.CachingConnectionFactory"/>
|
||||
|
||||
<bean id="testBean" class="org.springframework.amqp.rabbit.config.ListenerContainerParserTests$TestBean"/>
|
||||
|
||||
</beans>
|
||||
Reference in New Issue
Block a user