Interrupt listener invoker threads on shutdown (after initial wait step)

Issue: SPR-16536
This commit is contained in:
Juergen Hoeller
2018-02-28 00:06:46 +01:00
parent d1ccecd021
commit 95aad9cdc2

View File

@@ -1,5 +1,5 @@
/*
* Copyright 2002-2016 the original author or authors.
* Copyright 2002-2018 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -563,21 +563,32 @@ public class DefaultMessageListenerContainer extends AbstractPollingMessageListe
logger.debug("Waiting for shutdown of message listener invokers");
try {
synchronized (this.lifecycleMonitor) {
// Waiting for AsyncMessageListenerInvokers to deactivate themselves...
long receiveTimeout = getReceiveTimeout();
long waitStartTime = System.currentTimeMillis();
int waitCount = 0;
while (this.activeInvokerCount > 0) {
if (waitCount > 0 && !isAcceptMessagesWhileStopping() &&
System.currentTimeMillis() - waitStartTime >= receiveTimeout) {
// Unexpectedly some invokers are still active after the receive timeout period
// -> interrupt remaining receive attempts since we'd reject the messages anyway
for (AsyncMessageListenerInvoker scheduledInvoker : this.scheduledInvokers) {
scheduledInvoker.interruptIfNecessary();
}
}
if (logger.isDebugEnabled()) {
logger.debug("Still waiting for shutdown of " + this.activeInvokerCount +
" message listener invokers");
" message listener invokers (iteration " + waitCount + ")");
}
long timeout = getReceiveTimeout();
if (timeout > 0) {
this.lifecycleMonitor.wait(timeout);
// Wait for AsyncMessageListenerInvokers to deactivate themselves...
if (receiveTimeout > 0) {
this.lifecycleMonitor.wait(receiveTimeout);
}
else {
this.lifecycleMonitor.wait();
}
waitCount++;
}
// Clear remaining scheduled invokers, possibly left over as paused tasks...
// Clear remaining scheduled invokers, possibly left over as paused tasks
for (AsyncMessageListenerInvoker scheduledInvoker : this.scheduledInvokers) {
scheduledInvoker.clearResources();
}
@@ -1050,6 +1061,9 @@ public class DefaultMessageListenerContainer extends AbstractPollingMessageListe
private volatile boolean idle = true;
@Nullable
private volatile Thread currentReceiveThread;
@Override
public void run() {
synchronized (lifecycleMonitor) {
@@ -1169,10 +1183,16 @@ public class DefaultMessageListenerContainer extends AbstractPollingMessageListe
}
private boolean invokeListener() throws JMSException {
initResourcesIfNecessary();
boolean messageReceived = receiveAndExecute(this, this.session, this.consumer);
this.lastMessageSucceeded = true;
return messageReceived;
this.currentReceiveThread = Thread.currentThread();
try {
initResourcesIfNecessary();
boolean messageReceived = receiveAndExecute(this, this.session, this.consumer);
this.lastMessageSucceeded = true;
return messageReceived;
}
finally {
this.currentReceiveThread = null;
}
}
private void decreaseActiveInvokerCount() {
@@ -1207,6 +1227,13 @@ public class DefaultMessageListenerContainer extends AbstractPollingMessageListe
}
}
private void interruptIfNecessary() {
Thread currentReceiveThread = this.currentReceiveThread;
if (currentReceiveThread != null && !currentReceiveThread.isInterrupted()) {
currentReceiveThread.interrupt();
}
}
private void clearResources() {
if (sharedConnectionEnabled()) {
synchronized (sharedConnectionMonitor) {