diff --git a/spring-amqp-core/src/main/java/org/springframework/amqp/core/Address.java b/spring-amqp-core/src/main/java/org/springframework/amqp/core/Address.java index f01f0df6..37288c9c 100644 --- a/spring-amqp-core/src/main/java/org/springframework/amqp/core/Address.java +++ b/spring-amqp-core/src/main/java/org/springframework/amqp/core/Address.java @@ -21,10 +21,20 @@ import org.springframework.util.StringUtils; /** * Represents an address for publication of an AMQP message. The AMQP 0-8 and 0-9 specifications have an unstructured * string that is used as a "reply to" address. There are however conventions in use and this class makes it easier to - * follow these conventions. + * follow these conventions, which can be easily summarised as: + * + *
+ * (exchangeType)://(exchange)/(routingKey) + *+ * + * Here we also allow the exchange type to default to direct, and the exchange name to default to empty (so just a + * routing key will work if you know the queue name). + * + * @see ExchangeTypes * * @author Mark Pollack * @author Mark Fisher + * @author Dave Syer */ public class Address { diff --git a/spring-amqp-core/src/main/java/org/springframework/amqp/core/MessageProperties.java b/spring-amqp-core/src/main/java/org/springframework/amqp/core/MessageProperties.java index 7ec442d3..9f5df6dd 100644 --- a/spring-amqp-core/src/main/java/org/springframework/amqp/core/MessageProperties.java +++ b/spring-amqp-core/src/main/java/org/springframework/amqp/core/MessageProperties.java @@ -165,11 +165,6 @@ public class MessageProperties { this.replyTo = replyTo; } - //TODO - create Address/ReplyTo class to encapsulate exchangeType/exchange/routingkey ? - // qpid 0.8/1.0 .NET don't use a single string, but a pair. - // qpid 0.8 Java uses a single string - // - // See RabbitMQ .NET developer guide for more details on conventions for this string public Address getReplyTo() { return this.replyTo; } @@ -198,9 +193,6 @@ public class MessageProperties { return this.contentLength; } - //public void setDefaultCharset(String charSet) { - //} - public void setDeliveryMode(MessageDeliveryMode deliveryMode) { this.deliveryMode = deliveryMode; } diff --git a/spring-rabbit/src/main/java/org/springframework/amqp/rabbit/config/ListenerContainerParser.java b/spring-rabbit/src/main/java/org/springframework/amqp/rabbit/config/ListenerContainerParser.java index bc3530f2..e1ca45f3 100644 --- a/spring-rabbit/src/main/java/org/springframework/amqp/rabbit/config/ListenerContainerParser.java +++ b/spring-rabbit/src/main/java/org/springframework/amqp/rabbit/config/ListenerContainerParser.java @@ -73,10 +73,9 @@ class ListenerContainerParser implements BeanDefinitionParser { private static final String PHASE_ATTRIBUTE = "phase"; - public BeanDefinition parse(Element element, ParserContext parserContext) { - CompositeComponentDefinition compositeDef = - new CompositeComponentDefinition(element.getTagName(), parserContext.extractSource(element)); + CompositeComponentDefinition compositeDef = new CompositeComponentDefinition(element.getTagName(), + parserContext.extractSource(element)); parserContext.pushContainingComponent(compositeDef); NodeList childNodes = element.getChildNodes(); @@ -89,7 +88,7 @@ class ListenerContainerParser implements BeanDefinitionParser { } } } - + parserContext.popAndRegisterContainingComponent(); return null; } @@ -97,13 +96,11 @@ class ListenerContainerParser implements BeanDefinitionParser { private void parseListener(Element listenerEle, Element containerEle, ParserContext parserContext) { RootBeanDefinition listenerDef = new RootBeanDefinition(); listenerDef.setSource(parserContext.extractSource(listenerEle)); - + String ref = listenerEle.getAttribute(REF_ATTRIBUTE); if (!StringUtils.hasText(ref)) { - parserContext.getReaderContext().error( - "Listener 'ref' attribute contains empty value.", listenerEle); - } - else { + parserContext.getReaderContext().error("Listener 'ref' attribute contains empty value.", listenerEle); + } else { listenerDef.getPropertyValues().add("delegate", new RuntimeBeanReference(ref)); } @@ -111,8 +108,8 @@ class ListenerContainerParser implements BeanDefinitionParser { if (listenerEle.hasAttribute(METHOD_ATTRIBUTE)) { method = listenerEle.getAttribute(METHOD_ATTRIBUTE); if (!StringUtils.hasText(method)) { - parserContext.getReaderContext().error( - "Listener 'method' attribute contains empty value.", listenerEle); + parserContext.getReaderContext() + .error("Listener 'method' attribute contains empty value.", listenerEle); } } listenerDef.getPropertyValues().add("defaultListenerMethod", method); @@ -122,10 +119,8 @@ class ListenerContainerParser implements BeanDefinitionParser { if (!StringUtils.hasText(messageConverter)) { parserContext.getReaderContext().error( "Listener container 'message-converter' attribute contains empty value.", containerEle); - } - else { - listenerDef.getPropertyValues().add("messageConverter", - new RuntimeBeanReference(messageConverter)); + } else { + listenerDef.getPropertyValues().add("messageConverter", new RuntimeBeanReference(messageConverter)); } } @@ -145,17 +140,18 @@ class ListenerContainerParser implements BeanDefinitionParser { containerDef.getPropertyValues().add("messageListener", listenerDef); String containerBeanName = listenerEle.getAttribute(ID_ATTRIBUTE); - // If no bean id is given auto generate one using the ReaderContext's BeanNameGenerator + // If no bean id is given auto generate one using the ReaderContext's BeanNameGenerator if (!StringUtils.hasText(containerBeanName)) { containerBeanName = parserContext.getReaderContext().generateBeanName(containerDef); } String queueNames = listenerEle.getAttribute(QUEUE_NAMES_ATTRIBUTE); if (!StringUtils.hasText(queueNames)) { - parserContext.getReaderContext().error( - "Listener 'queue-names' attribute contains empty value.", listenerEle); + parserContext.getReaderContext().error("Listener 'queue-names' attribute contains empty value.", + listenerEle); } - containerDef.getPropertyValues().add("queueName", queueNames); + containerDef.getPropertyValues().add("queueNames", + StringUtils.trimArrayElements(StringUtils.commaDelimitedListToStringArray(queueNames))); // Register the listener and fire event parserContext.registerBeanComponent(new BeanComponentDefinition(containerDef, containerBeanName)); @@ -180,14 +176,12 @@ class ListenerContainerParser implements BeanDefinitionParser { String taskExecutorBeanName = containerEle.getAttribute(TASK_EXECUTOR_ATTRIBUTE); if (StringUtils.hasText(taskExecutorBeanName)) { - containerDef.getPropertyValues().add("taskExecutor", - new RuntimeBeanReference(taskExecutorBeanName)); + containerDef.getPropertyValues().add("taskExecutor", new RuntimeBeanReference(taskExecutorBeanName)); } String errorHandlerBeanName = containerEle.getAttribute(ERROR_HANDLER_ATTRIBUTE); if (StringUtils.hasText(errorHandlerBeanName)) { - containerDef.getPropertyValues().add("errorHandler", - new RuntimeBeanReference(errorHandlerBeanName)); + containerDef.getPropertyValues().add("errorHandler", new RuntimeBeanReference(errorHandlerBeanName)); } AcknowledgeMode acknowledgeMode = parseAcknowledgeMode(containerEle, parserContext); @@ -230,20 +224,17 @@ class ListenerContainerParser implements BeanDefinitionParser { if (StringUtils.hasText(acknowledge)) { if (ACKNOWLEDGE_AUTO.equals(acknowledge)) { acknowledgeMode = AcknowledgeMode.AUTO; - } - else if (ACKNOWLEDGE_MANUAL.equals(acknowledge)) { + } else if (ACKNOWLEDGE_MANUAL.equals(acknowledge)) { acknowledgeMode = AcknowledgeMode.MANUAL; - } - else if (ACKNOWLEDGE_NONE.equals(acknowledge)) { + } else if (ACKNOWLEDGE_NONE.equals(acknowledge)) { acknowledgeMode = AcknowledgeMode.NONE; - } - else { - parserContext.getReaderContext().error("Invalid listener container 'acknowledge' setting [" + - acknowledge + "]: only \"auto\", \"manual\", and \"none\" supported.", ele); + } else { + parserContext.getReaderContext().error( + "Invalid listener container 'acknowledge' setting [" + acknowledge + + "]: only \"auto\", \"manual\", and \"none\" supported.", ele); } return acknowledgeMode; - } - else { + } else { return null; } } diff --git a/spring-rabbit/src/main/java/org/springframework/amqp/rabbit/listener/AbstractMessageListenerContainer.java b/spring-rabbit/src/main/java/org/springframework/amqp/rabbit/listener/AbstractMessageListenerContainer.java index 5f0d7cda..e3064330 100644 --- a/spring-rabbit/src/main/java/org/springframework/amqp/rabbit/listener/AbstractMessageListenerContainer.java +++ b/spring-rabbit/src/main/java/org/springframework/amqp/rabbit/listener/AbstractMessageListenerContainer.java @@ -56,7 +56,7 @@ public abstract class AbstractMessageListenerContainer extends RabbitAccessor im private final Object lifecycleMonitor = new Object(); - private volatile String queueName; + private volatile String[] queueNames; private ErrorHandler errorHandler; @@ -104,35 +104,31 @@ public abstract class AbstractMessageListenerContainer extends RabbitAccessor im * Set the name of the queue to receive messages from. * @param queueName the desired queue (can not be
null)
*/
- public void setQueueName(String queueName) {
- // TODO change to QueueNames(String... queueNames)
- Assert.notNull(queueName, "'queueName' must not be null");
- this.queueName = queueName;
+ public void setQueueNames(String... queueName) {
+ this.queueNames = queueName;
}
public void setQueues(Queue... queues) {
- // TODO check for null arg value, refactor out of string based
- // conventions. Merge with string queue name values?
- StringBuilder sb = new StringBuilder();
- int size = queues.length;
- for (int i = 0; i < size; i++) {
- sb.append(queues[i].getName());
- if (i != size - 1)
- sb.append(",");
+ // TODO: Merge with string queue name values?
+ String[] queueNames = new String[queues.length];
+ for (int i = 0; i < queues.length; i++) {
+ Assert.notNull(queues[i], "Queue must not be null.");
+ queueNames[i] = queues[i].getName();
}
- this.queueName = sb.toString();
+ this.queueNames = queueNames;
}
/**
* Return the name of the queue to receive messages from.
*/
- public String getQueueName() {
- return this.queueName;
+ public String[] getQueueNames() {
+ return this.queueNames;
}
- protected String getRequiredQueueName() {
- Assert.notNull(this.queueName, "Queue name must not be null.");
- return this.queueName;
+ protected String[] getRequiredQueueNames() {
+ Assert.notNull(this.queueNames, "Queue names must not be null.");
+ Assert.state(this.queueNames.length > 0, "Queue names must not be empty.");
+ return this.queueNames;
}
/**
@@ -348,7 +344,7 @@ public abstract class AbstractMessageListenerContainer extends RabbitAccessor im
return this.active;
}
}
-
+
/**
* Start this container.
* @see #doStart
@@ -512,7 +508,7 @@ public abstract class AbstractMessageListenerContainer extends RabbitAccessor im
* @param listener the Spring ChannelAwareMessageListener to invoke
* @param channel the Rabbit Channel to operate on
* @param message the received Rabbit Message
- * @throws Exception if thrown by Rabbit API methods or listener itself.
+ * @throws Exception if thrown by Rabbit API methods or listener itself.
*
* Exception thrown from listener will be wrapped to {@link ListenerExecutionFailedException}.
* @see ChannelAwareMessageListener
@@ -580,7 +576,7 @@ public abstract class AbstractMessageListenerContainer extends RabbitAccessor im
} else if (ackRequired) {
if (ackRequired) {
channel.basicAck(deliveryTag, false);
- }
+ }
}
}
diff --git a/spring-rabbit/src/main/java/org/springframework/amqp/rabbit/listener/SimpleMessageListenerContainer.java b/spring-rabbit/src/main/java/org/springframework/amqp/rabbit/listener/SimpleMessageListenerContainer.java
index 079aad77..d694dffc 100644
--- a/spring-rabbit/src/main/java/org/springframework/amqp/rabbit/listener/SimpleMessageListenerContainer.java
+++ b/spring-rabbit/src/main/java/org/springframework/amqp/rabbit/listener/SimpleMessageListenerContainer.java
@@ -39,7 +39,6 @@ import org.springframework.transaction.interceptor.MatchAlwaysTransactionAttribu
import org.springframework.transaction.interceptor.TransactionAttribute;
import org.springframework.transaction.interceptor.TransactionInterceptor;
import org.springframework.util.Assert;
-import org.springframework.util.StringUtils;
import com.rabbitmq.client.Channel;
@@ -361,8 +360,7 @@ public class SimpleMessageListenerContainer extends AbstractMessageListenerConta
protected BlockingQueueConsumer createBlockingQueueConsumer() {
BlockingQueueConsumer consumer;
- String queueNames = getRequiredQueueName();
- String[] queues = StringUtils.commaDelimitedListToStringArray(queueNames);
+ String[] queues = getRequiredQueueNames();
consumer = new BlockingQueueConsumer(getConnectionFactory(), getAcknowledgeMode(), isChannelTransacted(),
prefetchCount, queues);
return consumer;
diff --git a/spring-rabbit/src/test/java/org/springframework/amqp/rabbit/config/ListenerContainerParserTests.java b/spring-rabbit/src/test/java/org/springframework/amqp/rabbit/config/ListenerContainerParserTests.java
index 0be007f1..b417015c 100644
--- a/spring-rabbit/src/test/java/org/springframework/amqp/rabbit/config/ListenerContainerParserTests.java
+++ b/spring-rabbit/src/test/java/org/springframework/amqp/rabbit/config/ListenerContainerParserTests.java
@@ -18,6 +18,8 @@ package org.springframework.amqp.rabbit.config;
import static org.junit.Assert.assertEquals;
+import java.util.Arrays;
+
import org.junit.Before;
import org.junit.Test;
@@ -50,7 +52,7 @@ public final class ListenerContainerParserTests {
DirectFieldAccessor listenerAccessor = new DirectFieldAccessor(container.getMessageListener());
assertEquals(beanFactory.getBean(TestBean.class), listenerAccessor.getPropertyValue("delegate"));
assertEquals("handle", listenerAccessor.getPropertyValue("defaultListenerMethod"));
- assertEquals("foo, bar", container.getQueueName());
+ assertEquals("[foo, bar]", Arrays.asList(container.getQueueNames()).toString());
}
diff --git a/spring-rabbit/src/test/java/org/springframework/amqp/rabbit/listener/MessageListenerBrokerInterruptionIntegrationTests.java b/spring-rabbit/src/test/java/org/springframework/amqp/rabbit/listener/MessageListenerBrokerInterruptionIntegrationTests.java
index 44a1b857..6226303f 100644
--- a/spring-rabbit/src/test/java/org/springframework/amqp/rabbit/listener/MessageListenerBrokerInterruptionIntegrationTests.java
+++ b/spring-rabbit/src/test/java/org/springframework/amqp/rabbit/listener/MessageListenerBrokerInterruptionIntegrationTests.java
@@ -151,7 +151,7 @@ public class MessageListenerBrokerInterruptionIntegrationTests {
ConnectionFactory connectionFactory) {
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory);
container.setMessageListener(new MessageListenerAdapter(listener));
- container.setQueueName(queueName);
+ container.setQueueNames(queueName);
container.setTxSize(txSize);
container.setPrefetchCount(txSize);
container.setConcurrentConsumers(concurrentConsumers);
diff --git a/spring-rabbit/src/test/java/org/springframework/amqp/rabbit/listener/MessageListenerContainerErrorHandlerIntegrationTests.java b/spring-rabbit/src/test/java/org/springframework/amqp/rabbit/listener/MessageListenerContainerErrorHandlerIntegrationTests.java
index cb522877..94763b4a 100644
--- a/spring-rabbit/src/test/java/org/springframework/amqp/rabbit/listener/MessageListenerContainerErrorHandlerIntegrationTests.java
+++ b/spring-rabbit/src/test/java/org/springframework/amqp/rabbit/listener/MessageListenerContainerErrorHandlerIntegrationTests.java
@@ -151,7 +151,7 @@ public class MessageListenerContainerErrorHandlerIntegrationTests {
container.setPrefetchCount(messageCount);
container.setTxSize(messageCount);
- container.setQueueName(queue.getName());
+ container.setQueueNames(queue.getName());
container.setErrorHandler(errorHandler);
container.afterPropertiesSet();
container.start();
diff --git a/spring-rabbit/src/test/java/org/springframework/amqp/rabbit/listener/MessageListenerContainerLifecycleIntegrationTests.java b/spring-rabbit/src/test/java/org/springframework/amqp/rabbit/listener/MessageListenerContainerLifecycleIntegrationTests.java
index 4bb244d7..d946f874 100755
--- a/spring-rabbit/src/test/java/org/springframework/amqp/rabbit/listener/MessageListenerContainerLifecycleIntegrationTests.java
+++ b/spring-rabbit/src/test/java/org/springframework/amqp/rabbit/listener/MessageListenerContainerLifecycleIntegrationTests.java
@@ -146,7 +146,7 @@ public class MessageListenerContainerLifecycleIntegrationTests {
container.setPrefetchCount(transactionMode.getPrefetch());
container.setTxSize(transactionMode.getTxSize());
}
- container.setQueueName(queue.getName());
+ container.setQueueNames(queue.getName());
container.afterPropertiesSet();
container.start();
diff --git a/spring-rabbit/src/test/java/org/springframework/amqp/rabbit/listener/MessageListenerManualAckIntegrationTests.java b/spring-rabbit/src/test/java/org/springframework/amqp/rabbit/listener/MessageListenerManualAckIntegrationTests.java
index d260764d..214f2232 100644
--- a/spring-rabbit/src/test/java/org/springframework/amqp/rabbit/listener/MessageListenerManualAckIntegrationTests.java
+++ b/spring-rabbit/src/test/java/org/springframework/amqp/rabbit/listener/MessageListenerManualAckIntegrationTests.java
@@ -101,7 +101,7 @@ public class MessageListenerManualAckIntegrationTests {
private SimpleMessageListenerContainer createContainer(Object listener) {
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(template.getConnectionFactory());
container.setMessageListener(new MessageListenerAdapter(listener));
- container.setQueueName(queue.getName());
+ container.setQueueNames(queue.getName());
container.setTxSize(txSize);
container.setPrefetchCount(txSize);
container.setConcurrentConsumers(concurrentConsumers);
diff --git a/spring-rabbit/src/test/java/org/springframework/amqp/rabbit/listener/MessageListenerRecoveryCachingConnectionIntegrationTests.java b/spring-rabbit/src/test/java/org/springframework/amqp/rabbit/listener/MessageListenerRecoveryCachingConnectionIntegrationTests.java
index ae0362e4..0e833a52 100644
--- a/spring-rabbit/src/test/java/org/springframework/amqp/rabbit/listener/MessageListenerRecoveryCachingConnectionIntegrationTests.java
+++ b/spring-rabbit/src/test/java/org/springframework/amqp/rabbit/listener/MessageListenerRecoveryCachingConnectionIntegrationTests.java
@@ -199,7 +199,7 @@ public class MessageListenerRecoveryCachingConnectionIntegrationTests {
private SimpleMessageListenerContainer createContainer(String queueName, Object listener, ConnectionFactory connectionFactory) {
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory);
container.setMessageListener(new MessageListenerAdapter(listener));
- container.setQueueName(queueName);
+ container.setQueueNames(queueName);
container.setTxSize(txSize);
container.setPrefetchCount(txSize);
container.setConcurrentConsumers(concurrentConsumers);
diff --git a/spring-rabbit/src/test/java/org/springframework/amqp/rabbit/listener/SimpleMessageListenerContainerIntegrationTests.java b/spring-rabbit/src/test/java/org/springframework/amqp/rabbit/listener/SimpleMessageListenerContainerIntegrationTests.java
index 5024b04e..a9999ea6 100755
--- a/spring-rabbit/src/test/java/org/springframework/amqp/rabbit/listener/SimpleMessageListenerContainerIntegrationTests.java
+++ b/spring-rabbit/src/test/java/org/springframework/amqp/rabbit/listener/SimpleMessageListenerContainerIntegrationTests.java
@@ -214,7 +214,7 @@ public class SimpleMessageListenerContainerIntegrationTests {
private SimpleMessageListenerContainer createContainer(Object listener) {
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(template.getConnectionFactory());
container.setMessageListener(listener);
- container.setQueueName(queue.getName());
+ container.setQueueNames(queue.getName());
container.setTxSize(txSize);
container.setPrefetchCount(txSize);
container.setConcurrentConsumers(concurrentConsumers);
diff --git a/spring-rabbit/src/test/java/org/springframework/amqp/rabbit/listener/SimpleMessageListenerContainerTests.java b/spring-rabbit/src/test/java/org/springframework/amqp/rabbit/listener/SimpleMessageListenerContainerTests.java
index dd84f440..149e8170 100644
--- a/spring-rabbit/src/test/java/org/springframework/amqp/rabbit/listener/SimpleMessageListenerContainerTests.java
+++ b/spring-rabbit/src/test/java/org/springframework/amqp/rabbit/listener/SimpleMessageListenerContainerTests.java
@@ -36,7 +36,7 @@ public class SimpleMessageListenerContainerTests {
public void testInconsistentTransactionConfiguration() throws Exception {
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(new SingleConnectionFactory());
container.setMessageListener(new MessageListenerAdapter(this));
- container.setQueueName("foo");
+ container.setQueueNames("foo");
container.setChannelTransacted(false);
container.setAcknowledgeMode(AcknowledgeMode.NONE);
container.setTransactionManager(new TestTransactionManager());
@@ -48,7 +48,7 @@ public class SimpleMessageListenerContainerTests {
public void testInconsistentAcknowledgeConfiguration() throws Exception {
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(new SingleConnectionFactory());
container.setMessageListener(new MessageListenerAdapter(this));
- container.setQueueName("foo");
+ container.setQueueNames("foo");
container.setChannelTransacted(true);
container.setAcknowledgeMode(AcknowledgeMode.NONE);
expectedException.expect(IllegalStateException.class);