LeadershipController using KubernetesHelper
This commit is contained in:
committed by
Ioannis Canellos
parent
23fc9893bb
commit
7f6627e490
@@ -16,13 +16,15 @@
|
||||
|
||||
package org.springframework.cloud.kubernetes.leader;
|
||||
|
||||
import java.util.Collections;
|
||||
import java.util.Map;
|
||||
|
||||
import io.fabric8.kubernetes.api.model.ConfigMap;
|
||||
import io.fabric8.kubernetes.client.KubernetesClient;
|
||||
import io.fabric8.kubernetes.client.KubernetesClientException;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
import org.springframework.integration.leader.Candidate;
|
||||
import org.springframework.integration.leader.event.LeaderEventPublisher;
|
||||
|
||||
/**
|
||||
* @author <a href="mailto:gytis@redhat.com">Gytis Trikleris</a>
|
||||
@@ -33,49 +35,110 @@ public class LeadershipController {
|
||||
|
||||
private final LeaderProperties leaderProperties;
|
||||
|
||||
private final KubernetesClient kubernetesClient;
|
||||
private final KubernetesHelper kubernetesHelper;
|
||||
|
||||
public LeadershipController(LeaderProperties leaderProperties, KubernetesClient kubernetesClient) {
|
||||
private final LeaderEventPublisher leaderEventPublisher;
|
||||
|
||||
public LeadershipController(LeaderProperties leaderProperties, KubernetesHelper kubernetesHelper,
|
||||
LeaderEventPublisher leaderEventPublisher) {
|
||||
this.leaderProperties = leaderProperties;
|
||||
this.kubernetesClient = kubernetesClient;
|
||||
this.kubernetesHelper = kubernetesHelper;
|
||||
this.leaderEventPublisher = leaderEventPublisher;
|
||||
}
|
||||
|
||||
public boolean acquire(Candidate candidate) {
|
||||
try {
|
||||
ConfigMap configMap = kubernetesHelper.getConfigMap();
|
||||
if (configMap == null) {
|
||||
createLeaderConfigMap(candidate);
|
||||
handleOnGranted(candidate);
|
||||
return true;
|
||||
}
|
||||
|
||||
Leader leader = getLeader(candidate.getRole(), configMap);
|
||||
if (leader == null || !leader.isValid()) {
|
||||
updateLeaderConfigMap(candidate, configMap);
|
||||
handleOnGranted(candidate);
|
||||
return true;
|
||||
}
|
||||
|
||||
if (candidate.getId().equals(leader.getId())) {
|
||||
return true;
|
||||
}
|
||||
} catch (KubernetesClientException e) {
|
||||
LOGGER.warn("Failed to acquire leadership with role='{}' for candidate='{}': {}", candidate.getRole(),
|
||||
candidate.getId(), e.getMessage());
|
||||
}
|
||||
|
||||
handleOnFailed(candidate);
|
||||
return false;
|
||||
}
|
||||
|
||||
public boolean revoke(Candidate candidate) {
|
||||
// Get config map
|
||||
// Check if candidate is a leader - if not, return false
|
||||
// Try to revoke leadership - return true
|
||||
// Return false
|
||||
// In case of an exception - pass it forward
|
||||
return false;
|
||||
}
|
||||
|
||||
public Leader getLeader(String role) {
|
||||
ConfigMap configMap = getConfigMap();
|
||||
if (configMap == null || configMap.getData() == null) {
|
||||
try {
|
||||
ConfigMap configMap = kubernetesHelper.getConfigMap();
|
||||
if (configMap == null) {
|
||||
return null;
|
||||
}
|
||||
|
||||
return getLeader(role, configMap);
|
||||
} catch (KubernetesClientException e) {
|
||||
LOGGER.warn("Failed to get leader for role='{}': {}", role, e.getMessage());
|
||||
return null;
|
||||
}
|
||||
}
|
||||
|
||||
private Leader getLeader(String role, ConfigMap configMap) {
|
||||
if (configMap.getData() == null) {
|
||||
return null;
|
||||
}
|
||||
|
||||
Map<String, String> data = configMap.getData();
|
||||
String leaderIdKey = leaderProperties.getLeaderIdPrefix() + role;
|
||||
String leaderId = data.get(leaderIdKey);
|
||||
String leaderKey = leaderProperties.getLeaderIdPrefix() + role;
|
||||
String leaderId = data.get(leaderKey);
|
||||
if (leaderId == null) {
|
||||
return null;
|
||||
}
|
||||
|
||||
return new Leader(role, leaderId);
|
||||
return new Leader(role, leaderId, kubernetesHelper);
|
||||
}
|
||||
|
||||
private ConfigMap getConfigMap() {
|
||||
String namespace = leaderProperties.getNamespace(kubernetesClient.getNamespace());
|
||||
String name = leaderProperties.getConfigMapName();
|
||||
private void createLeaderConfigMap(Candidate candidate) {
|
||||
Map<String, String> data = getLeaderData(candidate);
|
||||
kubernetesHelper.createConfigMap(data);
|
||||
}
|
||||
|
||||
private void updateLeaderConfigMap(Candidate candidate, ConfigMap configMap) {
|
||||
Map<String, String> data = getLeaderData(candidate);
|
||||
kubernetesHelper.updateConfigMap(configMap, data);
|
||||
}
|
||||
|
||||
private void handleOnGranted(Candidate candidate) {
|
||||
leaderEventPublisher.publishOnGranted(this, null, candidate.getRole());
|
||||
try {
|
||||
return kubernetesClient.configMaps()
|
||||
.inNamespace(namespace)
|
||||
.withName(name)
|
||||
.get();
|
||||
} catch (Exception e) {
|
||||
LOGGER.warn("Failed to get ConfigMap '{}' in the namespace '{}': {}", namespace, name, e.getMessage());
|
||||
return null;
|
||||
candidate.onGranted(null); // TODO context
|
||||
} catch (InterruptedException e) {
|
||||
LOGGER.warn(e.getMessage());
|
||||
Thread.currentThread().interrupt();
|
||||
}
|
||||
}
|
||||
|
||||
private void handleOnFailed(Candidate candidate) {
|
||||
leaderEventPublisher.publishOnFailedToAcquire(this, null, candidate.getRole()); // TODO context
|
||||
}
|
||||
|
||||
private Map<String, String> getLeaderData(Candidate candidate) {
|
||||
String leaderKey = leaderProperties.getLeaderIdPrefix() + candidate.getRole();
|
||||
return Collections.singletonMap(leaderKey, candidate.getId());
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@@ -1,22 +1,24 @@
|
||||
package org.springframework.cloud.kubernetes.leader;
|
||||
|
||||
import java.util.Collections;
|
||||
import java.util.Map;
|
||||
|
||||
import io.fabric8.kubernetes.api.model.ConfigMap;
|
||||
import io.fabric8.kubernetes.api.model.ConfigMapList;
|
||||
import io.fabric8.kubernetes.api.model.DoneableConfigMap;
|
||||
import io.fabric8.kubernetes.client.KubernetesClient;
|
||||
import io.fabric8.kubernetes.client.dsl.MixedOperation;
|
||||
import io.fabric8.kubernetes.client.dsl.NonNamespaceOperation;
|
||||
import io.fabric8.kubernetes.client.dsl.Resource;
|
||||
import io.fabric8.kubernetes.client.KubernetesClientException;
|
||||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
import org.junit.runner.RunWith;
|
||||
import org.mockito.Mock;
|
||||
import org.mockito.junit.MockitoJUnitRunner;
|
||||
import org.springframework.integration.leader.Candidate;
|
||||
import org.springframework.integration.leader.event.LeaderEventPublisher;
|
||||
|
||||
import static org.assertj.core.api.Assertions.assertThat;
|
||||
import static org.mockito.ArgumentMatchers.any;
|
||||
import static org.mockito.BDDMockito.given;
|
||||
import static org.mockito.Mockito.doThrow;
|
||||
import static org.mockito.Mockito.times;
|
||||
import static org.mockito.Mockito.verify;
|
||||
|
||||
/**
|
||||
* @author <a href="mailto:gytis@redhat.com">Gytis Trikleris</a>
|
||||
@@ -24,9 +26,7 @@ import static org.mockito.BDDMockito.given;
|
||||
@RunWith(MockitoJUnitRunner.class)
|
||||
public class LeadershipControllerTest {
|
||||
|
||||
private final String NAMESPACE = "test-namespace";
|
||||
|
||||
private final String CONFIG_MAP_NAME = "test-config-map-name";
|
||||
private final String PREFIX = "leader.";
|
||||
|
||||
private final String ROLE = "test-role";
|
||||
|
||||
@@ -36,52 +36,121 @@ public class LeadershipControllerTest {
|
||||
private LeaderProperties mockLeaderProperties;
|
||||
|
||||
@Mock
|
||||
private KubernetesClient mockKubernetesClient;
|
||||
|
||||
@Mock
|
||||
private MixedOperation<ConfigMap, ConfigMapList, DoneableConfigMap, Resource<ConfigMap, DoneableConfigMap>> configMapsOperation;
|
||||
|
||||
@Mock
|
||||
private NonNamespaceOperation<ConfigMap, ConfigMapList, DoneableConfigMap, Resource<ConfigMap, DoneableConfigMap>> inNamespaceOperation;
|
||||
|
||||
@Mock
|
||||
private Resource<ConfigMap, DoneableConfigMap> configMapResource;
|
||||
private KubernetesHelper mockKubernetesHelper;
|
||||
|
||||
@Mock
|
||||
private ConfigMap mockConfigMap;
|
||||
|
||||
@Mock
|
||||
private Map<String, String> mockData;
|
||||
private Candidate mockCandidate;
|
||||
|
||||
@Mock
|
||||
private LeaderEventPublisher mockLeaderEventPublisher;
|
||||
|
||||
private Map<String, String> leaderData = Collections.singletonMap(PREFIX + ROLE, ID);
|
||||
|
||||
private LeadershipController leadershipController;
|
||||
|
||||
@Before
|
||||
public void before() {
|
||||
given(mockLeaderProperties.getLeaderIdPrefix()).willReturn("");
|
||||
given(mockLeaderProperties.getNamespace(NAMESPACE)).willReturn(NAMESPACE);
|
||||
given(mockLeaderProperties.getConfigMapName()).willReturn(CONFIG_MAP_NAME);
|
||||
given(mockLeaderProperties.getLeaderIdPrefix()).willReturn(PREFIX);
|
||||
given(mockCandidate.getId()).willReturn(ID);
|
||||
given(mockCandidate.getRole()).willReturn(ROLE);
|
||||
|
||||
given(mockKubernetesClient.getNamespace()).willReturn(NAMESPACE);
|
||||
given(mockKubernetesClient.configMaps()).willReturn(configMapsOperation);
|
||||
given(configMapsOperation.inNamespace(NAMESPACE)).willReturn(inNamespaceOperation);
|
||||
given(inNamespaceOperation.withName(CONFIG_MAP_NAME)).willReturn(configMapResource);
|
||||
given(configMapResource.get()).willReturn(mockConfigMap);
|
||||
given(mockConfigMap.getData()).willReturn(mockData);
|
||||
given(mockData.get(ROLE)).willReturn(ID);
|
||||
leadershipController =
|
||||
new LeadershipController(mockLeaderProperties, mockKubernetesHelper, mockLeaderEventPublisher);
|
||||
}
|
||||
|
||||
leadershipController = new LeadershipController(mockLeaderProperties, mockKubernetesClient);
|
||||
@Test
|
||||
public void shouldAcquireWithoutExistingConfigMap() {
|
||||
boolean result = leadershipController.acquire(mockCandidate);
|
||||
|
||||
assertThat(result).isTrue();
|
||||
verify(mockKubernetesHelper).createConfigMap(Collections.singletonMap(PREFIX + ROLE, ID));
|
||||
verify(mockLeaderEventPublisher).publishOnGranted(leadershipController, null, ROLE);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void shouldAcquireWithExistingConfigMap() {
|
||||
given(mockKubernetesHelper.getConfigMap()).willReturn(mockConfigMap);
|
||||
|
||||
boolean result = leadershipController.acquire(mockCandidate);
|
||||
|
||||
assertThat(result).isTrue();
|
||||
verify(mockKubernetesHelper).updateConfigMap(mockConfigMap, leaderData);
|
||||
verify(mockLeaderEventPublisher).publishOnGranted(leadershipController, null, ROLE);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void shouldNotAcquireIfAlreadyLeader() {
|
||||
given(mockKubernetesHelper.getConfigMap()).willReturn(mockConfigMap);
|
||||
given(mockKubernetesHelper.isPodAlive(ID)).willReturn(true);
|
||||
given(mockConfigMap.getData()).willReturn(leaderData);
|
||||
|
||||
boolean result = leadershipController.acquire(mockCandidate);
|
||||
|
||||
assertThat(result).isTrue();
|
||||
verify(mockKubernetesHelper, times(0)).createConfigMap(any());
|
||||
verify(mockKubernetesHelper, times(0)).updateConfigMap(any(), any());
|
||||
verify(mockLeaderEventPublisher, times(0)).publishOnGranted(any(), any(), any());
|
||||
verify(mockLeaderEventPublisher, times(0)).publishOnRevoked(any(), any(), any());
|
||||
verify(mockLeaderEventPublisher, times(0)).publishOnFailedToAcquire(any(), any(), any());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void shouldTakeOverLeadershipFromInvalidLeader() {
|
||||
String anotherId = "another-test-id";
|
||||
given(mockKubernetesHelper.getConfigMap()).willReturn(mockConfigMap);
|
||||
given(mockKubernetesHelper.isPodAlive(ID)).willReturn(false);
|
||||
given(mockConfigMap.getData()).willReturn(leaderData);
|
||||
given(mockCandidate.getId()).willReturn(anotherId);
|
||||
|
||||
boolean result = leadershipController.acquire(mockCandidate);
|
||||
|
||||
assertThat(result).isTrue();
|
||||
verify(mockKubernetesHelper).updateConfigMap(mockConfigMap, Collections.singletonMap(PREFIX + ROLE, anotherId));
|
||||
verify(mockLeaderEventPublisher).publishOnGranted(leadershipController, null, ROLE);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void shouldFailToAcquireBecauseOfExistingLeader() {
|
||||
given(mockKubernetesHelper.getConfigMap()).willReturn(mockConfigMap);
|
||||
given(mockKubernetesHelper.isPodAlive(ID)).willReturn(true);
|
||||
given(mockConfigMap.getData()).willReturn(leaderData);
|
||||
given(mockCandidate.getId()).willReturn("another-test-id");
|
||||
|
||||
boolean result = leadershipController.acquire(mockCandidate);
|
||||
|
||||
assertThat(result).isFalse();
|
||||
verify(mockKubernetesHelper, times(0)).createConfigMap(any());
|
||||
verify(mockKubernetesHelper, times(0)).updateConfigMap(any(), any());
|
||||
verify(mockLeaderEventPublisher).publishOnFailedToAcquire(leadershipController, null, ROLE);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void shouldFailToAcquireBecauseOfException() {
|
||||
doThrow(new KubernetesClientException("Test exception")).when(mockKubernetesHelper).createConfigMap(any());
|
||||
|
||||
boolean result = leadershipController.acquire(mockCandidate);
|
||||
|
||||
assertThat(result).isFalse();
|
||||
verify(mockLeaderEventPublisher).publishOnFailedToAcquire(leadershipController, null, ROLE);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void shouldGetLeader() {
|
||||
given(mockKubernetesHelper.getConfigMap()).willReturn(mockConfigMap);
|
||||
given(mockConfigMap.getData()).willReturn(leaderData);
|
||||
|
||||
Leader leader = leadershipController.getLeader(ROLE);
|
||||
|
||||
assertThat(leader.getRole()).isEqualTo(ROLE);
|
||||
assertThat(leader.getId()).isEqualTo(ID);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void shouldNotGetLeaderFromNonExistingConfigMap() {
|
||||
given(mockLeaderProperties.getConfigMapName()).willReturn("unknown");
|
||||
given(mockKubernetesHelper.getConfigMap()).willReturn(null);
|
||||
|
||||
Leader leader = leadershipController.getLeader(ROLE);
|
||||
assertThat(leader).isNull();
|
||||
@@ -89,6 +158,7 @@ public class LeadershipControllerTest {
|
||||
|
||||
@Test
|
||||
public void shouldNotGetLeaderFromEmptyConfigMap() {
|
||||
given(mockKubernetesHelper.getConfigMap()).willReturn(mockConfigMap);
|
||||
given(mockConfigMap.getData()).willReturn(null);
|
||||
|
||||
Leader leader = leadershipController.getLeader(ROLE);
|
||||
@@ -97,7 +167,8 @@ public class LeadershipControllerTest {
|
||||
|
||||
@Test
|
||||
public void shouldNotGetLeaderFromInvalidConfigMap() {
|
||||
given(mockData.get(ROLE)).willReturn(null);
|
||||
given(mockKubernetesHelper.getConfigMap()).willReturn(mockConfigMap);
|
||||
given(mockConfigMap.getData()).willReturn(Collections.emptyMap());
|
||||
|
||||
Leader leader = leadershipController.getLeader(ROLE);
|
||||
assertThat(leader).isNull();
|
||||
@@ -105,7 +176,7 @@ public class LeadershipControllerTest {
|
||||
|
||||
@Test
|
||||
public void shouldHandleFailureWhenGettingLeader() {
|
||||
given(mockKubernetesClient.configMaps()).willThrow(new RuntimeException("Test exception"));
|
||||
given(mockKubernetesHelper.getConfigMap()).willThrow(new KubernetesClientException("Test exception"));
|
||||
|
||||
Leader leader = leadershipController.getLeader(ROLE);
|
||||
assertThat(leader).isNull();
|
||||
|
||||
Reference in New Issue
Block a user