diff --git a/spring-amqp-core/pom.xml b/spring-amqp-core/pom.xml index a53c3353..e34a7477 100644 --- a/spring-amqp-core/pom.xml +++ b/spring-amqp-core/pom.xml @@ -34,6 +34,7 @@ cglib cglib-nodep + runtime true diff --git a/spring-amqp-parent/pom.xml b/spring-amqp-parent/pom.xml index 15f52b7c..638e3df8 100644 --- a/spring-amqp-parent/pom.xml +++ b/spring-amqp-parent/pom.xml @@ -139,6 +139,12 @@ com.rabbitmq amqp-client ${com.rabbitmq.version} + + + commons-cli + commons-cli + + org.springframework.amqp diff --git a/spring-rabbit/pom.xml b/spring-rabbit/pom.xml index d679bb32..418d4e19 100644 --- a/spring-rabbit/pom.xml +++ b/spring-rabbit/pom.xml @@ -23,7 +23,7 @@ org.springframework.amqp spring-erlang - true + test @@ -51,6 +51,13 @@ commons-io commons-io + test + + + commons-cli + commons-cli + 1.2 + test @@ -64,6 +71,7 @@ cglib cglib-nodep + runtime true diff --git a/spring-rabbit/src/main/java/org/springframework/amqp/rabbit/admin/QueueInfo.java b/spring-rabbit/src/test/java/org/springframework/amqp/rabbit/admin/QueueInfo.java similarity index 100% rename from spring-rabbit/src/main/java/org/springframework/amqp/rabbit/admin/QueueInfo.java rename to spring-rabbit/src/test/java/org/springframework/amqp/rabbit/admin/QueueInfo.java diff --git a/spring-rabbit/src/main/java/org/springframework/amqp/rabbit/admin/RabbitAdminAuthException.java b/spring-rabbit/src/test/java/org/springframework/amqp/rabbit/admin/RabbitAdminAuthException.java similarity index 100% rename from spring-rabbit/src/main/java/org/springframework/amqp/rabbit/admin/RabbitAdminAuthException.java rename to spring-rabbit/src/test/java/org/springframework/amqp/rabbit/admin/RabbitAdminAuthException.java diff --git a/spring-rabbit/src/main/java/org/springframework/amqp/rabbit/admin/RabbitBrokerAdmin.java b/spring-rabbit/src/test/java/org/springframework/amqp/rabbit/admin/RabbitBrokerAdmin.java similarity index 100% rename from spring-rabbit/src/main/java/org/springframework/amqp/rabbit/admin/RabbitBrokerAdmin.java rename to spring-rabbit/src/test/java/org/springframework/amqp/rabbit/admin/RabbitBrokerAdmin.java diff --git a/spring-rabbit/src/main/java/org/springframework/amqp/rabbit/admin/RabbitBrokerOperations.java b/spring-rabbit/src/test/java/org/springframework/amqp/rabbit/admin/RabbitBrokerOperations.java similarity index 100% rename from spring-rabbit/src/main/java/org/springframework/amqp/rabbit/admin/RabbitBrokerOperations.java rename to spring-rabbit/src/test/java/org/springframework/amqp/rabbit/admin/RabbitBrokerOperations.java diff --git a/spring-rabbit/src/main/java/org/springframework/amqp/rabbit/admin/RabbitControlErlangConverter.java b/spring-rabbit/src/test/java/org/springframework/amqp/rabbit/admin/RabbitControlErlangConverter.java similarity index 100% rename from spring-rabbit/src/main/java/org/springframework/amqp/rabbit/admin/RabbitControlErlangConverter.java rename to spring-rabbit/src/test/java/org/springframework/amqp/rabbit/admin/RabbitControlErlangConverter.java diff --git a/spring-rabbit/src/main/java/org/springframework/amqp/rabbit/admin/RabbitStatus.java b/spring-rabbit/src/test/java/org/springframework/amqp/rabbit/admin/RabbitStatus.java similarity index 100% rename from spring-rabbit/src/main/java/org/springframework/amqp/rabbit/admin/RabbitStatus.java rename to spring-rabbit/src/test/java/org/springframework/amqp/rabbit/admin/RabbitStatus.java diff --git a/spring-rabbit/src/test/java/org/springframework/amqp/rabbit/core/RabbitAdminIntegrationTests.java b/spring-rabbit/src/test/java/org/springframework/amqp/rabbit/core/RabbitAdminIntegrationTests.java index d0ee4171..e32c0a35 100644 --- a/spring-rabbit/src/test/java/org/springframework/amqp/rabbit/core/RabbitAdminIntegrationTests.java +++ b/spring-rabbit/src/test/java/org/springframework/amqp/rabbit/core/RabbitAdminIntegrationTests.java @@ -70,7 +70,12 @@ public class RabbitAdminIntegrationTests { Queue queue = new Queue("test.queue", false, true, true); rabbitAdmin.deleteQueue(queue.getName()); new RabbitAdmin(connectionFactory1).declareQueue(queue); - new RabbitAdmin(connectionFactory2).declareQueue(queue); + try { + new RabbitAdmin(connectionFactory2).declareQueue(queue); + } finally { + // Need to release the connection so the exclusive queue is deleted + connectionFactory1.destroy(); + } } @Test diff --git a/spring-rabbit/src/test/java/org/springframework/amqp/rabbit/core/RabbitBindingIntegrationTests.java b/spring-rabbit/src/test/java/org/springframework/amqp/rabbit/core/RabbitBindingIntegrationTests.java index 6436ba87..0e0df111 100644 --- a/spring-rabbit/src/test/java/org/springframework/amqp/rabbit/core/RabbitBindingIntegrationTests.java +++ b/spring-rabbit/src/test/java/org/springframework/amqp/rabbit/core/RabbitBindingIntegrationTests.java @@ -30,7 +30,7 @@ public class RabbitBindingIntegrationTests { private RabbitTemplate template = new RabbitTemplate(connectionFactory ); @Rule - public BrokerRunning brokerIsRunning = BrokerRunning.isRunningWithEmptyQueue(queue); + public BrokerRunning brokerIsRunning = BrokerRunning.isRunningWithEmptyQueues(queue); @Test public void testSendAndReceiveWithTopicSingleCallback() throws Exception { diff --git a/spring-rabbit/src/test/java/org/springframework/amqp/rabbit/core/RabbitTemplateIntegrationTests.java b/spring-rabbit/src/test/java/org/springframework/amqp/rabbit/core/RabbitTemplateIntegrationTests.java index 403dfbed..7796eddc 100644 --- a/spring-rabbit/src/test/java/org/springframework/amqp/rabbit/core/RabbitTemplateIntegrationTests.java +++ b/spring-rabbit/src/test/java/org/springframework/amqp/rabbit/core/RabbitTemplateIntegrationTests.java @@ -61,7 +61,7 @@ public class RabbitTemplateIntegrationTests { } @Rule - public BrokerRunning brokerIsRunning = BrokerRunning.isRunningWithEmptyQueue(ROUTE); + public BrokerRunning brokerIsRunning = BrokerRunning.isRunningWithEmptyQueues(ROUTE); @Test public void testSendToNonExistentAndThenReceive() throws Exception { diff --git a/spring-rabbit/src/test/java/org/springframework/amqp/rabbit/core/RabbitTemplatePerformanceIntegrationTests.java b/spring-rabbit/src/test/java/org/springframework/amqp/rabbit/core/RabbitTemplatePerformanceIntegrationTests.java index 7b09cb34..1b170908 100755 --- a/spring-rabbit/src/test/java/org/springframework/amqp/rabbit/core/RabbitTemplatePerformanceIntegrationTests.java +++ b/spring-rabbit/src/test/java/org/springframework/amqp/rabbit/core/RabbitTemplatePerformanceIntegrationTests.java @@ -29,7 +29,7 @@ public class RabbitTemplatePerformanceIntegrationTests { @Rule // After the repeat processor, so it only runs once - public BrokerRunning brokerIsRunning = BrokerRunning.isRunningWithEmptyQueue(ROUTE); + public BrokerRunning brokerIsRunning = BrokerRunning.isRunningWithEmptyQueues(ROUTE); private CachingConnectionFactory connectionFactory; diff --git a/spring-rabbit/src/test/java/org/springframework/amqp/rabbit/listener/MessageListenerBrokerInterruptionIntegrationTests.java b/spring-rabbit/src/test/java/org/springframework/amqp/rabbit/listener/MessageListenerBrokerInterruptionIntegrationTests.java index 5888cce0..eca01ed0 100644 --- a/spring-rabbit/src/test/java/org/springframework/amqp/rabbit/listener/MessageListenerBrokerInterruptionIntegrationTests.java +++ b/spring-rabbit/src/test/java/org/springframework/amqp/rabbit/listener/MessageListenerBrokerInterruptionIntegrationTests.java @@ -63,7 +63,7 @@ public class MessageListenerBrokerInterruptionIntegrationTests { public static BrokerPanic panic = new BrokerPanic(); @Rule - public BrokerRunning brokerIsRunning = BrokerRunning.isRunningWithEmptyQueue(queue); + public BrokerRunning brokerIsRunning = BrokerRunning.isRunningWithEmptyQueues(queue); private ConnectionFactory connectionFactory; diff --git a/spring-rabbit/src/test/java/org/springframework/amqp/rabbit/listener/MessageListenerContainerErrorHandlerIntegrationTests.java b/spring-rabbit/src/test/java/org/springframework/amqp/rabbit/listener/MessageListenerContainerErrorHandlerIntegrationTests.java index 22f8e989..ceb940b1 100644 --- a/spring-rabbit/src/test/java/org/springframework/amqp/rabbit/listener/MessageListenerContainerErrorHandlerIntegrationTests.java +++ b/spring-rabbit/src/test/java/org/springframework/amqp/rabbit/listener/MessageListenerContainerErrorHandlerIntegrationTests.java @@ -42,7 +42,7 @@ public class MessageListenerContainerErrorHandlerIntegrationTests { private ErrorHandler errorHandler = mock(ErrorHandler.class); @Rule - public BrokerRunning brokerIsRunning = BrokerRunning.isRunningWithEmptyQueue(queue); + public BrokerRunning brokerIsRunning = BrokerRunning.isRunningWithEmptyQueues(queue); @Rule public Log4jLevelAdjuster logLevels = new Log4jLevelAdjuster(Level.INFO, RabbitTemplate.class, diff --git a/spring-rabbit/src/test/java/org/springframework/amqp/rabbit/listener/MessageListenerContainerLifecycleIntegrationTests.java b/spring-rabbit/src/test/java/org/springframework/amqp/rabbit/listener/MessageListenerContainerLifecycleIntegrationTests.java index 0a2b4a58..99cca6c3 100755 --- a/spring-rabbit/src/test/java/org/springframework/amqp/rabbit/listener/MessageListenerContainerLifecycleIntegrationTests.java +++ b/spring-rabbit/src/test/java/org/springframework/amqp/rabbit/listener/MessageListenerContainerLifecycleIntegrationTests.java @@ -75,7 +75,7 @@ public class MessageListenerContainerLifecycleIntegrationTests { } @Rule - public BrokerRunning brokerIsRunning = BrokerRunning.isRunningWithEmptyQueue(queue); + public BrokerRunning brokerIsRunning = BrokerRunning.isRunningWithEmptyQueues(queue); @Rule public Log4jLevelAdjuster logLevels = new Log4jLevelAdjuster(Level.INFO, RabbitTemplate.class, diff --git a/spring-rabbit/src/test/java/org/springframework/amqp/rabbit/listener/MessageListenerContainerMultipleQueueIntegrationTests.java b/spring-rabbit/src/test/java/org/springframework/amqp/rabbit/listener/MessageListenerContainerMultipleQueueIntegrationTests.java index 6f47f6db..013735e5 100644 --- a/spring-rabbit/src/test/java/org/springframework/amqp/rabbit/listener/MessageListenerContainerMultipleQueueIntegrationTests.java +++ b/spring-rabbit/src/test/java/org/springframework/amqp/rabbit/listener/MessageListenerContainerMultipleQueueIntegrationTests.java @@ -51,10 +51,10 @@ public class MessageListenerContainerMultipleQueueIntegrationTests { private static Queue queue2 = new Queue("test.queue.2"); @Rule - public BrokerRunning brokerIsRunningAndQueue1Empty = BrokerRunning.isRunningWithEmptyQueue(queue1); + public BrokerRunning brokerIsRunningAndQueue1Empty = BrokerRunning.isRunningWithEmptyQueues(queue1); @Rule - public BrokerRunning brokerIsRunningAndQueue2Empty = BrokerRunning.isRunningWithEmptyQueue(queue2); + public BrokerRunning brokerIsRunningAndQueue2Empty = BrokerRunning.isRunningWithEmptyQueues(queue2); @Rule public Log4jLevelAdjuster logLevels = new Log4jLevelAdjuster(Level.INFO, RabbitTemplate.class, diff --git a/spring-rabbit/src/test/java/org/springframework/amqp/rabbit/listener/MessageListenerContainerRetryIntegrationTests.java b/spring-rabbit/src/test/java/org/springframework/amqp/rabbit/listener/MessageListenerContainerRetryIntegrationTests.java index 171ef66a..f18b9dbc 100644 --- a/spring-rabbit/src/test/java/org/springframework/amqp/rabbit/listener/MessageListenerContainerRetryIntegrationTests.java +++ b/spring-rabbit/src/test/java/org/springframework/amqp/rabbit/listener/MessageListenerContainerRetryIntegrationTests.java @@ -41,7 +41,7 @@ public class MessageListenerContainerRetryIntegrationTests { private static Queue queue = new Queue("test.queue"); @Rule - public BrokerRunning brokerIsRunning = BrokerRunning.isRunningWithEmptyQueue(queue); + public BrokerRunning brokerIsRunning = BrokerRunning.isRunningWithEmptyQueues(queue); @Rule public Log4jLevelAdjuster logLevels = new Log4jLevelAdjuster(Level.INFO, RabbitTemplate.class, diff --git a/spring-rabbit/src/test/java/org/springframework/amqp/rabbit/listener/MessageListenerManualAckIntegrationTests.java b/spring-rabbit/src/test/java/org/springframework/amqp/rabbit/listener/MessageListenerManualAckIntegrationTests.java index 214f2232..71574c72 100644 --- a/spring-rabbit/src/test/java/org/springframework/amqp/rabbit/listener/MessageListenerManualAckIntegrationTests.java +++ b/spring-rabbit/src/test/java/org/springframework/amqp/rabbit/listener/MessageListenerManualAckIntegrationTests.java @@ -49,7 +49,7 @@ public class MessageListenerManualAckIntegrationTests { SimpleMessageListenerContainer.class, BlockingQueueConsumer.class); @Rule - public BrokerRunning brokerIsRunning = BrokerRunning.isRunningWithEmptyQueue(queue); + public BrokerRunning brokerIsRunning = BrokerRunning.isRunningWithEmptyQueues(queue); @Before public void createConnectionFactory() { diff --git a/spring-rabbit/src/test/java/org/springframework/amqp/rabbit/listener/MessageListenerRecoveryCachingConnectionIntegrationTests.java b/spring-rabbit/src/test/java/org/springframework/amqp/rabbit/listener/MessageListenerRecoveryCachingConnectionIntegrationTests.java index 45a2cd95..89691d79 100644 --- a/spring-rabbit/src/test/java/org/springframework/amqp/rabbit/listener/MessageListenerRecoveryCachingConnectionIntegrationTests.java +++ b/spring-rabbit/src/test/java/org/springframework/amqp/rabbit/listener/MessageListenerRecoveryCachingConnectionIntegrationTests.java @@ -1,6 +1,7 @@ package org.springframework.amqp.rabbit.listener; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; @@ -39,6 +40,8 @@ public class MessageListenerRecoveryCachingConnectionIntegrationTests { private Queue queue = new Queue("test.queue"); + private Queue sendQueue = new Queue("test.send"); + private int concurrentConsumers = 1; private int messageCount = 10; @@ -56,7 +59,7 @@ public class MessageListenerRecoveryCachingConnectionIntegrationTests { SimpleMessageListenerContainer.class, BlockingQueueConsumer.class); @Rule - public BrokerRunning brokerIsRunning = BrokerRunning.isRunningWithEmptyQueue(queue); + public BrokerRunning brokerIsRunning = BrokerRunning.isRunningWithEmptyQueues(queue, sendQueue); protected ConnectionFactory createConnectionFactory() { CachingConnectionFactory connectionFactory = new CachingConnectionFactory(); @@ -80,7 +83,6 @@ public class MessageListenerRecoveryCachingConnectionIntegrationTests { ConnectionFactory connectionFactory = createConnectionFactory(); RabbitTemplate template = new RabbitTemplate(connectionFactory); - Queue sendQueue = new Queue("test.send"); new RabbitAdmin(connectionFactory).declareQueue(sendQueue); acknowledgeMode = AcknowledgeMode.AUTO; @@ -97,7 +99,9 @@ public class MessageListenerRecoveryCachingConnectionIntegrationTests { assertTrue("Timed out waiting for message", waited); // All messages committed - assertEquals("bar", new String((byte[]) template.receiveAndConvert(sendQueue.getName()))); + byte[] bytes = (byte[]) template.receiveAndConvert(sendQueue.getName()); + assertNotNull(bytes); + assertEquals("bar", new String(bytes)); assertNull(template.receiveAndConvert(queue.getName())); } @@ -107,7 +111,6 @@ public class MessageListenerRecoveryCachingConnectionIntegrationTests { ConnectionFactory connectionFactory = createConnectionFactory(); RabbitTemplate template = new RabbitTemplate(connectionFactory); - Queue sendQueue = new Queue("test.send"); new RabbitAdmin(connectionFactory).declareQueue(sendQueue); acknowledgeMode = AcknowledgeMode.AUTO; @@ -124,6 +127,7 @@ public class MessageListenerRecoveryCachingConnectionIntegrationTests { assertTrue("Timed out waiting for message", waited); container.stop(); + Thread.sleep(200L); // Foo message is redelivered assertEquals("foo", template.receiveAndConvert(queue.getName())); diff --git a/spring-rabbit/src/test/java/org/springframework/amqp/rabbit/listener/SimpleMessageListenerContainerIntegrationTests.java b/spring-rabbit/src/test/java/org/springframework/amqp/rabbit/listener/SimpleMessageListenerContainerIntegrationTests.java index 12eedab1..0d132a38 100755 --- a/spring-rabbit/src/test/java/org/springframework/amqp/rabbit/listener/SimpleMessageListenerContainerIntegrationTests.java +++ b/spring-rabbit/src/test/java/org/springframework/amqp/rabbit/listener/SimpleMessageListenerContainerIntegrationTests.java @@ -54,7 +54,7 @@ public class SimpleMessageListenerContainerIntegrationTests { // SimpleMessageListenerContainer.class, BlockingQueueConsumer.class); @Rule - public BrokerRunning brokerIsRunning = BrokerRunning.isRunningWithEmptyQueue(queue); + public BrokerRunning brokerIsRunning = BrokerRunning.isRunningWithEmptyQueues(queue); private final int messageCount; diff --git a/spring-rabbit/src/test/java/org/springframework/amqp/rabbit/test/BrokerRunning.java b/spring-rabbit/src/test/java/org/springframework/amqp/rabbit/test/BrokerRunning.java index 150cacb4..61980675 100644 --- a/spring-rabbit/src/test/java/org/springframework/amqp/rabbit/test/BrokerRunning.java +++ b/spring-rabbit/src/test/java/org/springframework/amqp/rabbit/test/BrokerRunning.java @@ -55,7 +55,7 @@ public class BrokerRunning extends TestWatchman { private final boolean purge; - private Queue queue; + private Queue[] queues; private int port = BrokerTestUtils.getPort(); @@ -66,8 +66,12 @@ public class BrokerRunning extends TestWatchman { * * @return a new rule that assumes an existing running broker */ - public static BrokerRunning isRunningWithEmptyQueue(String queue) { - return new BrokerRunning(true, new Queue(queue), true); + public static BrokerRunning isRunningWithEmptyQueues(String... names) { + Queue[] queues = new Queue[names.length]; + for (int i = 0; i < queues.length; i++) { + queues[i] = new Queue(names[i]); + } + return new BrokerRunning(true, true, queues); } /** @@ -75,8 +79,8 @@ public class BrokerRunning extends TestWatchman { * * @return a new rule that assumes an existing running broker */ - public static BrokerRunning isRunningWithEmptyQueue(Queue queue) { - return new BrokerRunning(true, queue, true); + public static BrokerRunning isRunningWithEmptyQueues(Queue... queues) { + return new BrokerRunning(true, true, queues); } /** @@ -93,14 +97,14 @@ public class BrokerRunning extends TestWatchman { return new BrokerRunning(false); } - private BrokerRunning(boolean assumeOnline, Queue queue, boolean purge) { + private BrokerRunning(boolean assumeOnline, boolean purge, Queue... queues) { this.assumeOnline = assumeOnline; - this.queue = queue; + this.queues = queues; this.purge = purge; } - private BrokerRunning(boolean assumeOnline, Queue queue) { - this(assumeOnline, queue, false); + private BrokerRunning(boolean assumeOnline, Queue... queues) { + this(assumeOnline, false, queues); } private BrokerRunning(boolean assumeOnline) { @@ -140,13 +144,14 @@ public class BrokerRunning extends TestWatchman { } RabbitAdmin admin = new RabbitAdmin(connectionFactory); - String queueName = queue.getName(); + for (Queue queue : queues) { + String queueName = queue.getName(); - if (purge) { - logger.debug("Deleting queue: " + queueName); - // Delete completely - gets rid of consumers and bindings as well - admin.deleteQueue(queueName); - } + if (purge) { + logger.debug("Deleting queue: " + queueName); + // Delete completely - gets rid of consumers and bindings as well + admin.deleteQueue(queueName); + } if (isDefaultQueue(queueName)) { // Just for test probe. @@ -154,6 +159,7 @@ public class BrokerRunning extends TestWatchman { } else { admin.declareQueue(queue); } + } brokerOffline = false; if (!assumeOnline) { Assume.assumeTrue(brokerOffline); diff --git a/spring-rabbit/src/test/java/org/springframework/amqp/rabbit/transaction/RabbitTransactionManagerIntegrationTests.java b/spring-rabbit/src/test/java/org/springframework/amqp/rabbit/transaction/RabbitTransactionManagerIntegrationTests.java index 9a41f08f..75e1f17c 100644 --- a/spring-rabbit/src/test/java/org/springframework/amqp/rabbit/transaction/RabbitTransactionManagerIntegrationTests.java +++ b/spring-rabbit/src/test/java/org/springframework/amqp/rabbit/transaction/RabbitTransactionManagerIntegrationTests.java @@ -22,7 +22,7 @@ public class RabbitTransactionManagerIntegrationTests { private TransactionTemplate transactionTemplate; @Rule - public BrokerRunning brokerIsRunning = BrokerRunning.isRunningWithEmptyQueue(ROUTE); + public BrokerRunning brokerIsRunning = BrokerRunning.isRunningWithEmptyQueues(ROUTE); @Before public void init() { diff --git a/src/docbkx/amqp.xml b/src/docbkx/amqp.xml index dcf8c4a7..105613ba 100644 --- a/src/docbkx/amqp.xml +++ b/src/docbkx/amqp.xml @@ -652,12 +652,7 @@ Object receiveAndConvert(String queueName) throws AmqpException;]]> + org.springframework.amqp.rabbit.core package. The AmqpAdmin interface is based on using the Spring AMQP domain abstractions and is shown below: @@ -1084,12 +1079,4 @@ public class ExampleExternalTransactionAmqpConfiguration { -
- JMX Monitoring - - All the methods in the RabbitBrokerAdmin class are exposed through - JMX through the use of the @ManagedOperations annotation. Please refer to - the sample application for how to configure spring to export the - RabbitBrokerAdmin. -