RedisLockRegistry: Don't expire not acquired lock
Fix race condition, when methods `RedisLockRegistry#expireUnusedOlderThan` and `RedisLockRegistry#obtain` are executed successively. It's possible to delete the lock from `RedisLockRegistry#expireUnusedOlderThan` method, when lock is created but is not acquired (`RedisLock#getLockedAt = 0`) It can lead to the situation, when `RedisLockRegistry#obtain` returns multiple locks with the same redis-key, which shouldn't happen at all. * Skip locks from expiration when their `lockedAt == 0` - new, not acquired yet. **Cherry-pick to `6.0.x` & `5.5.x`** # Conflicts: # spring-integration-redis/src/test/java/org/springframework/integration/redis/util/RedisLockRegistryTests.java
This commit is contained in:
@@ -81,6 +81,7 @@ import org.springframework.util.concurrent.SettableListenableFuture;
|
||||
* @author Artem Bilan
|
||||
* @author Vedran Pavic
|
||||
* @author Unseok Kim
|
||||
* @author Anton Gabov
|
||||
*
|
||||
* @since 4.0
|
||||
*
|
||||
@@ -235,7 +236,11 @@ public final class RedisLockRegistry implements ExpirableLockRegistry, Disposabl
|
||||
this.locks.entrySet()
|
||||
.removeIf(entry -> {
|
||||
RedisLock lock = entry.getValue();
|
||||
return now - lock.getLockedAt() > age && !lock.isAcquiredInThisProcess();
|
||||
long lockedAt = lock.getLockedAt();
|
||||
return now - lockedAt > age
|
||||
// 'lockedAt = 0' means that the lock is still not acquired!
|
||||
&& lockedAt > 0
|
||||
&& !lock.isAcquiredInThisProcess();
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
@@ -19,7 +19,6 @@ package org.springframework.integration.redis.util;
|
||||
import java.util.Collection;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Properties;
|
||||
import java.util.Queue;
|
||||
import java.util.UUID;
|
||||
import java.util.concurrent.Callable;
|
||||
@@ -33,6 +32,7 @@ import java.util.concurrent.LinkedBlockingQueue;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
import java.util.concurrent.atomic.AtomicReference;
|
||||
import java.util.concurrent.locks.Lock;
|
||||
import java.util.stream.Collectors;
|
||||
import java.util.stream.IntStream;
|
||||
@@ -47,8 +47,6 @@ import org.junit.runners.Parameterized;
|
||||
import org.junit.runners.Parameterized.Parameters;
|
||||
|
||||
import org.springframework.data.redis.connection.RedisConnectionFactory;
|
||||
import org.springframework.data.redis.core.RedisCallback;
|
||||
import org.springframework.data.redis.core.RedisOperations;
|
||||
import org.springframework.data.redis.core.StringRedisTemplate;
|
||||
import org.springframework.integration.redis.rules.RedisAvailable;
|
||||
import org.springframework.integration.redis.rules.RedisAvailableTests;
|
||||
@@ -57,9 +55,6 @@ import org.springframework.integration.test.util.TestUtils;
|
||||
|
||||
import static org.assertj.core.api.Assertions.assertThat;
|
||||
import static org.assertj.core.api.Assertions.assertThatIllegalStateException;
|
||||
import static org.mockito.ArgumentMatchers.any;
|
||||
import static org.mockito.BDDMockito.willReturn;
|
||||
import static org.mockito.Mockito.mock;
|
||||
|
||||
/**
|
||||
* @author Gary Russell
|
||||
@@ -67,6 +62,7 @@ import static org.mockito.Mockito.mock;
|
||||
* @author Artem Bilan
|
||||
* @author Vedran Pavic
|
||||
* @author Unseok Kim
|
||||
* @author Anton Gabov
|
||||
*
|
||||
* @since 4.0
|
||||
*
|
||||
@@ -824,21 +820,65 @@ public class RedisLockRegistryTests extends RedisAvailableTests {
|
||||
registry3.destroy();
|
||||
}
|
||||
|
||||
|
||||
@SuppressWarnings({ "unchecked", "rawtypes" })
|
||||
@Test
|
||||
public void testUlink() {
|
||||
RedisOperations ops = mock(RedisOperations.class);
|
||||
Properties props = new Properties();
|
||||
willReturn(props).given(ops).execute(any(RedisCallback.class));
|
||||
props.setProperty("redis_version", "3.0.0");
|
||||
RedisLockRegistry registry = new RedisLockRegistry(mock(RedisConnectionFactory.class), "foo");
|
||||
@RedisAvailable
|
||||
public void testTwoThreadsRemoveAndObtainSameLockSimultaneously() throws Exception {
|
||||
final int TEST_CNT = 200;
|
||||
final long EXPIRATION_TIME_MILLIS = 10000;
|
||||
final long LOCK_WAIT_TIME_MILLIS = 500;
|
||||
final String testKey = "testKey";
|
||||
|
||||
final RedisLockRegistry registry = new RedisLockRegistry(getConnectionFactoryForTest(), this.registryKey);
|
||||
registry.setRedisLockType(testRedisLockType);
|
||||
assertThat(TestUtils.getPropertyValue(registry, "ulinkAvailable", Boolean.class)).isFalse();
|
||||
props.setProperty("redis_version", "4.0.0");
|
||||
registry = new RedisLockRegistry(mock(RedisConnectionFactory.class), "foo");
|
||||
registry.setRedisLockType(testRedisLockType);
|
||||
assertThat(TestUtils.getPropertyValue(registry, "ulinkAvailable", Boolean.class)).isTrue();
|
||||
|
||||
for (int i = 0; i < TEST_CNT; i++) {
|
||||
final String lockKey = testKey + i;
|
||||
final CountDownLatch latch = new CountDownLatch(1);
|
||||
final AtomicReference<Lock> lock1 = new AtomicReference<>();
|
||||
final AtomicReference<Lock> lock2 = new AtomicReference<>();
|
||||
|
||||
Thread thread1 = new Thread(() -> {
|
||||
try {
|
||||
latch.await();
|
||||
// remove lock
|
||||
registry.expireUnusedOlderThan(EXPIRATION_TIME_MILLIS);
|
||||
// obtain new lock and try to acquire
|
||||
Lock lock = registry.obtain(lockKey);
|
||||
lock.tryLock(LOCK_WAIT_TIME_MILLIS, TimeUnit.MILLISECONDS);
|
||||
lock.unlock();
|
||||
|
||||
lock1.set(lock);
|
||||
}
|
||||
catch (InterruptedException ignore) {
|
||||
}
|
||||
});
|
||||
|
||||
Thread thread2 = new Thread(() -> {
|
||||
try {
|
||||
latch.await();
|
||||
// remove lock
|
||||
registry.expireUnusedOlderThan(EXPIRATION_TIME_MILLIS);
|
||||
// obtain new lock and try to acquire
|
||||
Lock lock = registry.obtain(lockKey);
|
||||
lock.tryLock(LOCK_WAIT_TIME_MILLIS, TimeUnit.MILLISECONDS);
|
||||
lock.unlock();
|
||||
|
||||
lock2.set(lock);
|
||||
}
|
||||
catch (InterruptedException ignore) {
|
||||
}
|
||||
});
|
||||
|
||||
thread1.start();
|
||||
thread2.start();
|
||||
latch.countDown();
|
||||
thread1.join();
|
||||
thread2.join();
|
||||
|
||||
// locks must be the same!
|
||||
assertThat(lock1.get()).isEqualTo(lock2.get());
|
||||
}
|
||||
|
||||
registry.destroy();
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user