AMQP-140: re-organise, move BrokerAdmin and fix integration tests

This commit is contained in:
Dave Syer
2011-03-31 15:38:56 +01:00
parent 225ae803b2
commit 652bebc585
24 changed files with 64 additions and 47 deletions

View File

@@ -34,6 +34,7 @@
<dependency>
<groupId>cglib</groupId>
<artifactId>cglib-nodep</artifactId>
<scope>runtime</scope>
<optional>true</optional>
</dependency>
<dependency>

View File

@@ -139,6 +139,12 @@
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>${com.rabbitmq.version}</version>
<exclusions>
<exclusion>
<groupId>commons-cli</groupId>
<artifactId>commons-cli</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.springframework.amqp</groupId>

View File

@@ -23,7 +23,7 @@
<dependency>
<groupId>org.springframework.amqp</groupId>
<artifactId>spring-erlang</artifactId>
<optional>true</optional>
<scope>test</scope>
</dependency>
<!-- Spring -->
@@ -51,6 +51,13 @@
<dependency>
<groupId>commons-io</groupId>
<artifactId>commons-io</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>commons-cli</groupId>
<artifactId>commons-cli</artifactId>
<version>1.2</version>
<scope>test</scope>
</dependency>
<!-- Logging -->
@@ -64,6 +71,7 @@
<dependency>
<groupId>cglib</groupId>
<artifactId>cglib-nodep</artifactId>
<scope>runtime</scope>
<optional>true</optional>
</dependency>

View File

@@ -70,7 +70,12 @@ public class RabbitAdminIntegrationTests {
Queue queue = new Queue("test.queue", false, true, true);
rabbitAdmin.deleteQueue(queue.getName());
new RabbitAdmin(connectionFactory1).declareQueue(queue);
new RabbitAdmin(connectionFactory2).declareQueue(queue);
try {
new RabbitAdmin(connectionFactory2).declareQueue(queue);
} finally {
// Need to release the connection so the exclusive queue is deleted
connectionFactory1.destroy();
}
}
@Test

View File

