Interrupt listener invoker threads on shutdown (after initial wait step)
Issue: SPR-16536
This commit is contained in:
@@ -1,5 +1,5 @@
|
||||
/*
|
||||
* Copyright 2002-2016 the original author or authors.
|
||||
* Copyright 2002-2018 the original author or authors.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
@@ -563,21 +563,32 @@ public class DefaultMessageListenerContainer extends AbstractPollingMessageListe
|
||||
logger.debug("Waiting for shutdown of message listener invokers");
|
||||
try {
|
||||
synchronized (this.lifecycleMonitor) {
|
||||
// Waiting for AsyncMessageListenerInvokers to deactivate themselves...
|
||||
long receiveTimeout = getReceiveTimeout();
|
||||
long waitStartTime = System.currentTimeMillis();
|
||||
int waitCount = 0;
|
||||
while (this.activeInvokerCount > 0) {
|
||||
if (waitCount > 0 && !isAcceptMessagesWhileStopping() &&
|
||||
System.currentTimeMillis() - waitStartTime >= receiveTimeout) {
|
||||
// Unexpectedly some invokers are still active after the receive timeout period
|
||||
// -> interrupt remaining receive attempts since we'd reject the messages anyway
|
||||
for (AsyncMessageListenerInvoker scheduledInvoker : this.scheduledInvokers) {
|
||||
scheduledInvoker.interruptIfNecessary();
|
||||
}
|
||||
}
|
||||
if (logger.isDebugEnabled()) {
|
||||
logger.debug("Still waiting for shutdown of " + this.activeInvokerCount +
|
||||
" message listener invokers");
|
||||
" message listener invokers (iteration " + waitCount + ")");
|
||||
}
|
||||
long timeout = getReceiveTimeout();
|
||||
if (timeout > 0) {
|
||||
this.lifecycleMonitor.wait(timeout);
|
||||
// Wait for AsyncMessageListenerInvokers to deactivate themselves...
|
||||
if (receiveTimeout > 0) {
|
||||
this.lifecycleMonitor.wait(receiveTimeout);
|
||||
}
|
||||
else {
|
||||
this.lifecycleMonitor.wait();
|
||||
}
|
||||
waitCount++;
|
||||
}
|
||||
// Clear remaining scheduled invokers, possibly left over as paused tasks...
|
||||
// Clear remaining scheduled invokers, possibly left over as paused tasks
|
||||
for (AsyncMessageListenerInvoker scheduledInvoker : this.scheduledInvokers) {
|
||||
scheduledInvoker.clearResources();
|
||||
}
|
||||
@@ -1050,6 +1061,9 @@ public class DefaultMessageListenerContainer extends AbstractPollingMessageListe
|
||||
|
||||
private volatile boolean idle = true;
|
||||
|
||||
@Nullable
|
||||
private volatile Thread currentReceiveThread;
|
||||
|
||||
@Override
|
||||
public void run() {
|
||||
synchronized (lifecycleMonitor) {
|
||||
@@ -1169,10 +1183,16 @@ public class DefaultMessageListenerContainer extends AbstractPollingMessageListe
|
||||
}
|
||||
|
||||
private boolean invokeListener() throws JMSException {
|
||||
initResourcesIfNecessary();
|
||||
boolean messageReceived = receiveAndExecute(this, this.session, this.consumer);
|
||||
this.lastMessageSucceeded = true;
|
||||
return messageReceived;
|
||||
this.currentReceiveThread = Thread.currentThread();
|
||||
try {
|
||||
initResourcesIfNecessary();
|
||||
boolean messageReceived = receiveAndExecute(this, this.session, this.consumer);
|
||||
this.lastMessageSucceeded = true;
|
||||
return messageReceived;
|
||||
}
|
||||
finally {
|
||||
this.currentReceiveThread = null;
|
||||
}
|
||||
}
|
||||
|
||||
private void decreaseActiveInvokerCount() {
|
||||
@@ -1207,6 +1227,13 @@ public class DefaultMessageListenerContainer extends AbstractPollingMessageListe
|
||||
}
|
||||
}
|
||||
|
||||
private void interruptIfNecessary() {
|
||||
Thread currentReceiveThread = this.currentReceiveThread;
|
||||
if (currentReceiveThread != null && !currentReceiveThread.isInterrupted()) {
|
||||
currentReceiveThread.interrupt();
|
||||
}
|
||||
}
|
||||
|
||||
private void clearResources() {
|
||||
if (sharedConnectionEnabled()) {
|
||||
synchronized (sharedConnectionMonitor) {
|
||||
|
||||
Reference in New Issue
Block a user