AMQP-123: Support for String[] queueNames in MLC

This commit is contained in:
Dave Syer
2011-03-22 15:13:41 +00:00
parent a7c2df55f3
commit d236fc71a0
13 changed files with 65 additions and 76 deletions

View File

@@ -21,10 +21,20 @@ import org.springframework.util.StringUtils;
/**
* Represents an address for publication of an AMQP message. The AMQP 0-8 and 0-9 specifications have an unstructured
* string that is used as a "reply to" address. There are however conventions in use and this class makes it easier to
* follow these conventions.
* follow these conventions, which can be easily summarised as:
*
* <pre>
* (exchangeType)://(exchange)/(routingKey)
* </pre>
*
* Here we also allow the exchange type to default to direct, and the exchange name to default to empty (so just a
* routing key will work if you know the queue name).
*
* @see ExchangeTypes
*
* @author Mark Pollack
* @author Mark Fisher
* @author Dave Syer
*/
public class Address {

View File

@@ -165,11 +165,6 @@ public class MessageProperties {
this.replyTo = replyTo;
}
//TODO - create Address/ReplyTo class to encapsulate exchangeType/exchange/routingkey ?
// qpid 0.8/1.0 .NET don't use a single string, but a pair.
// qpid 0.8 Java uses a single string
//
// See RabbitMQ .NET developer guide for more details on conventions for this string
public Address getReplyTo() {
return this.replyTo;
}
@@ -198,9 +193,6 @@ public class MessageProperties {
return this.contentLength;
}
//public void setDefaultCharset(String charSet) {
//}
public void setDeliveryMode(MessageDeliveryMode deliveryMode) {
this.deliveryMode = deliveryMode;
}

View File

@@ -73,10 +73,9 @@ class ListenerContainerParser implements BeanDefinitionParser {
private static final String PHASE_ATTRIBUTE = "phase";
public BeanDefinition parse(Element element, ParserContext parserContext) {
CompositeComponentDefinition compositeDef =
new CompositeComponentDefinition(element.getTagName(), parserContext.extractSource(element));
CompositeComponentDefinition compositeDef = new CompositeComponentDefinition(element.getTagName(),
parserContext.extractSource(element));
parserContext.pushContainingComponent(compositeDef);
NodeList childNodes = element.getChildNodes();
@@ -89,7 +88,7 @@ class ListenerContainerParser implements BeanDefinitionParser {
}
}
}
parserContext.popAndRegisterContainingComponent();
return null;
}
@@ -97,13 +96,11 @@ class ListenerContainerParser implements BeanDefinitionParser {
private void parseListener(Element listenerEle, Element containerEle, ParserContext parserContext) {
RootBeanDefinition listenerDef = new RootBeanDefinition();
listenerDef.setSource(parserContext.extractSource(listenerEle));
String ref = listenerEle.getAttribute(REF_ATTRIBUTE);
if (!StringUtils.hasText(ref)) {
parserContext.getReaderContext().error(
"Listener 'ref' attribute contains empty value.", listenerEle);
}
else {
parserContext.getReaderContext().error("Listener 'ref' attribute contains empty value.", listenerEle);
} else {
listenerDef.getPropertyValues().add("delegate", new RuntimeBeanReference(ref));
}
@@ -111,8 +108,8 @@ class ListenerContainerParser implements BeanDefinitionParser {
if (listenerEle.hasAttribute(METHOD_ATTRIBUTE)) {
method = listenerEle.getAttribute(METHOD_ATTRIBUTE);
if (!StringUtils.hasText(method)) {
parserContext.getReaderContext().error(
"Listener 'method' attribute contains empty value.", listenerEle);
parserContext.getReaderContext()
.error("Listener 'method' attribute contains empty value.", listenerEle);
}
}
listenerDef.getPropertyValues().add("defaultListenerMethod", method);
@@ -122,10 +119,8 @@ class ListenerContainerParser implements BeanDefinitionParser {
if (!StringUtils.hasText(messageConverter)) {
parserContext.getReaderContext().error(
"Listener container 'message-converter' attribute contains empty value.", containerEle);
}
else {
listenerDef.getPropertyValues().add("messageConverter",
new RuntimeBeanReference(messageConverter));
} else {
listenerDef.getPropertyValues().add("messageConverter", new RuntimeBeanReference(messageConverter));
}
}
@@ -145,17 +140,18 @@ class ListenerContainerParser implements BeanDefinitionParser {
containerDef.getPropertyValues().add("messageListener", listenerDef);
String containerBeanName = listenerEle.getAttribute(ID_ATTRIBUTE);
// If no bean id is given auto generate one using the ReaderContext's BeanNameGenerator
// If no bean id is given auto generate one using the ReaderContext's BeanNameGenerator
if (!StringUtils.hasText(containerBeanName)) {
containerBeanName = parserContext.getReaderContext().generateBeanName(containerDef);
}
String queueNames = listenerEle.getAttribute(QUEUE_NAMES_ATTRIBUTE);
if (!StringUtils.hasText(queueNames)) {
parserContext.getReaderContext().error(
"Listener 'queue-names' attribute contains empty value.", listenerEle);
parserContext.getReaderContext().error("Listener 'queue-names' attribute contains empty value.",
listenerEle);
}
containerDef.getPropertyValues().add("queueName", queueNames);
containerDef.getPropertyValues().add("queueNames",
StringUtils.trimArrayElements(StringUtils.commaDelimitedListToStringArray(queueNames)));
// Register the listener and fire event
parserContext.registerBeanComponent(new BeanComponentDefinition(containerDef, containerBeanName));
@@ -180,14 +176,12 @@ class ListenerContainerParser implements BeanDefinitionParser {
String taskExecutorBeanName = containerEle.getAttribute(TASK_EXECUTOR_ATTRIBUTE);
if (StringUtils.hasText(taskExecutorBeanName)) {
containerDef.getPropertyValues().add("taskExecutor",
new RuntimeBeanReference(taskExecutorBeanName));
containerDef.getPropertyValues().add("taskExecutor", new RuntimeBeanReference(taskExecutorBeanName));
}
String errorHandlerBeanName = containerEle.getAttribute(ERROR_HANDLER_ATTRIBUTE);
if (StringUtils.hasText(errorHandlerBeanName)) {
containerDef.getPropertyValues().add("errorHandler",
new RuntimeBeanReference(errorHandlerBeanName));
containerDef.getPropertyValues().add("errorHandler", new RuntimeBeanReference(errorHandlerBeanName));
}
AcknowledgeMode acknowledgeMode = parseAcknowledgeMode(containerEle, parserContext);
@@ -230,20 +224,17 @@ class ListenerContainerParser implements BeanDefinitionParser {
if (StringUtils.hasText(acknowledge)) {
if (ACKNOWLEDGE_AUTO.equals(acknowledge)) {
acknowledgeMode = AcknowledgeMode.AUTO;
}
else if (ACKNOWLEDGE_MANUAL.equals(acknowledge)) {
} else if (ACKNOWLEDGE_MANUAL.equals(acknowledge)) {
acknowledgeMode = AcknowledgeMode.MANUAL;
}
else if (ACKNOWLEDGE_NONE.equals(acknowledge)) {
} else if (ACKNOWLEDGE_NONE.equals(acknowledge)) {
acknowledgeMode = AcknowledgeMode.NONE;
}
else {
parserContext.getReaderContext().error("Invalid listener container 'acknowledge' setting [" +
acknowledge + "]: only \"auto\", \"manual\", and \"none\" supported.", ele);
} else {
parserContext.getReaderContext().error(
"Invalid listener container 'acknowledge' setting [" + acknowledge
+ "]: only \"auto\", \"manual\", and \"none\" supported.", ele);
}
return acknowledgeMode;
}
else {
} else {
return null;
}
}

View File

@@ -56,7 +56,7 @@ public abstract class AbstractMessageListenerContainer extends RabbitAccessor im
private final Object lifecycleMonitor = new Object();
private volatile String queueName;
private volatile String[] queueNames;
private ErrorHandler errorHandler;
@@ -104,35 +104,31 @@ public abstract class AbstractMessageListenerContainer extends RabbitAccessor im
* Set the name of the queue to receive messages from.
* @param queueName the desired queue (can not be <code>null</code>)
*/
public void setQueueName(String queueName) {
// TODO change to QueueNames(String... queueNames)
Assert.notNull(queueName, "'queueName' must not be null");
this.queueName = queueName;
public void setQueueNames(String... queueName) {
this.queueNames = queueName;
}
public void setQueues(Queue... queues) {
// TODO check for null arg value, refactor out of string based
// conventions. Merge with string queue name values?
StringBuilder sb = new StringBuilder();
int size = queues.length;
for (int i = 0; i < size; i++) {
sb.append(queues[i].getName());
if (i != size - 1)
sb.append(",");
// TODO: Merge with string queue name values?
String[] queueNames = new String[queues.length];
for (int i = 0; i < queues.length; i++) {
Assert.notNull(queues[i], "Queue must not be null.");
queueNames[i] = queues[i].getName();
}
this.queueName = sb.toString();
this.queueNames = queueNames;
}
/**
* Return the name of the queue to receive messages from.
*/
public String getQueueName() {
return this.queueName;
public String[] getQueueNames() {
return this.queueNames;
}
protected String getRequiredQueueName() {
Assert.notNull(this.queueName, "Queue name must not be null.");
return this.queueName;
protected String[] getRequiredQueueNames() {
Assert.notNull(this.queueNames, "Queue names must not be null.");
Assert.state(this.queueNames.length > 0, "Queue names must not be empty.");
return this.queueNames;
}
/**
@@ -348,7 +344,7 @@ public abstract class AbstractMessageListenerContainer extends RabbitAccessor im
return this.active;
}
}
/**
* Start this container.
* @see #doStart
@@ -512,7 +508,7 @@ public abstract class AbstractMessageListenerContainer extends RabbitAccessor im
* @param listener the Spring ChannelAwareMessageListener to invoke
* @param channel the Rabbit Channel to operate on
* @param message the received Rabbit Message
* @throws Exception if thrown by Rabbit API methods or listener itself.
* @throws Exception if thrown by Rabbit API methods or listener itself.
* <p/>
* Exception thrown from listener will be wrapped to {@link ListenerExecutionFailedException}.
* @see ChannelAwareMessageListener
@@ -580,7 +576,7 @@ public abstract class AbstractMessageListenerContainer extends RabbitAccessor im
} else if (ackRequired) {
if (ackRequired) {
channel.basicAck(deliveryTag, false);
}
}
}
}

View File

@@ -39,7 +39,6 @@ import org.springframework.transaction.interceptor.MatchAlwaysTransactionAttribu
import org.springframework.transaction.interceptor.TransactionAttribute;
import org.springframework.transaction.interceptor.TransactionInterceptor;
import org.springframework.util.Assert;
import org.springframework.util.StringUtils;
import com.rabbitmq.client.Channel;
@@ -361,8 +360,7 @@ public class SimpleMessageListenerContainer extends AbstractMessageListenerConta
protected BlockingQueueConsumer createBlockingQueueConsumer() {
BlockingQueueConsumer consumer;
String queueNames = getRequiredQueueName();
String[] queues = StringUtils.commaDelimitedListToStringArray(queueNames);
String[] queues = getRequiredQueueNames();
consumer = new BlockingQueueConsumer(getConnectionFactory(), getAcknowledgeMode(), isChannelTransacted(),
prefetchCount, queues);
return consumer;

View File

@@ -18,6 +18,8 @@ package org.springframework.amqp.rabbit.config;
import static org.junit.Assert.assertEquals;
import java.util.Arrays;
import org.junit.Before;
import org.junit.Test;
@@ -50,7 +52,7 @@ public final class ListenerContainerParserTests {
DirectFieldAccessor listenerAccessor = new DirectFieldAccessor(container.getMessageListener());
assertEquals(beanFactory.getBean(TestBean.class), listenerAccessor.getPropertyValue("delegate"));
assertEquals("handle", listenerAccessor.getPropertyValue("defaultListenerMethod"));
assertEquals("foo, bar", container.getQueueName());
assertEquals("[foo, bar]", Arrays.asList(container.getQueueNames()).toString());
}

View File

@@ -151,7 +151,7 @@ public class MessageListenerBrokerInterruptionIntegrationTests {
ConnectionFactory connectionFactory) {
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory);
container.setMessageListener(new MessageListenerAdapter(listener));
container.setQueueName(queueName);
container.setQueueNames(queueName);
container.setTxSize(txSize);
container.setPrefetchCount(txSize);
container.setConcurrentConsumers(concurrentConsumers);

View File

@@ -151,7 +151,7 @@ public class MessageListenerContainerErrorHandlerIntegrationTests {
container.setPrefetchCount(messageCount);
container.setTxSize(messageCount);
container.setQueueName(queue.getName());
container.setQueueNames(queue.getName());
container.setErrorHandler(errorHandler);
container.afterPropertiesSet();
container.start();

View File

@@ -146,7 +146,7 @@ public class MessageListenerContainerLifecycleIntegrationTests {
container.setPrefetchCount(transactionMode.getPrefetch());
container.setTxSize(transactionMode.getTxSize());
}
container.setQueueName(queue.getName());
container.setQueueNames(queue.getName());
container.afterPropertiesSet();
container.start();

View File

@@ -101,7 +101,7 @@ public class MessageListenerManualAckIntegrationTests {
private SimpleMessageListenerContainer createContainer(Object listener) {
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(template.getConnectionFactory());
container.setMessageListener(new MessageListenerAdapter(listener));
container.setQueueName(queue.getName());
container.setQueueNames(queue.getName());
container.setTxSize(txSize);
container.setPrefetchCount(txSize);
container.setConcurrentConsumers(concurrentConsumers);

View File

@@ -199,7 +199,7 @@ public class MessageListenerRecoveryCachingConnectionIntegrationTests {
private SimpleMessageListenerContainer createContainer(String queueName, Object listener, ConnectionFactory connectionFactory) {
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory);
container.setMessageListener(new MessageListenerAdapter(listener));
container.setQueueName(queueName);
container.setQueueNames(queueName);
container.setTxSize(txSize);
container.setPrefetchCount(txSize);
container.setConcurrentConsumers(concurrentConsumers);

View File

@@ -214,7 +214,7 @@ public class SimpleMessageListenerContainerIntegrationTests {
private SimpleMessageListenerContainer createContainer(Object listener) {
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(template.getConnectionFactory());
container.setMessageListener(listener);
container.setQueueName(queue.getName());
container.setQueueNames(queue.getName());
container.setTxSize(txSize);
container.setPrefetchCount(txSize);
container.setConcurrentConsumers(concurrentConsumers);

View File

@@ -36,7 +36,7 @@ public class SimpleMessageListenerContainerTests {
public void testInconsistentTransactionConfiguration() throws Exception {
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(new SingleConnectionFactory());
container.setMessageListener(new MessageListenerAdapter(this));
container.setQueueName("foo");
container.setQueueNames("foo");
container.setChannelTransacted(false);
container.setAcknowledgeMode(AcknowledgeMode.NONE);
container.setTransactionManager(new TestTransactionManager());
@@ -48,7 +48,7 @@ public class SimpleMessageListenerContainerTests {
public void testInconsistentAcknowledgeConfiguration() throws Exception {
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(new SingleConnectionFactory());
container.setMessageListener(new MessageListenerAdapter(this));
container.setQueueName("foo");
container.setQueueNames("foo");
container.setChannelTransacted(true);
container.setAcknowledgeMode(AcknowledgeMode.NONE);
expectedException.expect(IllegalStateException.class);