Refactor Hazelcast leader initiator yielding
* refactor yielding and fix warn messages
This commit is contained in:
@@ -20,6 +20,7 @@ import java.util.concurrent.Callable;
|
||||
import java.util.concurrent.ExecutorService;
|
||||
import java.util.concurrent.Executors;
|
||||
import java.util.concurrent.Future;
|
||||
import java.util.concurrent.Semaphore;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
import org.apache.commons.logging.Log;
|
||||
@@ -50,6 +51,7 @@ import com.hazelcast.cp.lock.FencedLock;
|
||||
* @author Dave Syer
|
||||
* @author Artem Bilan
|
||||
* @author Mael Le Guével
|
||||
* @author Alexey Tsoy
|
||||
*/
|
||||
public class LeaderInitiator implements SmartLifecycle, DisposableBean, ApplicationEventPublisherAware {
|
||||
|
||||
@@ -108,6 +110,8 @@ public class LeaderInitiator implements SmartLifecycle, DisposableBean, Applicat
|
||||
|
||||
private volatile boolean running;
|
||||
|
||||
private final Semaphore yieldSign = new Semaphore(0);
|
||||
|
||||
/**
|
||||
* Construct a {@link LeaderInitiator} with a default candidate.
|
||||
* @param client Hazelcast client
|
||||
@@ -268,7 +272,11 @@ public class LeaderInitiator implements SmartLifecycle, DisposableBean, Applicat
|
||||
try {
|
||||
if (LeaderInitiator.this.lock.isLocked()) {
|
||||
// Give it a chance to expire.
|
||||
Thread.sleep(LeaderInitiator.this.heartBeatMillis);
|
||||
if (yieldSign.tryAcquire(LeaderInitiator.this.heartBeatMillis, TimeUnit.MILLISECONDS)) {
|
||||
revokeLeadership();
|
||||
// Give it a chance to elect some other leader.
|
||||
Thread.sleep(LeaderInitiator.this.busyWaitMillis);
|
||||
}
|
||||
}
|
||||
else {
|
||||
// We try to acquire the lock
|
||||
@@ -285,70 +293,50 @@ public class LeaderInitiator implements SmartLifecycle, DisposableBean, Applicat
|
||||
}
|
||||
}
|
||||
catch (Exception e) {
|
||||
if (this.locked) {
|
||||
this.locked = false;
|
||||
// The lock was broken and we are no longer leader
|
||||
revokeLeadership();
|
||||
|
||||
|
||||
if (isRunning()) {
|
||||
// Give it a chance to elect some other leader.
|
||||
try {
|
||||
LeaderInitiator.this.lock.unlock();
|
||||
Thread.sleep(LeaderInitiator.this.busyWaitMillis);
|
||||
}
|
||||
catch (Exception e1) {
|
||||
logger.debug("Could not unlock - treat as broken " + this.context +
|
||||
". Revoking " + (isRunning() ? " and retrying..." : "..."), e1);
|
||||
|
||||
catch (InterruptedException e1) {
|
||||
// Ignore interruption and let it to be caught on the next cycle.
|
||||
Thread.currentThread().interrupt();
|
||||
}
|
||||
// The lock was broken and we are no longer leader
|
||||
handleRevoked();
|
||||
}
|
||||
|
||||
if (e instanceof InterruptedException || Thread.currentThread().isInterrupted()) {
|
||||
Thread.currentThread().interrupt();
|
||||
if (isRunning()) {
|
||||
logger.warn("Restarting LeaderSelector for " + this.context + " because of error.", e);
|
||||
LeaderInitiator.this.future =
|
||||
LeaderInitiator.this.executorService.submit(
|
||||
() -> {
|
||||
// Give it a chance to elect some other leader.
|
||||
Thread.sleep(LeaderInitiator.this.busyWaitMillis);
|
||||
return LeaderSelector.this.call();
|
||||
});
|
||||
}
|
||||
return null;
|
||||
}
|
||||
else {
|
||||
if (isRunning()) {
|
||||
// Give it a chance to elect some other leader.
|
||||
try {
|
||||
Thread.sleep(LeaderInitiator.this.busyWaitMillis);
|
||||
}
|
||||
catch (InterruptedException e1) {
|
||||
// Ignore interruption and let it to be caught on the next cycle.
|
||||
Thread.currentThread().interrupt();
|
||||
}
|
||||
}
|
||||
if (logger.isDebugEnabled()) {
|
||||
logger.debug("Error acquiring the lock for " + this.context +
|
||||
". " + (isRunning() ? "Retrying..." : ""), e);
|
||||
}
|
||||
if (logger.isDebugEnabled()) {
|
||||
logger.debug("Error acquiring the lock for " + this.context +
|
||||
". " + (isRunning() ? "Retrying..." : ""), e);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
finally {
|
||||
if (this.locked) {
|
||||
this.locked = false;
|
||||
try {
|
||||
LeaderInitiator.this.lock.unlock();
|
||||
}
|
||||
catch (Exception e) {
|
||||
logger.debug("Could not unlock during stop for " + this.context
|
||||
+ " - treat as broken. Revoking...", e);
|
||||
}
|
||||
// We are stopping, therefore not leading any more
|
||||
handleRevoked();
|
||||
}
|
||||
revokeLeadership();
|
||||
}
|
||||
|
||||
return null;
|
||||
}
|
||||
|
||||
private void revokeLeadership() {
|
||||
if (this.locked) {
|
||||
this.locked = false;
|
||||
try {
|
||||
LeaderInitiator.this.lock.unlock();
|
||||
}
|
||||
catch (Exception e1) {
|
||||
logger.debug("Could not unlock - treat as broken " + this.context +
|
||||
". Revoking " + (isRunning() ? " and retrying..." : "..."), e1);
|
||||
|
||||
}
|
||||
|
||||
handleRevoked();
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
private void handleGranted() throws InterruptedException {
|
||||
LeaderInitiator.this.candidate.onGranted(this.context);
|
||||
@@ -390,8 +378,8 @@ public class LeaderInitiator implements SmartLifecycle, DisposableBean, Applicat
|
||||
|
||||
@Override
|
||||
public void yield() {
|
||||
if (LeaderInitiator.this.future != null) {
|
||||
LeaderInitiator.this.future.cancel(true);
|
||||
if (isLeader()) {
|
||||
LeaderInitiator.this.yieldSign.release();
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user