From 8daf84e0747f5c1b0566902089cbed451e5930ed Mon Sep 17 00:00:00 2001 From: al81-ru <62964318+al81-ru@users.noreply.github.com> Date: Mon, 14 Dec 2020 17:56:03 +0300 Subject: [PATCH] Refactor Hazelcast leader initiator yielding * refactor yielding and fix warn messages --- .../hazelcast/leader/LeaderInitiator.java | 96 ++++++++----------- 1 file changed, 42 insertions(+), 54 deletions(-) diff --git a/spring-integration-hazelcast/src/main/java/org/springframework/integration/hazelcast/leader/LeaderInitiator.java b/spring-integration-hazelcast/src/main/java/org/springframework/integration/hazelcast/leader/LeaderInitiator.java index f8d284e..ccd6a01 100644 --- a/spring-integration-hazelcast/src/main/java/org/springframework/integration/hazelcast/leader/LeaderInitiator.java +++ b/spring-integration-hazelcast/src/main/java/org/springframework/integration/hazelcast/leader/LeaderInitiator.java @@ -20,6 +20,7 @@ import java.util.concurrent.Callable; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; +import java.util.concurrent.Semaphore; import java.util.concurrent.TimeUnit; import org.apache.commons.logging.Log; @@ -50,6 +51,7 @@ import com.hazelcast.cp.lock.FencedLock; * @author Dave Syer * @author Artem Bilan * @author Mael Le Guével + * @author Alexey Tsoy */ public class LeaderInitiator implements SmartLifecycle, DisposableBean, ApplicationEventPublisherAware { @@ -108,6 +110,8 @@ public class LeaderInitiator implements SmartLifecycle, DisposableBean, Applicat private volatile boolean running; + private final Semaphore yieldSign = new Semaphore(0); + /** * Construct a {@link LeaderInitiator} with a default candidate. * @param client Hazelcast client @@ -268,7 +272,11 @@ public class LeaderInitiator implements SmartLifecycle, DisposableBean, Applicat try { if (LeaderInitiator.this.lock.isLocked()) { // Give it a chance to expire. - Thread.sleep(LeaderInitiator.this.heartBeatMillis); + if (yieldSign.tryAcquire(LeaderInitiator.this.heartBeatMillis, TimeUnit.MILLISECONDS)) { + revokeLeadership(); + // Give it a chance to elect some other leader. + Thread.sleep(LeaderInitiator.this.busyWaitMillis); + } } else { // We try to acquire the lock @@ -285,70 +293,50 @@ public class LeaderInitiator implements SmartLifecycle, DisposableBean, Applicat } } catch (Exception e) { - if (this.locked) { - this.locked = false; + // The lock was broken and we are no longer leader + revokeLeadership(); + + + if (isRunning()) { + // Give it a chance to elect some other leader. try { - LeaderInitiator.this.lock.unlock(); + Thread.sleep(LeaderInitiator.this.busyWaitMillis); } - catch (Exception e1) { - logger.debug("Could not unlock - treat as broken " + this.context + - ". Revoking " + (isRunning() ? " and retrying..." : "..."), e1); - + catch (InterruptedException e1) { + // Ignore interruption and let it to be caught on the next cycle. + Thread.currentThread().interrupt(); } - // The lock was broken and we are no longer leader - handleRevoked(); } - - if (e instanceof InterruptedException || Thread.currentThread().isInterrupted()) { - Thread.currentThread().interrupt(); - if (isRunning()) { - logger.warn("Restarting LeaderSelector for " + this.context + " because of error.", e); - LeaderInitiator.this.future = - LeaderInitiator.this.executorService.submit( - () -> { - // Give it a chance to elect some other leader. - Thread.sleep(LeaderInitiator.this.busyWaitMillis); - return LeaderSelector.this.call(); - }); - } - return null; - } - else { - if (isRunning()) { - // Give it a chance to elect some other leader. - try { - Thread.sleep(LeaderInitiator.this.busyWaitMillis); - } - catch (InterruptedException e1) { - // Ignore interruption and let it to be caught on the next cycle. - Thread.currentThread().interrupt(); - } - } - if (logger.isDebugEnabled()) { - logger.debug("Error acquiring the lock for " + this.context + - ". " + (isRunning() ? "Retrying..." : ""), e); - } + if (logger.isDebugEnabled()) { + logger.debug("Error acquiring the lock for " + this.context + + ". " + (isRunning() ? "Retrying..." : ""), e); } } } } finally { - if (this.locked) { - this.locked = false; - try { - LeaderInitiator.this.lock.unlock(); - } - catch (Exception e) { - logger.debug("Could not unlock during stop for " + this.context - + " - treat as broken. Revoking...", e); - } - // We are stopping, therefore not leading any more - handleRevoked(); - } + revokeLeadership(); } + return null; } + private void revokeLeadership() { + if (this.locked) { + this.locked = false; + try { + LeaderInitiator.this.lock.unlock(); + } + catch (Exception e1) { + logger.debug("Could not unlock - treat as broken " + this.context + + ". Revoking " + (isRunning() ? " and retrying..." : "..."), e1); + + } + + handleRevoked(); + } + } + private void handleGranted() throws InterruptedException { LeaderInitiator.this.candidate.onGranted(this.context); @@ -390,8 +378,8 @@ public class LeaderInitiator implements SmartLifecycle, DisposableBean, Applicat @Override public void yield() { - if (LeaderInitiator.this.future != null) { - LeaderInitiator.this.future.cancel(true); + if (isLeader()) { + LeaderInitiator.this.yieldSign.release(); } }