diff --git a/spring-integration-redis/src/main/java/org/springframework/integration/redis/util/RedisLockRegistry.java b/spring-integration-redis/src/main/java/org/springframework/integration/redis/util/RedisLockRegistry.java index ad901afb6b..294b43be86 100644 --- a/spring-integration-redis/src/main/java/org/springframework/integration/redis/util/RedisLockRegistry.java +++ b/spring-integration-redis/src/main/java/org/springframework/integration/redis/util/RedisLockRegistry.java @@ -81,6 +81,7 @@ import org.springframework.util.concurrent.SettableListenableFuture; * @author Artem Bilan * @author Vedran Pavic * @author Unseok Kim + * @author Anton Gabov * * @since 4.0 * @@ -235,7 +236,11 @@ public final class RedisLockRegistry implements ExpirableLockRegistry, Disposabl this.locks.entrySet() .removeIf(entry -> { RedisLock lock = entry.getValue(); - return now - lock.getLockedAt() > age && !lock.isAcquiredInThisProcess(); + long lockedAt = lock.getLockedAt(); + return now - lockedAt > age + // 'lockedAt = 0' means that the lock is still not acquired! + && lockedAt > 0 + && !lock.isAcquiredInThisProcess(); }); } } diff --git a/spring-integration-redis/src/test/java/org/springframework/integration/redis/util/RedisLockRegistryTests.java b/spring-integration-redis/src/test/java/org/springframework/integration/redis/util/RedisLockRegistryTests.java index 94c5cf5c87..b378e76d02 100644 --- a/spring-integration-redis/src/test/java/org/springframework/integration/redis/util/RedisLockRegistryTests.java +++ b/spring-integration-redis/src/test/java/org/springframework/integration/redis/util/RedisLockRegistryTests.java @@ -19,7 +19,6 @@ package org.springframework.integration.redis.util; import java.util.Collection; import java.util.List; import java.util.Map; -import java.util.Properties; import java.util.Queue; import java.util.UUID; import java.util.concurrent.Callable; @@ -33,6 +32,7 @@ import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.locks.Lock; import java.util.stream.Collectors; import java.util.stream.IntStream; @@ -47,8 +47,6 @@ import org.junit.runners.Parameterized; import org.junit.runners.Parameterized.Parameters; import org.springframework.data.redis.connection.RedisConnectionFactory; -import org.springframework.data.redis.core.RedisCallback; -import org.springframework.data.redis.core.RedisOperations; import org.springframework.data.redis.core.StringRedisTemplate; import org.springframework.integration.redis.rules.RedisAvailable; import org.springframework.integration.redis.rules.RedisAvailableTests; @@ -57,9 +55,6 @@ import org.springframework.integration.test.util.TestUtils; import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatIllegalStateException; -import static org.mockito.ArgumentMatchers.any; -import static org.mockito.BDDMockito.willReturn; -import static org.mockito.Mockito.mock; /** * @author Gary Russell @@ -67,6 +62,7 @@ import static org.mockito.Mockito.mock; * @author Artem Bilan * @author Vedran Pavic * @author Unseok Kim + * @author Anton Gabov * * @since 4.0 * @@ -824,21 +820,65 @@ public class RedisLockRegistryTests extends RedisAvailableTests { registry3.destroy(); } - - @SuppressWarnings({ "unchecked", "rawtypes" }) @Test - public void testUlink() { - RedisOperations ops = mock(RedisOperations.class); - Properties props = new Properties(); - willReturn(props).given(ops).execute(any(RedisCallback.class)); - props.setProperty("redis_version", "3.0.0"); - RedisLockRegistry registry = new RedisLockRegistry(mock(RedisConnectionFactory.class), "foo"); + @RedisAvailable + public void testTwoThreadsRemoveAndObtainSameLockSimultaneously() throws Exception { + final int TEST_CNT = 200; + final long EXPIRATION_TIME_MILLIS = 10000; + final long LOCK_WAIT_TIME_MILLIS = 500; + final String testKey = "testKey"; + + final RedisLockRegistry registry = new RedisLockRegistry(getConnectionFactoryForTest(), this.registryKey); registry.setRedisLockType(testRedisLockType); - assertThat(TestUtils.getPropertyValue(registry, "ulinkAvailable", Boolean.class)).isFalse(); - props.setProperty("redis_version", "4.0.0"); - registry = new RedisLockRegistry(mock(RedisConnectionFactory.class), "foo"); - registry.setRedisLockType(testRedisLockType); - assertThat(TestUtils.getPropertyValue(registry, "ulinkAvailable", Boolean.class)).isTrue(); + + for (int i = 0; i < TEST_CNT; i++) { + final String lockKey = testKey + i; + final CountDownLatch latch = new CountDownLatch(1); + final AtomicReference lock1 = new AtomicReference<>(); + final AtomicReference lock2 = new AtomicReference<>(); + + Thread thread1 = new Thread(() -> { + try { + latch.await(); + // remove lock + registry.expireUnusedOlderThan(EXPIRATION_TIME_MILLIS); + // obtain new lock and try to acquire + Lock lock = registry.obtain(lockKey); + lock.tryLock(LOCK_WAIT_TIME_MILLIS, TimeUnit.MILLISECONDS); + lock.unlock(); + + lock1.set(lock); + } + catch (InterruptedException ignore) { + } + }); + + Thread thread2 = new Thread(() -> { + try { + latch.await(); + // remove lock + registry.expireUnusedOlderThan(EXPIRATION_TIME_MILLIS); + // obtain new lock and try to acquire + Lock lock = registry.obtain(lockKey); + lock.tryLock(LOCK_WAIT_TIME_MILLIS, TimeUnit.MILLISECONDS); + lock.unlock(); + + lock2.set(lock); + } + catch (InterruptedException ignore) { + } + }); + + thread1.start(); + thread2.start(); + latch.countDown(); + thread1.join(); + thread2.join(); + + // locks must be the same! + assertThat(lock1.get()).isEqualTo(lock2.get()); + } + registry.destroy(); }