Refactor lock to leader

This commit is contained in:
Gytis Trikleris
2018-06-01 11:57:21 +02:00
committed by Ioannis Canellos
parent 289d4e90e3
commit 5a7a2c4e61
12 changed files with 3 additions and 733 deletions

View File

@@ -87,7 +87,7 @@
<module>spring-cloud-starter-kubernetes-zipkin</module>
<module>spring-cloud-starter-kubernetes-all</module>
<module>spring-cloud-kubernetes-examples</module>
<module>spring-cloud-kubernetes-lock</module>
<module>spring-cloud-kubernetes-leader</module>
</modules>
<dependencyManagement>

View File

@@ -24,8 +24,8 @@
<version>0.3.0.BUILD-SNAPSHOT</version>
</parent>
<artifactId>spring-cloud-kubernetes-lock</artifactId>
<name>Spring Cloud Kubernetes :: Lock</name>
<artifactId>spring-cloud-kubernetes-leader</artifactId>
<name>Spring Cloud Kubernetes :: Leader</name>
<dependencies>
<dependency>

View File

@@ -1,113 +0,0 @@
/*
* Copyright (C) 2018 to the original authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.cloud.kubernetes.lock;
import java.util.Optional;
import io.fabric8.kubernetes.api.model.ConfigMap;
import io.fabric8.kubernetes.api.model.ConfigMapBuilder;
import io.fabric8.kubernetes.client.KubernetesClient;
import io.fabric8.kubernetes.client.KubernetesClientException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class ConfigMapLockRepository {
static final String CONFIG_MAP_PREFIX = "lock";
static final String HOLDER_KEY = "holder";
static final String CREATED_AT_KEY = "created_at";
static final String PROVIDER_LABEL = "provider";
static final String PROVIDER_LABEL_VALUE = "spring-cloud-kubernetes";
static final String KIND_LABEL = "kind";
static final String KIND_LABEL_VALUE = "lock";
private static final Logger LOGGER = LoggerFactory.getLogger(ConfigMapLockRepository.class);
private KubernetesClient kubernetesClient;
private String namespace;
public ConfigMapLockRepository(KubernetesClient kubernetesClient, String namespace) {
this.kubernetesClient = kubernetesClient;
this.namespace = namespace;
}
public Optional<ConfigMap> get(String name) {
String configMapName = getConfigMapName(name);
ConfigMap configMap = kubernetesClient.configMaps()
.inNamespace(namespace)
.withName(configMapName)
.get();
return Optional.ofNullable(configMap);
}
public boolean create(String name, String holder, long createdAt) {
String configMapName = getConfigMapName(name);
String createdAtString = String.valueOf(createdAt);
ConfigMap configMap = new ConfigMapBuilder().withNewMetadata()
.withName(configMapName)
.addToLabels(PROVIDER_LABEL, PROVIDER_LABEL_VALUE)
.addToLabels(KIND_LABEL, KIND_LABEL_VALUE)
.endMetadata()
.addToData(HOLDER_KEY, holder)
.addToData(CREATED_AT_KEY, createdAtString)
.build();
try {
kubernetesClient.configMaps()
.inNamespace(namespace)
.create(configMap);
} catch (KubernetesClientException e) {
LOGGER.warn("Failed to create ConfigMap for name '{}': ", name, e.getMessage());
return false;
}
return true;
}
public void delete(String name) {
// TODO make sure that only creator can delete the lock
kubernetesClient.configMaps()
.inNamespace(namespace)
.withName(getConfigMapName(name))
.delete();
}
public void deleteIfOlderThan(String name, long age) {
get(name)
.filter(m -> this.isOlder(m, age))
// TODO what if someone else deletes and creates a lock in this gap?
.ifPresent(c -> delete(name));
}
private boolean isOlder(ConfigMap configMap, long age) {
String createdAtString = configMap.getData().get(CREATED_AT_KEY);
return System.currentTimeMillis() - Long.valueOf(createdAtString) > age;
}
private String getConfigMapName(String name) {
return String.format("%s-%s", CONFIG_MAP_PREFIX, name);
}
}

View File

@@ -1,97 +0,0 @@
/*
* Copyright (C) 2018 to the original authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.cloud.kubernetes.lock;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
public class KubernetesLock implements Lock {
static final long DEFAULT_TTL = 10000;
private static final int LOCK_RETRY_INTERVAL = 100;
private final ConfigMapLockRepository repository;
private final String name;
private final String holder;
private final long createdAt;
public KubernetesLock(ConfigMapLockRepository repository, String name, String holder, long createdAt) {
this.repository = repository;
this.name = name;
this.holder = holder;
this.createdAt = createdAt;
}
@Override
public void lock() {
while (true) {
try {
if (tryLock()) {
return;
}
Thread.sleep(LOCK_RETRY_INTERVAL);
} catch (InterruptedException e) {
// This method cannot be interrupted
}
}
}
@Override
public void lockInterruptibly() throws InterruptedException {
while (true) {
if (tryLock()) {
return;
}
Thread.sleep(LOCK_RETRY_INTERVAL);
}
}
@Override
public boolean tryLock() {
repository.deleteIfOlderThan(name, DEFAULT_TTL);
return repository.create(name, holder, createdAt);
}
@Override
public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
long expiration = System.currentTimeMillis() + TimeUnit.MILLISECONDS.convert(time, unit);
while (true) {
if (System.currentTimeMillis() > expiration) {
return false;
} else if (tryLock()) {
return true;
}
Thread.sleep(LOCK_RETRY_INTERVAL);
}
}
@Override
public void unlock() {
repository.delete(name);
}
@Override
public Condition newCondition() {
throw new UnsupportedOperationException("Condition is not supported");
}
}

View File

@@ -1,53 +0,0 @@
/*
* Copyright (C) 2018 to the original authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.cloud.kubernetes.lock;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.locks.Lock;
import org.springframework.integration.support.locks.ExpirableLockRegistry;
import org.springframework.util.Assert;
public class KubernetesLockRegistry implements ExpirableLockRegistry {
private final ConfigMapLockRepository repository;
private final String id;
private final Map<String, KubernetesLock> locks;
public KubernetesLockRegistry(ConfigMapLockRepository repository, String id) {
this.repository = repository;
this.id = id;
this.locks = new HashMap<>();
}
@Override
public Lock obtain(Object key) {
Assert.isInstanceOf(String.class, key);
String name = (String) key;
return locks.computeIfAbsent(name, n -> new KubernetesLock(repository, n, id, System.currentTimeMillis()));
}
@Override
public void expireUnusedOlderThan(long age) {
locks.forEach((n, l) -> repository.deleteIfOlderThan(n, age));
}
}

View File

@@ -1,87 +0,0 @@
package org.springframework.cloud.kubernetes.lock;
import java.util.Map;
import java.util.Optional;
import io.fabric8.kubernetes.api.model.ConfigMap;
import io.fabric8.kubernetes.client.KubernetesClient;
import org.arquillian.cube.kubernetes.api.Session;
import org.arquillian.cube.kubernetes.impl.requirement.RequiresKubernetes;
import org.arquillian.cube.requirement.ArquillianConditionalRunner;
import org.jboss.arquillian.test.api.ArquillianResource;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import static org.assertj.core.api.Assertions.assertThat;
import static org.springframework.cloud.kubernetes.lock.ConfigMapLockRepository.CREATED_AT_KEY;
import static org.springframework.cloud.kubernetes.lock.ConfigMapLockRepository.HOLDER_KEY;
import static org.springframework.cloud.kubernetes.lock.KubernetesLock.DEFAULT_TTL;
@RunWith(ArquillianConditionalRunner.class)
@RequiresKubernetes
public class ConfigMapLockRepositoryIT {
private static final String NAME = "test-name";
private static final String HOLDER = "test-holder";
@ArquillianResource
private KubernetesClient kubernetesClient;
@ArquillianResource
private Session session;
private ConfigMapLockRepository repository;
@Before
public void before() {
repository = new ConfigMapLockRepository(kubernetesClient, session.getNamespace());
}
@After
public void after() {
repository.delete(NAME);
}
@Test
public void shouldCreate() {
assertThat(repository.create(NAME, HOLDER, 1000)).isTrue();
Optional<ConfigMap> optionalConfigMap = repository.get(NAME);
assertThat(optionalConfigMap.isPresent()).isTrue();
Map<String, String> data = optionalConfigMap.get().getData();
assertThat(data).containsEntry(HOLDER_KEY, HOLDER);
assertThat(data).containsEntry(CREATED_AT_KEY, String.valueOf(1000));
}
@Test
public void shouldNotOverwrite() {
assertThat(repository.create(NAME, HOLDER, 0)).isTrue();
assertThat(repository.create(NAME, HOLDER, 0)).isFalse();
}
@Test
public void shouldDelete() {
repository.create(NAME, HOLDER, 0);
repository.delete(NAME);
assertThat(repository.get(NAME).isPresent()).isFalse();
}
@Test
public void shouldDeleteExpired() {
repository.create(NAME, HOLDER, 0);
repository.deleteIfOlderThan(NAME, DEFAULT_TTL);
assertThat(repository.get(NAME).isPresent()).isFalse();
}
@Test
public void shouldKeepNotExpired() {
repository.create(NAME, HOLDER, System.currentTimeMillis());
repository.deleteIfOlderThan(NAME, DEFAULT_TTL);
assertThat(repository.get(NAME).isPresent()).isTrue();
}
}

View File

@@ -1,147 +0,0 @@
package org.springframework.cloud.kubernetes.lock;
import java.util.Map;
import java.util.Optional;
import io.fabric8.kubernetes.api.model.ConfigMap;
import io.fabric8.kubernetes.api.model.ConfigMapBuilder;
import io.fabric8.kubernetes.api.model.ConfigMapList;
import io.fabric8.kubernetes.api.model.DoneableConfigMap;
import io.fabric8.kubernetes.client.KubernetesClient;
import io.fabric8.kubernetes.client.KubernetesClientException;
import io.fabric8.kubernetes.client.dsl.MixedOperation;
import io.fabric8.kubernetes.client.dsl.NonNamespaceOperation;
import io.fabric8.kubernetes.client.dsl.Resource;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.Mock;
import org.mockito.junit.MockitoJUnitRunner;
import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.BDDMockito.given;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.springframework.cloud.kubernetes.lock.ConfigMapLockRepository.CONFIG_MAP_PREFIX;
import static org.springframework.cloud.kubernetes.lock.ConfigMapLockRepository.CREATED_AT_KEY;
import static org.springframework.cloud.kubernetes.lock.ConfigMapLockRepository.HOLDER_KEY;
import static org.springframework.cloud.kubernetes.lock.ConfigMapLockRepository.KIND_LABEL;
import static org.springframework.cloud.kubernetes.lock.ConfigMapLockRepository.KIND_LABEL_VALUE;
import static org.springframework.cloud.kubernetes.lock.ConfigMapLockRepository.PROVIDER_LABEL;
import static org.springframework.cloud.kubernetes.lock.ConfigMapLockRepository.PROVIDER_LABEL_VALUE;
@RunWith(MockitoJUnitRunner.class)
public class ConfigMapLockRepositoryTest {
private static final String NAMESPACE = "test-namespace";
private static final String NAME = "test-name";
private static final String CONFIGMAP_NAME = String.format("%s-%s", CONFIG_MAP_PREFIX, NAME);
private static final String HOLDER = "test-holder";
@Mock
private KubernetesClient mockKubernetesClient;
@Mock
private MixedOperation<ConfigMap, ConfigMapList, DoneableConfigMap, Resource<ConfigMap, DoneableConfigMap>>
mockConfigMapsOperation;
@Mock
private NonNamespaceOperation<ConfigMap, ConfigMapList, DoneableConfigMap, Resource<ConfigMap, DoneableConfigMap>>
mockInNamespaceOperation;
@Mock
private Resource<ConfigMap, DoneableConfigMap> mockWithNameResource;
@Mock
private ConfigMap mockConfigMap;
@Mock
private Map<String, String> mockData;
private ConfigMapLockRepository repository;
@Before
public void before() {
given(mockKubernetesClient.configMaps()).willReturn(mockConfigMapsOperation);
given(mockConfigMapsOperation.inNamespace(NAMESPACE)).willReturn(mockInNamespaceOperation);
given(mockInNamespaceOperation.withName(CONFIGMAP_NAME)).willReturn(mockWithNameResource);
given(mockWithNameResource.get()).willReturn(mockConfigMap);
given(mockConfigMap.getData()).willReturn(mockData);
repository = new ConfigMapLockRepository(mockKubernetesClient, NAMESPACE);
}
@Test
public void shouldGet() {
Optional<ConfigMap> optionalConfigMap = repository.get(NAME);
assertThat(optionalConfigMap.isPresent()).isTrue();
assertThat(optionalConfigMap.get()).isEqualTo(mockConfigMap);
}
@Test
public void shouldNotGetNonExistent() {
given(mockWithNameResource.get()).willReturn(null);
Optional<ConfigMap> optionalConfigMap = repository.get(NAME);
assertThat(optionalConfigMap.isPresent()).isFalse();
}
@Test
public void shouldCreate() {
boolean result = repository.create(NAME, HOLDER, 1000);
assertThat(result).isTrue();
verify(mockKubernetesClient).configMaps();
verify(mockConfigMapsOperation).inNamespace(NAMESPACE);
ConfigMap expectedConfigMap = new ConfigMapBuilder().withNewMetadata()
.withName(CONFIGMAP_NAME)
.addToLabels(PROVIDER_LABEL, PROVIDER_LABEL_VALUE)
.addToLabels(KIND_LABEL, KIND_LABEL_VALUE)
.endMetadata()
.addToData(HOLDER_KEY, HOLDER)
.addToData(CREATED_AT_KEY, "1000")
.build();
verify(mockInNamespaceOperation).create(eq(expectedConfigMap));
}
@Test
public void shouldHandleCreationFailure() {
given(mockKubernetesClient.configMaps()).willThrow(new KubernetesClientException("test exception"));
boolean result = repository.create(NAME, HOLDER, 1000);
assertThat(result).isFalse();
}
@Test
public void shouldDelete() {
repository.delete(NAME);
verify(mockWithNameResource).delete();
}
@Test
public void shouldDeleteOld() {
given(mockData.get(CREATED_AT_KEY)).willReturn(String.valueOf(System.currentTimeMillis() - 1000));
repository.deleteIfOlderThan(NAME, 100);
verify(mockWithNameResource).delete();
}
@Test
public void shouldNotDeleteNonExpired() {
given(mockData.get(CREATED_AT_KEY)).willReturn(String.valueOf(System.currentTimeMillis()) + 1000);
repository.deleteIfOlderThan(NAME, 100);
verify(mockWithNameResource, times(0)).delete();
}
}

View File

@@ -1,50 +0,0 @@
package org.springframework.cloud.kubernetes.lock;
import java.util.concurrent.locks.Lock;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.Mock;
import org.mockito.junit.MockitoJUnitRunner;
import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.Mockito.verify;
@RunWith(MockitoJUnitRunner.class)
public class KubernetesLockRegistryTest {
@Mock
private ConfigMapLockRepository mockRepository;
private KubernetesLockRegistry registry;
@Before
public void before() {
registry = new KubernetesLockRegistry(mockRepository, "test-id");
}
@Test
public void shouldObtain() {
Lock actualLock = registry.obtain("test-name");
KubernetesLock expectedLock = new KubernetesLock(mockRepository, "test-name", "test-id", System.currentTimeMillis());
assertThat(actualLock).isEqualToComparingOnlyGivenFields(expectedLock, "name", "holder");
}
@Test(expected = IllegalArgumentException.class)
public void shouldFailToObtainWithNonStringKey() {
registry.obtain(new Object());
}
@Test
public void expireUnusedOlderThanShouldDelegate() {
registry.obtain("test-name-1");
registry.obtain("test-name-2");
registry.expireUnusedOlderThan(100);
verify(mockRepository).deleteIfOlderThan("test-name-1", 100);
verify(mockRepository).deleteIfOlderThan("test-name-2", 100);
}
}

View File

@@ -1,183 +0,0 @@
package org.springframework.cloud.kubernetes.lock;
import java.util.concurrent.TimeUnit;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.Mock;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.junit.MockitoJUnitRunner;
import org.mockito.stubbing.Answer;
import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.BDDMockito.given;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.springframework.cloud.kubernetes.lock.KubernetesLock.DEFAULT_TTL;
@RunWith(MockitoJUnitRunner.class)
public class KubernetesLockTest {
private static final String NAME = "test-name";
private static final String HOLDER = "test-holder";
@Mock
private ConfigMapLockRepository repository;
private KubernetesLock lock;
@Before
public void before() {
lock = new KubernetesLock(repository, NAME, HOLDER, 0);
}
@Test
public void shouldLock() {
given(repository.create(NAME, HOLDER, 0)).willReturn(true);
lock.lock();
verify(repository).deleteIfOlderThan(NAME, DEFAULT_TTL);
verify(repository).create(NAME, HOLDER, 0);
}
@Test
public void lockShouldWaitUntilLockCanBeAcquired() {
given(repository.create(NAME, HOLDER, 0)).will(new LockInSecondCall());
lock.lock();
verify(repository, times(2)).deleteIfOlderThan(NAME, DEFAULT_TTL);
verify(repository, times(2)).create(NAME, HOLDER, 0);
}
@Test
public void lockShouldNotBeInterrupted() {
given(repository.create(NAME, HOLDER, 0)).will(new InterrupFirstCall());
lock.lock();
verify(repository, times(2)).deleteIfOlderThan(NAME, DEFAULT_TTL);
verify(repository, times(2)).create(NAME, HOLDER, 0);
}
@Test
public void shouldLockWithLockInterruptibly() throws InterruptedException {
given(repository.create(NAME, HOLDER, 0)).willReturn(true);
lock.lockInterruptibly();
verify(repository).deleteIfOlderThan(NAME, DEFAULT_TTL);
verify(repository).create(NAME, HOLDER, 0);
}
@Test
public void lockInterruptiblyShouldWaitUntilLockCanBeAcquired() throws InterruptedException {
given(repository.create(NAME, HOLDER, 0)).will(new LockInSecondCall());
lock.lockInterruptibly();
verify(repository, times(2)).deleteIfOlderThan(NAME, DEFAULT_TTL);
verify(repository, times(2)).create(NAME, HOLDER, 0);
}
@Test(expected = InterruptedException.class)
public void lockInterruptiblyShouldBeInterrupted() throws InterruptedException {
given(repository.create(NAME, HOLDER, 0)).will(new InterrupFirstCall());
lock.lockInterruptibly();
}
@Test
public void shouldLockWithTryLock() {
given(repository.create(NAME, HOLDER, 0)).willReturn(true);
assertThat(lock.tryLock()).isTrue();
verify(repository).deleteIfOlderThan(NAME, DEFAULT_TTL);
verify(repository).create(NAME, HOLDER, 0);
}
@Test
public void shouldFailToLockWithTryLock() {
given(repository.create(NAME, HOLDER, 0)).willReturn(false);
assertThat(lock.tryLock()).isFalse();
verify(repository).deleteIfOlderThan(NAME, DEFAULT_TTL);
verify(repository).create(NAME, HOLDER, 0);
}
@Test
public void shouldLockWithTryLockTimeout() throws InterruptedException {
given(repository.create(NAME, HOLDER, 0)).willReturn(true);
assertThat(lock.tryLock(2, TimeUnit.SECONDS)).isTrue();
verify(repository).deleteIfOlderThan(NAME, DEFAULT_TTL);
verify(repository).create(NAME, HOLDER, 0);
}
@Test
public void tryLockShouldWaitUntilLockIsAvailable() throws InterruptedException {
given(repository.create(NAME, HOLDER, 0)).will(new LockInSecondCall());
assertThat(lock.tryLock(2, TimeUnit.SECONDS)).isTrue();
verify(repository, times(2)).deleteIfOlderThan(NAME, DEFAULT_TTL);
verify(repository, times(2)).create(NAME, HOLDER, 0);
}
@Test
public void tryLockShouldTimeout() throws InterruptedException {
given(repository.create(NAME, HOLDER, 0)).will(new LockInSecondCall());
assertThat(lock.tryLock(90, TimeUnit.MILLISECONDS)).isFalse();
verify(repository).deleteIfOlderThan(NAME, DEFAULT_TTL);
verify(repository).create(NAME, HOLDER, 0);
}
@Test(expected = InterruptedException.class)
public void tryLockShouldBeInterrupted() throws InterruptedException {
given(repository.create(NAME, HOLDER, 0)).will(new InterrupFirstCall());
lock.tryLock(90, TimeUnit.MILLISECONDS);
}
@Test
public void shouldUnlock() {
lock.unlock();
verify(repository).delete(NAME);
}
@Test(expected = UnsupportedOperationException.class)
public void newConditionShouldFail() {
lock.newCondition();
}
private static class LockInSecondCall implements Answer<Boolean> {
private int callsCounter = 0;
@Override
public Boolean answer(InvocationOnMock invocationOnMock) {
return callsCounter++ > 0;
}
}
private static class InterrupFirstCall implements Answer<Boolean> {
private int callsCounter = 0;
@Override
public Boolean answer(InvocationOnMock invocationOnMock) throws InterruptedException {
if (callsCounter++ > 0) {
return true;
}
throw new InterruptedException("test exception");
}
}
}