GH-259: Ensure lock after HZ cluster shutdown
Fixes https://github.com/spring-projects/spring-integration-extensions/issues/259 This will ensure that, if a HZ cluster is totally shut down and then restarted the leader state will be reset if the client still possess/regain the lock.
This commit is contained in:
committed by
Artem Bilan
parent
70f2dfb89e
commit
e11e65e3f5
@@ -1,5 +1,5 @@
|
||||
/*
|
||||
* Copyright 2015-2020 the original author or authors.
|
||||
* Copyright 2015-2022 the original author or authors.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
@@ -16,6 +16,7 @@
|
||||
|
||||
package org.springframework.integration.hazelcast.leader;
|
||||
|
||||
|
||||
import java.util.concurrent.Callable;
|
||||
import java.util.concurrent.ExecutorService;
|
||||
import java.util.concurrent.Executors;
|
||||
@@ -39,6 +40,7 @@ import org.springframework.integration.support.leader.LockRegistryLeaderInitiato
|
||||
import org.springframework.util.Assert;
|
||||
|
||||
import com.hazelcast.core.HazelcastInstance;
|
||||
import com.hazelcast.cp.CPSubsystem;
|
||||
import com.hazelcast.cp.lock.FencedLock;
|
||||
|
||||
/**
|
||||
@@ -52,6 +54,7 @@ import com.hazelcast.cp.lock.FencedLock;
|
||||
* @author Artem Bilan
|
||||
* @author Mael Le Guével
|
||||
* @author Alexey Tsoy
|
||||
* @author Robert Höglund
|
||||
*/
|
||||
public class LeaderInitiator implements SmartLifecycle, DisposableBean, ApplicationEventPublisherAware {
|
||||
|
||||
@@ -101,11 +104,6 @@ public class LeaderInitiator implements SmartLifecycle, DisposableBean, Applicat
|
||||
*/
|
||||
private volatile Future<Void> future;
|
||||
|
||||
/**
|
||||
* Hazelcast distributed lock.
|
||||
*/
|
||||
private volatile FencedLock lock;
|
||||
|
||||
private boolean customPublisher = false;
|
||||
|
||||
private volatile boolean running;
|
||||
@@ -210,7 +208,6 @@ public class LeaderInitiator implements SmartLifecycle, DisposableBean, Applicat
|
||||
@Override
|
||||
public synchronized void start() {
|
||||
if (!this.running) {
|
||||
this.lock = this.client.getCPSubsystem().getLock(this.candidate.getRole());
|
||||
this.leaderSelector = new LeaderSelector();
|
||||
this.running = true;
|
||||
this.future = this.executorService.submit(this.leaderSelector);
|
||||
@@ -253,6 +250,18 @@ public class LeaderInitiator implements SmartLifecycle, DisposableBean, Applicat
|
||||
this.executorService.shutdown();
|
||||
}
|
||||
|
||||
|
||||
FencedLock getLock() {
|
||||
CPSubsystem cpSubSystem = this.client.getCPSubsystem();
|
||||
FencedLock lock = cpSubSystem.getLock(this.candidate.getRole());
|
||||
if (logger.isDebugEnabled()) {
|
||||
logger.debug(
|
||||
String.format("Use lock groupId '%s', lock count '%s'", lock.getGroupId(), lock.getLockCount()));
|
||||
}
|
||||
return lock;
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* Callable that manages the acquisition of Hazelcast locks
|
||||
* for leadership election.
|
||||
@@ -263,14 +272,22 @@ public class LeaderInitiator implements SmartLifecycle, DisposableBean, Applicat
|
||||
|
||||
protected final String role = LeaderInitiator.this.candidate.getRole();
|
||||
|
||||
private volatile boolean locked = false;
|
||||
private volatile boolean leader = false;
|
||||
|
||||
@Override
|
||||
public Void call() {
|
||||
try {
|
||||
while (isRunning()) {
|
||||
try {
|
||||
if (LeaderInitiator.this.lock.isLocked()) {
|
||||
if (logger.isTraceEnabled()) {
|
||||
logger.trace("Am I the leader (" + LeaderInitiator.this.candidate.getRole() + ") ? "
|
||||
+ this.leader);
|
||||
}
|
||||
if (getLock().isLockedByCurrentThread()) {
|
||||
if (!this.leader) {
|
||||
// Since we have the lock we need to ensure that the leader flag is set
|
||||
this.leader = true;
|
||||
}
|
||||
// Give it a chance to expire.
|
||||
if (yieldSign.tryAcquire(LeaderInitiator.this.heartBeatMillis, TimeUnit.MILLISECONDS)) {
|
||||
revokeLeadership();
|
||||
@@ -280,15 +297,12 @@ public class LeaderInitiator implements SmartLifecycle, DisposableBean, Applicat
|
||||
}
|
||||
else {
|
||||
// We try to acquire the lock
|
||||
boolean acquired =
|
||||
LeaderInitiator.this.lock.tryLock(LeaderInitiator.this.heartBeatMillis,
|
||||
TimeUnit.MILLISECONDS);
|
||||
if (!this.locked) {
|
||||
if (acquired) {
|
||||
// Success: we are now leader
|
||||
this.locked = true;
|
||||
handleGranted();
|
||||
}
|
||||
boolean acquired = getLock()
|
||||
.tryLock(LeaderInitiator.this.heartBeatMillis, TimeUnit.MILLISECONDS);
|
||||
if (acquired && !this.leader) {
|
||||
// Success: we are now leader
|
||||
this.leader = true;
|
||||
handleGranted();
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -322,14 +336,15 @@ public class LeaderInitiator implements SmartLifecycle, DisposableBean, Applicat
|
||||
}
|
||||
|
||||
private void revokeLeadership() {
|
||||
if (this.locked) {
|
||||
this.locked = false;
|
||||
if (this.leader) {
|
||||
this.leader = false;
|
||||
try {
|
||||
LeaderInitiator.this.lock.unlock();
|
||||
// Try to unlock
|
||||
getLock().unlock();
|
||||
}
|
||||
catch (Exception e1) {
|
||||
logger.debug("Could not unlock - treat as broken " + this.context +
|
||||
". Revoking " + (isRunning() ? " and retrying..." : "..."), e1);
|
||||
logger.warn("Could not unlock - treat as broken " + this.context + ". Revoking "
|
||||
+ (isRunning() ? " and retrying..." : "..."), e1);
|
||||
|
||||
}
|
||||
|
||||
@@ -373,7 +388,7 @@ public class LeaderInitiator implements SmartLifecycle, DisposableBean, Applicat
|
||||
|
||||
@Override
|
||||
public boolean isLeader() {
|
||||
return LeaderInitiator.this.leaderSelector.locked;
|
||||
return LeaderInitiator.this.leaderSelector.leader;
|
||||
}
|
||||
|
||||
@Override
|
||||
|
||||
Reference in New Issue
Block a user