Javadocs and extra integration test
This commit is contained in:
@@ -1,17 +1,14 @@
|
||||
/*
|
||||
* Copyright 2002-2010 the original author or authors.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
|
||||
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations under the License.
|
||||
*/
|
||||
|
||||
package org.springframework.amqp.core;
|
||||
@@ -23,12 +20,32 @@ import java.util.Map;
|
||||
*/
|
||||
public interface Exchange {
|
||||
|
||||
/**
|
||||
* The name of the exchange.
|
||||
*
|
||||
* @return the name of the exchange
|
||||
*/
|
||||
String getName();
|
||||
|
||||
/**
|
||||
* The type of the exchange. See {@link ExchangeTypes} for some well-known examples.
|
||||
*
|
||||
* @return the type of the exchange
|
||||
*/
|
||||
String getType();
|
||||
|
||||
/**
|
||||
* A durable exchange will survive a server restart
|
||||
*
|
||||
* @return true if durable
|
||||
*/
|
||||
boolean isDurable();
|
||||
|
||||
/**
|
||||
* True if the server should delete the exchange when it is no longer in use (if all bindings are deleted).
|
||||
*
|
||||
* @return true if auto-delete
|
||||
*/
|
||||
boolean isAutoDelete();
|
||||
|
||||
Map<String, Object> getArguments();
|
||||
|
||||
@@ -86,14 +86,30 @@ public class Queue {
|
||||
return this.name;
|
||||
}
|
||||
|
||||
/**
|
||||
* A durable queue will survive a server restart
|
||||
*
|
||||
* @return true if durable
|
||||
*/
|
||||
public boolean isDurable() {
|
||||
return this.durable;
|
||||
}
|
||||
|
||||
/**
|
||||
* True if the server should only send messages to the declarer's connection.
|
||||
*
|
||||
* @return true if auto-delete
|
||||
*/
|
||||
public boolean isExclusive() {
|
||||
return this.exclusive;
|
||||
}
|
||||
|
||||
/**
|
||||
* True if the server should delete the queue when it is no longer in use (the last consumer is cancelled). A queue
|
||||
* that never has any consumers will not be deleted automatically.
|
||||
*
|
||||
* @return true if auto-delete
|
||||
*/
|
||||
public boolean isAutoDelete() {
|
||||
return this.autoDelete;
|
||||
}
|
||||
|
||||
@@ -218,9 +218,12 @@ public class RabbitAdmin implements AmqpAdmin, ApplicationContextAware, Initiali
|
||||
return;
|
||||
}
|
||||
try {
|
||||
// ...but it is possible for this to happen twice in the same ConnectionFactory (if more than
|
||||
// one concurrent Connection is allowed). It's idempotent, so no big deal (a bit of network
|
||||
// chatter). If anyone has a problem with it: use auto-startup="false".
|
||||
/*
|
||||
* ...but it is possible for this to happen twice in the same ConnectionFactory (if more than
|
||||
* one concurrent Connection is allowed). It's idempotent, so no big deal (a bit of network
|
||||
* chatter). In fact it might even be a good thing: exclusive queues only make sense if they are
|
||||
* declared for every connection. If anyone has a problem with it: use auto-startup="false".
|
||||
*/
|
||||
initialize();
|
||||
} finally {
|
||||
initializing.compareAndSet(true, false);
|
||||
@@ -255,6 +258,38 @@ public class RabbitAdmin implements AmqpAdmin, ApplicationContextAware, Initiali
|
||||
final Collection<Exchange> exchanges = applicationContext.getBeansOfType(Exchange.class).values();
|
||||
final Collection<Queue> queues = applicationContext.getBeansOfType(Queue.class).values();
|
||||
final Collection<Binding> bindings = applicationContext.getBeansOfType(Binding.class).values();
|
||||
|
||||
for (Exchange exchange : exchanges) {
|
||||
if (!exchange.isDurable()) {
|
||||
logger.warn("Auto-declaring a non-durable Exchange ("
|
||||
+ exchange.getName()
|
||||
+ "). It will be deleted by the broker if it shuts down, and can be redeclared by closing and reopening the connection.");
|
||||
}
|
||||
if (exchange.isAutoDelete()) {
|
||||
logger.warn("Auto-declaring an auto-delete Exchange ("
|
||||
+ exchange.getName()
|
||||
+ "). It will be deleted by the broker if not in use (if all bindings are deleted), but will only be redeclared if the connection is closed and reopened.");
|
||||
}
|
||||
}
|
||||
|
||||
for (Queue queue : queues) {
|
||||
if (!queue.isDurable()) {
|
||||
logger.warn("Auto-declaring a non-durable Queue ("
|
||||
+ queue.getName()
|
||||
+ "). It will be redeclared if the broker stops and is restarted while the connection factory is alive, but all messages will be lost.");
|
||||
}
|
||||
if (queue.isAutoDelete()) {
|
||||
logger.warn("Auto-declaring an auto-delete Queue ("
|
||||
+ queue.getName()
|
||||
+ "). It will be deleted deleted by the broker if not in use, and all messages will be lost. Redeclared when the connection is closed and reopened.");
|
||||
}
|
||||
if (queue.isExclusive()) {
|
||||
logger.warn("Auto-declaring an exclusive Queue ("
|
||||
+ queue.getName()
|
||||
+ "). It cannot be accessed by consumers on another connection, and will be redeclared if the connection is reopened.");
|
||||
}
|
||||
}
|
||||
|
||||
rabbitTemplate.execute(new ChannelCallback<Object>() {
|
||||
public Object doInRabbit(Channel channel) throws Exception {
|
||||
declareExchanges(channel, exchanges.toArray(new Exchange[exchanges.size()]));
|
||||
|
||||
@@ -9,6 +9,7 @@ import org.junit.After;
|
||||
import org.junit.Before;
|
||||
import org.junit.Rule;
|
||||
import org.junit.Test;
|
||||
import org.springframework.amqp.AmqpIOException;
|
||||
import org.springframework.amqp.core.Queue;
|
||||
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
|
||||
import org.springframework.amqp.rabbit.test.BrokerRunning;
|
||||
@@ -59,6 +60,35 @@ public class RabbitAdminIntegrationTests {
|
||||
assertTrue(rabbitAdmin.deleteQueue(queue.getName()));
|
||||
}
|
||||
|
||||
@Test(expected = AmqpIOException.class)
|
||||
public void testDoubleDeclarationOfExclusiveQueue() throws Exception {
|
||||
// Expect exception because the queue is locked when it is declared a second time.
|
||||
CachingConnectionFactory connectionFactory1 = new CachingConnectionFactory();
|
||||
connectionFactory1.setPort(BrokerTestUtils.getPort());
|
||||
CachingConnectionFactory connectionFactory2 = new CachingConnectionFactory();
|
||||
connectionFactory2.setPort(BrokerTestUtils.getPort());
|
||||
Queue queue = new Queue("test.queue", false, true, true);
|
||||
rabbitAdmin.deleteQueue(queue.getName());
|
||||
new RabbitAdmin(connectionFactory1).declareQueue(queue);
|
||||
new RabbitAdmin(connectionFactory2).declareQueue(queue);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testDoubleDeclarationOfAutodeleteQueue() throws Exception {
|
||||
// No error expected here: the queue is autodeleted when the last consumer is cancelled, but this one never has
|
||||
// any consumers.
|
||||
CachingConnectionFactory connectionFactory1 = new CachingConnectionFactory();
|
||||
connectionFactory1.setPort(BrokerTestUtils.getPort());
|
||||
CachingConnectionFactory connectionFactory2 = new CachingConnectionFactory();
|
||||
connectionFactory2.setPort(BrokerTestUtils.getPort());
|
||||
Queue queue = new Queue("test.queue", false, false, true);
|
||||
rabbitAdmin.deleteQueue(queue.getName());
|
||||
new RabbitAdmin(connectionFactory1).declareQueue(queue);
|
||||
new RabbitAdmin(connectionFactory2).declareQueue(queue);
|
||||
connectionFactory1.destroy();
|
||||
connectionFactory2.destroy();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testStartupWithAutodelete() throws Exception {
|
||||
|
||||
@@ -66,7 +96,7 @@ public class RabbitAdminIntegrationTests {
|
||||
context.getBeanFactory().registerSingleton("foo", queue);
|
||||
rabbitAdmin.deleteQueue(queue.getName());
|
||||
rabbitAdmin.afterPropertiesSet();
|
||||
|
||||
|
||||
final AtomicReference<Connection> connectionHolder = new AtomicReference<Connection>();
|
||||
|
||||
RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
|
||||
@@ -108,7 +138,7 @@ public class RabbitAdminIntegrationTests {
|
||||
context.getBeanFactory().registerSingleton("foo", queue);
|
||||
rabbitAdmin.deleteQueue(queue.getName());
|
||||
rabbitAdmin.afterPropertiesSet();
|
||||
|
||||
|
||||
final AtomicReference<Connection> connectionHolder = new AtomicReference<Connection>();
|
||||
|
||||
RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
|
||||
@@ -123,7 +153,7 @@ public class RabbitAdminIntegrationTests {
|
||||
assertTrue("Expected Queue to exist", exists);
|
||||
|
||||
assertTrue(queueExists(connectionHolder.get(), queue));
|
||||
|
||||
|
||||
// simulate broker going down and coming back up...
|
||||
rabbitAdmin.deleteQueue(queue.getName());
|
||||
connectionFactory.destroy();
|
||||
@@ -154,7 +184,7 @@ public class RabbitAdminIntegrationTests {
|
||||
* @return true if the queue exists
|
||||
*/
|
||||
private boolean queueExists(Connection connection, Queue queue) throws Exception {
|
||||
if (connection==null) {
|
||||
if (connection == null) {
|
||||
ConnectionFactory connectionFactory = new ConnectionFactory();
|
||||
connectionFactory.setPort(BrokerTestUtils.getPort());
|
||||
connection = connectionFactory.newConnection();
|
||||
|
||||
Reference in New Issue
Block a user