AMQP-275 Physically Close Channel on Listener Stop

When stopping a listener container, physically close the channel
so any queued, but not yet processed messages go back to Ready
instead of remaining un-ack'd.

Rename methods to is/setPhysicalCloseRequired()
This commit is contained in:
Gary Russell
2012-10-31 13:06:43 -04:00
parent e948b32545
commit c675a7ff32
5 changed files with 156 additions and 14 deletions

View File

@@ -319,7 +319,7 @@ public class CachingConnectionFactory extends AbstractConnectionFactory {
// Handle close method: don't pass the call on.
if (active) {
synchronized (this.channelList) {
if (this.channelList.size() < getChannelCacheSize()) {
if (!RabbitUtils.isPhysicalCloseRequired() && this.channelList.size() < getChannelCacheSize()) {
logicalClose((ChannelProxy) proxy);
// Remain open in the channel list.
return null;

View File

@@ -1,11 +1,11 @@
/*
* Copyright 2002-2011 the original author or authors.
*
* Copyright 2002-2012 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
*
* http://www.apache.org/licenses/LICENSE-2.0
*
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
@@ -33,6 +33,7 @@ import com.rabbitmq.client.ShutdownSignalException;
/**
* @author Mark Fisher
* @author Mark Pollack
* @author Gary Russell
*/
public abstract class RabbitUtils {
@@ -40,6 +41,8 @@ public abstract class RabbitUtils {
private static final Log logger = LogFactory.getLog(RabbitUtils.class);
private static final ThreadLocal<Boolean> physicalCloseRequired = new ThreadLocal<Boolean>();
/**
* Close the given RabbitMQ Connection and ignore any thrown exception. This is useful for typical
* <code>finally</code> blocks in manual RabbitMQ code.
@@ -139,7 +142,7 @@ public abstract class RabbitUtils {
/**
* Declare to that broker that a channel is going to be used transactionally, and convert exceptions that arise.
*
*
* @param channel the channel to use
*/
public static void declareTransactional(Channel channel) {
@@ -150,4 +153,26 @@ public abstract class RabbitUtils {
}
}
/**
* Sets a ThreadLocal indicating the channel MUST be physically closed.
* @param b
*/
public static void setPhysicalCloseRequired(boolean b) {
physicalCloseRequired.set(b);
}
/**
* Gets and removes a ThreadLocal indicating the channel MUST be physically closed.
* @return
*/
public static boolean isPhysicalCloseRequired() {
Boolean mustClose = physicalCloseRequired.get();
if (mustClose == null) {
mustClose = Boolean.FALSE;
}
else {
physicalCloseRequired.remove();
}
return mustClose;
}
}

View File

@@ -1,11 +1,11 @@
/*
* Copyright 2002-2012 the original author or authors.
*
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
*
* http://www.apache.org/licenses/LICENSE-2.0
*
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
@@ -44,11 +44,11 @@ import com.rabbitmq.utility.Utility;
/**
* Specialized consumer encapsulating knowledge of the broker connections and having its own lifecycle (start and stop).
*
*
* @author Mark Pollack
* @author Dave Syer
* @author Gary Russell
*
*
*/
public class BlockingQueueConsumer {
@@ -136,7 +136,7 @@ public class BlockingQueueConsumer {
/**
* If this is a non-POISON non-null delivery simply return it. If this is POISON we are in shutdown mode, throw
* shutdown. If delivery is null, we may be in shutdown mode. Check and see.
*
*
* @throws InterruptedException
*/
private Message handle(Delivery delivery) throws InterruptedException {
@@ -162,7 +162,7 @@ public class BlockingQueueConsumer {
/**
* Main application-side API: wait for the next message delivery and return it.
*
*
* @return the next message
* @throws InterruptedException if an interrupt is received while waiting
* @throws ShutdownSignalException if the connection is shut down while waiting
@@ -174,7 +174,7 @@ public class BlockingQueueConsumer {
/**
* Main application-side API: wait for the next message delivery and return it.
*
*
* @param timeout timeout in millisecond
* @return the next message or null if timed out
* @throws InterruptedException if an interrupt is received while waiting
@@ -258,6 +258,7 @@ public class BlockingQueueConsumer {
if (logger.isDebugEnabled()) {
logger.debug("Closing Rabbit Channel: " + channel);
}
RabbitUtils.setPhysicalCloseRequired(true);
// This one never throws exceptions...
RabbitUtils.closeChannel(channel);
deliveryTags.clear();

View File

@@ -0,0 +1,87 @@
/*
* Copyright 2002-2012 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.amqp.rabbit.listener;
import static org.junit.Assert.fail;
import java.util.concurrent.atomic.AtomicInteger;
import org.junit.Rule;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageListener;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.test.BrokerRunning;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.ApplicationContext;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
/**
* @author Gary Russell
* @since 1.1.3
*
*/
@ContextConfiguration
@RunWith(SpringJUnit4ClassRunner.class)
public class StopStartIntegrationTests {
@Rule
public BrokerRunning brokerIsRunning = BrokerRunning.isRunning();
private static AtomicInteger deliveries = new AtomicInteger();
private static int COUNT = 300000;
@Autowired
private ApplicationContext ctx;
@Autowired
private RabbitTemplate rabbitTemplate;
@Autowired
private SimpleMessageListenerContainer container;
@Test
public void test() throws Exception {
for (int i = 0; i < COUNT; i++) {
rabbitTemplate.convertAndSend("foo" + i);
}
long t = System.currentTimeMillis();
container.start();
int n;
int lastN = 0;
while ((n = deliveries.get()) < COUNT) {
Thread.sleep(5000);
container.stop();
container.start();
if (System.currentTimeMillis() - t > 240000 && lastN == n) {
fail("Only received " + deliveries.get());
}
lastN = n;
}
}
public static class StopStartListener implements MessageListener {
@Override
public void onMessage(Message message) {
deliveries.incrementAndGet();
}
}
}

View File

@@ -0,0 +1,29 @@
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:rabbit="http://www.springframework.org/schema/rabbit"
xsi:schemaLocation="http://www.springframework.org/schema/rabbit http://www.springframework.org/schema/rabbit/spring-rabbit-1.1.xsd
http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd">
<rabbit:connection-factory id="connectionFactory"/>
<rabbit:template id="amqpTemplate" connection-factory="connectionFactory"
exchange="stop.start.exchange" routing-key="stop.start.binding" />
<rabbit:admin connection-factory="connectionFactory" />
<rabbit:queue name="stop.start.queue" />
<rabbit:direct-exchange name="stop.start.exchange">
<rabbit:bindings>
<rabbit:binding queue="stop.start.queue" key="stop.start.binding" />
</rabbit:bindings>
</rabbit:direct-exchange>
<rabbit:listener-container connection-factory="connectionFactory" concurrency="10" prefetch="20" auto-startup="false">
<rabbit:listener ref="listener" queue-names="stop.start.queue" />
</rabbit:listener-container>
<bean id="listener" class="org.springframework.amqp.rabbit.listener.StopStartIntegrationTests$StopStartListener" />
</beans>