@@ -30,7 +30,7 @@ public class RabbitBindingIntegrationTests {
private RabbitTemplate template = new RabbitTemplate(connectionFactory );
@Rule
public BrokerRunning brokerIsRunning = BrokerRunning.isRunningWithEmptyQueue(queue);
public BrokerRunning brokerIsRunning = BrokerRunning.isRunningWithEmptyQueues(queue);
@Test
public void testSendAndReceiveWithTopicSingleCallback() throws Exception {

View File

@@ -61,7 +61,7 @@ public class RabbitTemplateIntegrationTests {
}
@Rule
public BrokerRunning brokerIsRunning = BrokerRunning.isRunningWithEmptyQueue(ROUTE);
public BrokerRunning brokerIsRunning = BrokerRunning.isRunningWithEmptyQueues(ROUTE);
@Test
public void testSendToNonExistentAndThenReceive() throws Exception {

View File

@@ -29,7 +29,7 @@ public class RabbitTemplatePerformanceIntegrationTests {
@Rule
// After the repeat processor, so it only runs once
public BrokerRunning brokerIsRunning = BrokerRunning.isRunningWithEmptyQueue(ROUTE);
public BrokerRunning brokerIsRunning = BrokerRunning.isRunningWithEmptyQueues(ROUTE);
private CachingConnectionFactory connectionFactory;

View File

@@ -63,7 +63,7 @@ public class MessageListenerBrokerInterruptionIntegrationTests {
public static BrokerPanic panic = new BrokerPanic();
@Rule
public BrokerRunning brokerIsRunning = BrokerRunning.isRunningWithEmptyQueue(queue);
public BrokerRunning brokerIsRunning = BrokerRunning.isRunningWithEmptyQueues(queue);
private ConnectionFactory connectionFactory;

View File

@@ -42,7 +42,7 @@ public class MessageListenerContainerErrorHandlerIntegrationTests {
private ErrorHandler errorHandler = mock(ErrorHandler.class);
@Rule
public BrokerRunning brokerIsRunning = BrokerRunning.isRunningWithEmptyQueue(queue);
public BrokerRunning brokerIsRunning = BrokerRunning.isRunningWithEmptyQueues(queue);
@Rule
public Log4jLevelAdjuster logLevels = new Log4jLevelAdjuster(Level.INFO, RabbitTemplate.class,

View File

@@ -75,7 +75,7 @@ public class MessageListenerContainerLifecycleIntegrationTests {
}
@Rule
public BrokerRunning brokerIsRunning = BrokerRunning.isRunningWithEmptyQueue(queue);
public BrokerRunning brokerIsRunning = BrokerRunning.isRunningWithEmptyQueues(queue);
@Rule
public Log4jLevelAdjuster logLevels = new Log4jLevelAdjuster(Level.INFO, RabbitTemplate.class,

View File

@@ -51,10 +51,10 @@ public class MessageListenerContainerMultipleQueueIntegrationTests {
private static Queue queue2 = new Queue("test.queue.2");
@Rule
public BrokerRunning brokerIsRunningAndQueue1Empty = BrokerRunning.isRunningWithEmptyQueue(queue1);
public BrokerRunning brokerIsRunningAndQueue1Empty = BrokerRunning.isRunningWithEmptyQueues(queue1);
@Rule
public BrokerRunning brokerIsRunningAndQueue2Empty = BrokerRunning.isRunningWithEmptyQueue(queue2);
public BrokerRunning brokerIsRunningAndQueue2Empty = BrokerRunning.isRunningWithEmptyQueues(queue2);
@Rule
public Log4jLevelAdjuster logLevels = new Log4jLevelAdjuster(Level.INFO, RabbitTemplate.class,

View File

@@ -41,7 +41,7 @@ public class MessageListenerContainerRetryIntegrationTests {
private static Queue queue = new Queue("test.queue");
@Rule
public BrokerRunning brokerIsRunning = BrokerRunning.isRunningWithEmptyQueue(queue);
public BrokerRunning brokerIsRunning = BrokerRunning.isRunningWithEmptyQueues(queue);
@Rule
public Log4jLevelAdjuster logLevels = new Log4jLevelAdjuster(Level.INFO, RabbitTemplate.class,

View File

@@ -49,7 +49,7 @@ public class MessageListenerManualAckIntegrationTests {
SimpleMessageListenerContainer.class, BlockingQueueConsumer.class);
@Rule
public BrokerRunning brokerIsRunning = BrokerRunning.isRunningWithEmptyQueue(queue);
public BrokerRunning brokerIsRunning = BrokerRunning.isRunningWithEmptyQueues(queue);
@Before
public void createConnectionFactory() {

View File

@@ -1,6 +1,7 @@
package org.springframework.amqp.rabbit.listener;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
@@ -39,6 +40,8 @@ public class MessageListenerRecoveryCachingConnectionIntegrationTests {
private Queue queue = new Queue("test.queue");
private Queue sendQueue = new Queue("test.send");
private int concurrentConsumers = 1;
private int messageCount = 10;
@@ -56,7 +59,7 @@ public class MessageListenerRecoveryCachingConnectionIntegrationTests {
SimpleMessageListenerContainer.class, BlockingQueueConsumer.class);
@Rule
public BrokerRunning brokerIsRunning = BrokerRunning.isRunningWithEmptyQueue(queue);
public BrokerRunning brokerIsRunning = BrokerRunning.isRunningWithEmptyQueues(queue, sendQueue);
protected ConnectionFactory createConnectionFactory() {
CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
@@ -80,7 +83,6 @@ public class MessageListenerRecoveryCachingConnectionIntegrationTests {
ConnectionFactory connectionFactory = createConnectionFactory();
RabbitTemplate template = new RabbitTemplate(connectionFactory);
Queue sendQueue = new Queue("test.send");
new RabbitAdmin(connectionFactory).declareQueue(sendQueue);
acknowledgeMode = AcknowledgeMode.AUTO;
@@ -97,7 +99,9 @@ public class MessageListenerRecoveryCachingConnectionIntegrationTests {
assertTrue("Timed out waiting for message", waited);
// All messages committed
assertEquals("bar", new String((byte[]) template.receiveAndConvert(sendQueue.getName())));
byte[] bytes = (byte[]) template.receiveAndConvert(sendQueue.getName());
assertNotNull(bytes);
assertEquals("bar", new String(bytes));
assertNull(template.receiveAndConvert(queue.getName()));
}
@@ -107,7 +111,6 @@ public class MessageListenerRecoveryCachingConnectionIntegrationTests {
ConnectionFactory connectionFactory = createConnectionFactory();
RabbitTemplate template = new RabbitTemplate(connectionFactory);
Queue sendQueue = new Queue("test.send");
new RabbitAdmin(connectionFactory).declareQueue(sendQueue);
acknowledgeMode = AcknowledgeMode.AUTO;
@@ -124,6 +127,7 @@ public class MessageListenerRecoveryCachingConnectionIntegrationTests {
assertTrue("Timed out waiting for message", waited);
container.stop();
Thread.sleep(200L);
// Foo message is redelivered
assertEquals("foo", template.receiveAndConvert(queue.getName()));

View File

@@ -54,7 +54,7 @@ public class SimpleMessageListenerContainerIntegrationTests {
// SimpleMessageListenerContainer.class, BlockingQueueConsumer.class);
@Rule
public BrokerRunning brokerIsRunning = BrokerRunning.isRunningWithEmptyQueue(queue);
public BrokerRunning brokerIsRunning = BrokerRunning.isRunningWithEmptyQueues(queue);
private final int messageCount;

View File

@@ -55,7 +55,7 @@ public class BrokerRunning extends TestWatchman {
private final boolean purge;
private Queue queue;
private Queue[] queues;
private int port = BrokerTestUtils.getPort();
@@ -66,8 +66,12 @@ public class BrokerRunning extends TestWatchman {
*
* @return a new rule that assumes an existing running broker
*/
public static BrokerRunning isRunningWithEmptyQueue(String queue) {
return new BrokerRunning(true, new Queue(queue), true);
public static BrokerRunning isRunningWithEmptyQueues(String... names) {
Queue[] queues = new Queue[names.length];
for (int i = 0; i < queues.length; i++) {
queues[i] = new Queue(names[i]);
}
return new BrokerRunning(true, true, queues);
}
/**
@@ -75,8 +79,8 @@ public class BrokerRunning extends TestWatchman {
*
* @return a new rule that assumes an existing running broker
*/
public static BrokerRunning isRunningWithEmptyQueue(Queue queue) {
return new BrokerRunning(true, queue, true);
public static BrokerRunning isRunningWithEmptyQueues(Queue... queues) {
return new BrokerRunning(true, true, queues);
}
/**
@@ -93,14 +97,14 @@ public class BrokerRunning extends TestWatchman {
return new BrokerRunning(false);
}
private BrokerRunning(boolean assumeOnline, Queue queue, boolean purge) {
private BrokerRunning(boolean assumeOnline, boolean purge, Queue... queues) {
this.assumeOnline = assumeOnline;
this.queue = queue;
this.queues = queues;
this.purge = purge;
}
private BrokerRunning(boolean assumeOnline, Queue queue) {
this(assumeOnline, queue, false);
private BrokerRunning(boolean assumeOnline, Queue... queues) {
this(assumeOnline, false, queues);
}
private BrokerRunning(boolean assumeOnline) {
@@ -140,13 +144,14 @@ public class BrokerRunning extends TestWatchman {
}
RabbitAdmin admin = new RabbitAdmin(connectionFactory);
String queueName = queue.getName();
for (Queue queue : queues) {
String queueName = queue.getName();
if (purge) {
logger.debug("Deleting queue: " + queueName);
// Delete completely - gets rid of consumers and bindings as well
admin.deleteQueue(queueName);
}
if (purge) {
logger.debug("Deleting queue: " + queueName);
// Delete completely - gets rid of consumers and bindings as well
admin.deleteQueue(queueName);
}
if (isDefaultQueue(queueName)) {
// Just for test probe.
@@ -154,6 +159,7 @@ public class BrokerRunning extends TestWatchman {
} else {
admin.declareQueue(queue);
}
}
brokerOffline = false;
if (!assumeOnline) {
Assume.assumeTrue(brokerOffline);

View File

@@ -22,7 +22,7 @@ public class RabbitTransactionManagerIntegrationTests {
private TransactionTemplate transactionTemplate;
@Rule
public BrokerRunning brokerIsRunning = BrokerRunning.isRunningWithEmptyQueue(ROUTE);
public BrokerRunning brokerIsRunning = BrokerRunning.isRunningWithEmptyQueues(ROUTE);
@Before
public void init() {

View File

@@ -652,12 +652,7 @@ Object receiveAndConvert(String queueName) throws AmqpException;]]></programlist
which are portable from the 0.8 specification and higher are present in
the AmqpAdmin interface in the org.springframework.amqp.core package. The
RabbitMQ implementation of that class is RabbitAdmin located in the
org.springframework.amqp.rabbit.core package. Many configuration and
management functions are broker specific and not included in the AMQP
specification, so the interface RabbitBrokerOperations and its
implementation RabbitBrokerAdmin located in the
org.springframework.amqp.rabbit.admin package is provided to fill that
gap.</para>
org.springframework.amqp.rabbit.core package.</para>
<para>The AmqpAdmin interface is based on using the Spring AMQP domain
abstractions and is shown below:</para>
@@ -1084,12 +1079,4 @@ public class ExampleExternalTransactionAmqpConfiguration {
</table></para>
</section>
<section>
<title>JMX Monitoring</title>
<para>All the methods in the RabbitBrokerAdmin class are exposed through
JMX through the use of the @ManagedOperations annotation. Please refer to
the sample application for how to configure spring to export the
RabbitBrokerAdmin.</para>
</section>
</chapter>