diff --git a/spring-integration-hazelcast/src/main/java/org/springframework/integration/hazelcast/leader/LeaderInitiator.java b/spring-integration-hazelcast/src/main/java/org/springframework/integration/hazelcast/leader/LeaderInitiator.java index ccd6a01..c6a638a 100644 --- a/spring-integration-hazelcast/src/main/java/org/springframework/integration/hazelcast/leader/LeaderInitiator.java +++ b/spring-integration-hazelcast/src/main/java/org/springframework/integration/hazelcast/leader/LeaderInitiator.java @@ -1,5 +1,5 @@ /* - * Copyright 2015-2020 the original author or authors. + * Copyright 2015-2022 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -16,6 +16,7 @@ package org.springframework.integration.hazelcast.leader; + import java.util.concurrent.Callable; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; @@ -39,6 +40,7 @@ import org.springframework.integration.support.leader.LockRegistryLeaderInitiato import org.springframework.util.Assert; import com.hazelcast.core.HazelcastInstance; +import com.hazelcast.cp.CPSubsystem; import com.hazelcast.cp.lock.FencedLock; /** @@ -52,6 +54,7 @@ import com.hazelcast.cp.lock.FencedLock; * @author Artem Bilan * @author Mael Le Guével * @author Alexey Tsoy + * @author Robert Höglund */ public class LeaderInitiator implements SmartLifecycle, DisposableBean, ApplicationEventPublisherAware { @@ -101,11 +104,6 @@ public class LeaderInitiator implements SmartLifecycle, DisposableBean, Applicat */ private volatile Future future; - /** - * Hazelcast distributed lock. - */ - private volatile FencedLock lock; - private boolean customPublisher = false; private volatile boolean running; @@ -210,7 +208,6 @@ public class LeaderInitiator implements SmartLifecycle, DisposableBean, Applicat @Override public synchronized void start() { if (!this.running) { - this.lock = this.client.getCPSubsystem().getLock(this.candidate.getRole()); this.leaderSelector = new LeaderSelector(); this.running = true; this.future = this.executorService.submit(this.leaderSelector); @@ -253,6 +250,18 @@ public class LeaderInitiator implements SmartLifecycle, DisposableBean, Applicat this.executorService.shutdown(); } + + FencedLock getLock() { + CPSubsystem cpSubSystem = this.client.getCPSubsystem(); + FencedLock lock = cpSubSystem.getLock(this.candidate.getRole()); + if (logger.isDebugEnabled()) { + logger.debug( + String.format("Use lock groupId '%s', lock count '%s'", lock.getGroupId(), lock.getLockCount())); + } + return lock; + } + + /** * Callable that manages the acquisition of Hazelcast locks * for leadership election. @@ -263,14 +272,22 @@ public class LeaderInitiator implements SmartLifecycle, DisposableBean, Applicat protected final String role = LeaderInitiator.this.candidate.getRole(); - private volatile boolean locked = false; + private volatile boolean leader = false; @Override public Void call() { try { while (isRunning()) { try { - if (LeaderInitiator.this.lock.isLocked()) { + if (logger.isTraceEnabled()) { + logger.trace("Am I the leader (" + LeaderInitiator.this.candidate.getRole() + ") ? " + + this.leader); + } + if (getLock().isLockedByCurrentThread()) { + if (!this.leader) { + // Since we have the lock we need to ensure that the leader flag is set + this.leader = true; + } // Give it a chance to expire. if (yieldSign.tryAcquire(LeaderInitiator.this.heartBeatMillis, TimeUnit.MILLISECONDS)) { revokeLeadership(); @@ -280,15 +297,12 @@ public class LeaderInitiator implements SmartLifecycle, DisposableBean, Applicat } else { // We try to acquire the lock - boolean acquired = - LeaderInitiator.this.lock.tryLock(LeaderInitiator.this.heartBeatMillis, - TimeUnit.MILLISECONDS); - if (!this.locked) { - if (acquired) { - // Success: we are now leader - this.locked = true; - handleGranted(); - } + boolean acquired = getLock() + .tryLock(LeaderInitiator.this.heartBeatMillis, TimeUnit.MILLISECONDS); + if (acquired && !this.leader) { + // Success: we are now leader + this.leader = true; + handleGranted(); } } } @@ -322,14 +336,15 @@ public class LeaderInitiator implements SmartLifecycle, DisposableBean, Applicat } private void revokeLeadership() { - if (this.locked) { - this.locked = false; + if (this.leader) { + this.leader = false; try { - LeaderInitiator.this.lock.unlock(); + // Try to unlock + getLock().unlock(); } catch (Exception e1) { - logger.debug("Could not unlock - treat as broken " + this.context + - ". Revoking " + (isRunning() ? " and retrying..." : "..."), e1); + logger.warn("Could not unlock - treat as broken " + this.context + ". Revoking " + + (isRunning() ? " and retrying..." : "..."), e1); } @@ -373,7 +388,7 @@ public class LeaderInitiator implements SmartLifecycle, DisposableBean, Applicat @Override public boolean isLeader() { - return LeaderInitiator.this.leaderSelector.locked; + return LeaderInitiator.this.leaderSelector.leader; } @Override