Commit 8e5a041b authored by tomekl007's avatar tomekl007 Committed by Stephane Nicoll

Improve Cassandra health indicator with more robust mechanism

See gh-23041
parent f241cd04
...@@ -16,14 +16,17 @@ ...@@ -16,14 +16,17 @@
package org.springframework.boot.actuate.cassandra; package org.springframework.boot.actuate.cassandra;
import com.datastax.oss.driver.api.core.ConsistencyLevel; import java.util.Collection;
import java.util.Optional;
import com.datastax.oss.driver.api.core.CqlSession; import com.datastax.oss.driver.api.core.CqlSession;
import com.datastax.oss.driver.api.core.cql.Row; import com.datastax.oss.driver.api.core.metadata.Node;
import com.datastax.oss.driver.api.core.cql.SimpleStatement; import com.datastax.oss.driver.api.core.metadata.NodeState;
import org.springframework.boot.actuate.health.AbstractHealthIndicator; import org.springframework.boot.actuate.health.AbstractHealthIndicator;
import org.springframework.boot.actuate.health.Health; import org.springframework.boot.actuate.health.Health;
import org.springframework.boot.actuate.health.HealthIndicator; import org.springframework.boot.actuate.health.HealthIndicator;
import org.springframework.boot.actuate.health.Status;
import org.springframework.util.Assert; import org.springframework.util.Assert;
/** /**
...@@ -31,13 +34,11 @@ import org.springframework.util.Assert; ...@@ -31,13 +34,11 @@ import org.springframework.util.Assert;
* Cassandra data stores. * Cassandra data stores.
* *
* @author Alexandre Dutra * @author Alexandre Dutra
* @author Tomasz Lelek
* @since 2.4.0 * @since 2.4.0
*/ */
public class CassandraDriverHealthIndicator extends AbstractHealthIndicator { public class CassandraDriverHealthIndicator extends AbstractHealthIndicator {
private static final SimpleStatement SELECT = SimpleStatement
.newInstance("SELECT release_version FROM system.local").setConsistencyLevel(ConsistencyLevel.LOCAL_ONE);
private final CqlSession session; private final CqlSession session;
/** /**
...@@ -52,11 +53,10 @@ public class CassandraDriverHealthIndicator extends AbstractHealthIndicator { ...@@ -52,11 +53,10 @@ public class CassandraDriverHealthIndicator extends AbstractHealthIndicator {
@Override @Override
protected void doHealthCheck(Health.Builder builder) throws Exception { protected void doHealthCheck(Health.Builder builder) throws Exception {
Row row = this.session.execute(SELECT).one(); Collection<Node> nodes = this.session.getMetadata().getNodes().values();
builder.up(); Optional<Node> nodeUp = nodes.stream().filter((node) -> node.getState() == NodeState.UP).findAny();
if (row != null && !row.isNull(0)) { builder.status(nodeUp.isPresent() ? Status.UP : Status.DOWN);
builder.withDetail("version", row.getString(0)); nodeUp.map(Node::getCassandraVersion).ifPresent((version) -> builder.withDetail("version", version));
}
} }
} }
...@@ -15,14 +15,18 @@ ...@@ -15,14 +15,18 @@
*/ */
package org.springframework.boot.actuate.cassandra; package org.springframework.boot.actuate.cassandra;
import com.datastax.oss.driver.api.core.ConsistencyLevel; import java.util.Collection;
import java.util.Optional;
import com.datastax.oss.driver.api.core.CqlSession; import com.datastax.oss.driver.api.core.CqlSession;
import com.datastax.oss.driver.api.core.cql.SimpleStatement; import com.datastax.oss.driver.api.core.metadata.Node;
import com.datastax.oss.driver.api.core.metadata.NodeState;
import reactor.core.publisher.Mono; import reactor.core.publisher.Mono;
import org.springframework.boot.actuate.health.AbstractReactiveHealthIndicator; import org.springframework.boot.actuate.health.AbstractReactiveHealthIndicator;
import org.springframework.boot.actuate.health.Health; import org.springframework.boot.actuate.health.Health;
import org.springframework.boot.actuate.health.ReactiveHealthIndicator; import org.springframework.boot.actuate.health.ReactiveHealthIndicator;
import org.springframework.boot.actuate.health.Status;
import org.springframework.util.Assert; import org.springframework.util.Assert;
/** /**
...@@ -30,13 +34,11 @@ import org.springframework.util.Assert; ...@@ -30,13 +34,11 @@ import org.springframework.util.Assert;
* for Cassandra data stores. * for Cassandra data stores.
* *
* @author Alexandre Dutra * @author Alexandre Dutra
* @author Tomasz Lelek
* @since 2.4.0 * @since 2.4.0
*/ */
public class CassandraDriverReactiveHealthIndicator extends AbstractReactiveHealthIndicator { public class CassandraDriverReactiveHealthIndicator extends AbstractReactiveHealthIndicator {
private static final SimpleStatement SELECT = SimpleStatement
.newInstance("SELECT release_version FROM system.local").setConsistencyLevel(ConsistencyLevel.LOCAL_ONE);
private final CqlSession session; private final CqlSession session;
/** /**
...@@ -51,8 +53,13 @@ public class CassandraDriverReactiveHealthIndicator extends AbstractReactiveHeal ...@@ -51,8 +53,13 @@ public class CassandraDriverReactiveHealthIndicator extends AbstractReactiveHeal
@Override @Override
protected Mono<Health> doHealthCheck(Health.Builder builder) { protected Mono<Health> doHealthCheck(Health.Builder builder) {
return Mono.from(this.session.executeReactive(SELECT)) return Mono.fromSupplier(() -> {
.map((row) -> builder.up().withDetail("version", row.getString(0)).build()); Collection<Node> nodes = this.session.getMetadata().getNodes().values();
Optional<Node> nodeUp = nodes.stream().filter((node) -> node.getState() == NodeState.UP).findAny();
builder.status(nodeUp.isPresent() ? Status.UP : Status.DOWN);
nodeUp.map(Node::getCassandraVersion).ifPresent((version) -> builder.withDetail("version", version));
return builder.build();
});
} }
} }
...@@ -16,11 +16,16 @@ ...@@ -16,11 +16,16 @@
package org.springframework.boot.actuate.cassandra; package org.springframework.boot.actuate.cassandra;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;
import com.datastax.oss.driver.api.core.CqlSession; import com.datastax.oss.driver.api.core.CqlSession;
import com.datastax.oss.driver.api.core.DriverTimeoutException; import com.datastax.oss.driver.api.core.DriverTimeoutException;
import com.datastax.oss.driver.api.core.cql.ResultSet; import com.datastax.oss.driver.api.core.Version;
import com.datastax.oss.driver.api.core.cql.Row; import com.datastax.oss.driver.api.core.metadata.Metadata;
import com.datastax.oss.driver.api.core.cql.SimpleStatement; import com.datastax.oss.driver.api.core.metadata.Node;
import com.datastax.oss.driver.api.core.metadata.NodeState;
import org.junit.jupiter.api.Test; import org.junit.jupiter.api.Test;
import org.springframework.boot.actuate.health.Health; import org.springframework.boot.actuate.health.Health;
...@@ -28,14 +33,15 @@ import org.springframework.boot.actuate.health.Status; ...@@ -28,14 +33,15 @@ import org.springframework.boot.actuate.health.Status;
import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatIllegalArgumentException; import static org.assertj.core.api.Assertions.assertThatIllegalArgumentException;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.BDDMockito.given; import static org.mockito.BDDMockito.given;
import static org.mockito.Mockito.mock; import static org.mockito.BDDMockito.mock;
import static org.mockito.BDDMockito.when;
/** /**
* Tests for {@link CassandraDriverHealthIndicator}. * Tests for {@link CassandraDriverHealthIndicator}.
* *
* @author Alexandre Dutra * @author Alexandre Dutra
* @author Tomasz Lelek
* @since 2.4.0 * @since 2.4.0
*/ */
class CassandraDriverHealthIndicatorTests { class CassandraDriverHealthIndicatorTests {
...@@ -46,24 +52,137 @@ class CassandraDriverHealthIndicatorTests { ...@@ -46,24 +52,137 @@ class CassandraDriverHealthIndicatorTests {
} }
@Test @Test
void healthWithCassandraUp() { void oneHealthyNodeShouldReturnUp() {
CqlSession session = mock(CqlSession.class);
Metadata metadata = mock(Metadata.class);
Node healthyNode = mock(Node.class);
given(healthyNode.getState()).willReturn(NodeState.UP);
given(session.getMetadata()).willReturn(metadata);
given(metadata.getNodes()).willReturn(createNodesMap(healthyNode));
CassandraDriverHealthIndicator healthIndicator = new CassandraDriverHealthIndicator(session);
Health health = healthIndicator.health();
assertThat(health.getStatus()).isEqualTo(Status.UP);
}
@Test
void oneUnhealthyNodeShouldReturnDown() {
CqlSession session = mock(CqlSession.class);
Metadata metadata = mock(Metadata.class);
Node unhealthyNode = mock(Node.class);
given(unhealthyNode.getState()).willReturn(NodeState.DOWN);
given(session.getMetadata()).willReturn(metadata);
given(metadata.getNodes()).willReturn(createNodesMap(unhealthyNode));
CassandraDriverHealthIndicator healthIndicator = new CassandraDriverHealthIndicator(session);
Health health = healthIndicator.health();
assertThat(health.getStatus()).isEqualTo(Status.DOWN);
}
@Test
void oneUnknownNodeShouldReturnDown() {
CqlSession session = mock(CqlSession.class);
Metadata metadata = mock(Metadata.class);
Node unknownNode = mock(Node.class);
given(unknownNode.getState()).willReturn(NodeState.UNKNOWN);
given(session.getMetadata()).willReturn(metadata);
given(metadata.getNodes()).willReturn(createNodesMap(unknownNode));
CassandraDriverHealthIndicator healthIndicator = new CassandraDriverHealthIndicator(session);
Health health = healthIndicator.health();
assertThat(health.getStatus()).isEqualTo(Status.DOWN);
}
@Test
void oneForcedDownNodeShouldReturnDown() {
CqlSession session = mock(CqlSession.class);
Metadata metadata = mock(Metadata.class);
Node forcedDownNode = mock(Node.class);
given(forcedDownNode.getState()).willReturn(NodeState.FORCED_DOWN);
given(session.getMetadata()).willReturn(metadata);
given(metadata.getNodes()).willReturn(createNodesMap(forcedDownNode));
CassandraDriverHealthIndicator healthIndicator = new CassandraDriverHealthIndicator(session);
Health health = healthIndicator.health();
assertThat(health.getStatus()).isEqualTo(Status.DOWN);
}
@Test
void oneHealthyNodeAndOneUnhealthyNodeShouldReturnUp() {
CqlSession session = mock(CqlSession.class);
Metadata metadata = mock(Metadata.class);
Node healthyNode = mock(Node.class);
Node unhealthyNode = mock(Node.class);
given(healthyNode.getState()).willReturn(NodeState.UP);
given(unhealthyNode.getState()).willReturn(NodeState.DOWN);
given(session.getMetadata()).willReturn(metadata);
given(metadata.getNodes()).willReturn(createNodesMap(healthyNode, unhealthyNode));
CassandraDriverHealthIndicator healthIndicator = new CassandraDriverHealthIndicator(session);
Health health = healthIndicator.health();
assertThat(health.getStatus()).isEqualTo(Status.UP);
}
@Test
void oneHealthyNodeAndOneUnknownNodeShouldReturnUp() {
CqlSession session = mock(CqlSession.class);
Metadata metadata = mock(Metadata.class);
Node healthyNode = mock(Node.class);
Node unknownNode = mock(Node.class);
given(healthyNode.getState()).willReturn(NodeState.UP);
given(unknownNode.getState()).willReturn(NodeState.UNKNOWN);
given(session.getMetadata()).willReturn(metadata);
given(metadata.getNodes()).willReturn(createNodesMap(healthyNode, unknownNode));
CassandraDriverHealthIndicator healthIndicator = new CassandraDriverHealthIndicator(session);
Health health = healthIndicator.health();
assertThat(health.getStatus()).isEqualTo(Status.UP);
}
@Test
void oneHealthyNodeAndOneForcedDownNodeShouldReturnUp() {
CqlSession session = mock(CqlSession.class); CqlSession session = mock(CqlSession.class);
ResultSet resultSet = mock(ResultSet.class); Metadata metadata = mock(Metadata.class);
Row row = mock(Row.class); Node healthyNode = mock(Node.class);
given(session.execute(any(SimpleStatement.class))).willReturn(resultSet); Node forcedDownNode = mock(Node.class);
given(resultSet.one()).willReturn(row); given(healthyNode.getState()).willReturn(NodeState.UP);
given(row.isNull(0)).willReturn(false); given(forcedDownNode.getState()).willReturn(NodeState.FORCED_DOWN);
given(row.getString(0)).willReturn("1.0.0"); given(session.getMetadata()).willReturn(metadata);
given(metadata.getNodes()).willReturn(createNodesMap(healthyNode, forcedDownNode));
CassandraDriverHealthIndicator healthIndicator = new CassandraDriverHealthIndicator(session); CassandraDriverHealthIndicator healthIndicator = new CassandraDriverHealthIndicator(session);
Health health = healthIndicator.health(); Health health = healthIndicator.health();
assertThat(health.getStatus()).isEqualTo(Status.UP); assertThat(health.getStatus()).isEqualTo(Status.UP);
assertThat(health.getDetails().get("version")).isEqualTo("1.0.0"); }
@Test
void addVersionToDetailsIfReportedNotNull() {
CqlSession session = mock(CqlSession.class);
Metadata metadata = mock(Metadata.class);
when(session.getMetadata()).thenReturn(metadata);
Node node = mock(Node.class);
when(node.getState()).thenReturn(NodeState.UP);
when(node.getCassandraVersion()).thenReturn(Version.V4_0_0);
when(metadata.getNodes()).thenReturn(createNodesMap(node));
CassandraDriverHealthIndicator healthIndicator = new CassandraDriverHealthIndicator(session);
Health health = healthIndicator.health();
assertThat(health.getStatus()).isEqualTo(Status.UP);
assertThat(health.getDetails().get("version")).isEqualTo(Version.V4_0_0);
}
@Test
void doNotAddVersionToDetailsIfReportedNull() {
CqlSession session = mock(CqlSession.class);
Metadata metadata = mock(Metadata.class);
when(session.getMetadata()).thenReturn(metadata);
Node node = mock(Node.class);
when(node.getState()).thenReturn(NodeState.UP);
when(metadata.getNodes()).thenReturn(createNodesMap(node));
CassandraDriverHealthIndicator healthIndicator = new CassandraDriverHealthIndicator(session);
Health health = healthIndicator.health();
assertThat(health.getStatus()).isEqualTo(Status.UP);
assertThat(health.getDetails().get("version")).isNull();
} }
@Test @Test
void healthWithCassandraDown() { void healthWithCassandraDown() {
CqlSession session = mock(CqlSession.class); CqlSession session = mock(CqlSession.class);
given(session.execute(any(SimpleStatement.class))).willThrow(new DriverTimeoutException("Test Exception")); given(session.getMetadata()).willThrow(new DriverTimeoutException("Test Exception"));
CassandraDriverHealthIndicator healthIndicator = new CassandraDriverHealthIndicator(session); CassandraDriverHealthIndicator healthIndicator = new CassandraDriverHealthIndicator(session);
Health health = healthIndicator.health(); Health health = healthIndicator.health();
assertThat(health.getStatus()).isEqualTo(Status.DOWN); assertThat(health.getStatus()).isEqualTo(Status.DOWN);
...@@ -71,4 +190,12 @@ class CassandraDriverHealthIndicatorTests { ...@@ -71,4 +190,12 @@ class CassandraDriverHealthIndicatorTests {
.isEqualTo(DriverTimeoutException.class.getName() + ": Test Exception"); .isEqualTo(DriverTimeoutException.class.getName() + ": Test Exception");
} }
private static Map<UUID, Node> createNodesMap(Node... nodes) {
Map<UUID, Node> nodesMap = new HashMap<>();
for (Node n : nodes) {
nodesMap.put(UUID.randomUUID(), n);
}
return nodesMap;
}
} }
...@@ -15,15 +15,17 @@ ...@@ -15,15 +15,17 @@
*/ */
package org.springframework.boot.actuate.cassandra; package org.springframework.boot.actuate.cassandra;
import com.datastax.dse.driver.api.core.cql.reactive.ReactiveResultSet; import java.util.HashMap;
import com.datastax.dse.driver.api.core.cql.reactive.ReactiveRow; import java.util.Map;
import java.util.UUID;
import com.datastax.oss.driver.api.core.CqlSession; import com.datastax.oss.driver.api.core.CqlSession;
import com.datastax.oss.driver.api.core.DriverTimeoutException; import com.datastax.oss.driver.api.core.DriverTimeoutException;
import com.datastax.oss.driver.api.core.cql.SimpleStatement; import com.datastax.oss.driver.api.core.Version;
import com.datastax.oss.driver.api.core.metadata.Metadata;
import com.datastax.oss.driver.api.core.metadata.Node;
import com.datastax.oss.driver.api.core.metadata.NodeState;
import org.junit.jupiter.api.Test; import org.junit.jupiter.api.Test;
import org.mockito.stubbing.Answer;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import reactor.core.publisher.Mono; import reactor.core.publisher.Mono;
import reactor.test.StepVerifier; import reactor.test.StepVerifier;
...@@ -32,15 +34,16 @@ import org.springframework.boot.actuate.health.Status; ...@@ -32,15 +34,16 @@ import org.springframework.boot.actuate.health.Status;
import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatIllegalArgumentException; import static org.assertj.core.api.Assertions.assertThatIllegalArgumentException;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.BDDMockito.given; import static org.mockito.BDDMockito.given;
import static org.mockito.BDDMockito.willAnswer; import static org.mockito.BDDMockito.willAnswer;
import static org.mockito.Mockito.mock; import static org.mockito.Mockito.mock;
import static org.mockito.BDDMockito.when;
/** /**
* Tests for {@link CassandraDriverReactiveHealthIndicator}. * Tests for {@link CassandraDriverReactiveHealthIndicator}.
* *
* @author Alexandre Dutra * @author Alexandre Dutra
* @author Tomasz Lelek
* @since 2.4.0 * @since 2.4.0
*/ */
class CassandraDriverReactiveHealthIndicatorTests { class CassandraDriverReactiveHealthIndicatorTests {
...@@ -51,28 +54,150 @@ class CassandraDriverReactiveHealthIndicatorTests { ...@@ -51,28 +54,150 @@ class CassandraDriverReactiveHealthIndicatorTests {
} }
@Test @Test
void testCassandraIsUp() { void oneHealthyNodeShouldReturnUp() {
CqlSession session = mock(CqlSession.class); CqlSession session = mock(CqlSession.class);
ReactiveResultSet results = mock(ReactiveResultSet.class); Metadata metadata = mock(Metadata.class);
ReactiveRow row = mock(ReactiveRow.class); Node healthyNode = mock(Node.class);
given(session.executeReactive(any(SimpleStatement.class))).willReturn(results); given(healthyNode.getState()).willReturn(NodeState.UP);
willAnswer(mockReactiveResultSetBehavior(row)).given(results).subscribe(any()); given(session.getMetadata()).willReturn(metadata);
given(row.getString(0)).willReturn("6.0.0"); given(metadata.getNodes()).willReturn(createNodesMap(healthyNode));
CassandraDriverReactiveHealthIndicator cassandraReactiveHealthIndicator = new CassandraDriverReactiveHealthIndicator( CassandraDriverReactiveHealthIndicator healthIndicator = new CassandraDriverReactiveHealthIndicator(session);
session); Mono<Health> health = healthIndicator.health();
Mono<Health> health = cassandraReactiveHealthIndicator.health(); StepVerifier.create(health).consumeNextWith((h) -> assertThat(h.getStatus()).isEqualTo(Status.UP))
.verifyComplete();
}
@Test
void oneUnhealthyNodeShouldReturnDown() {
CqlSession session = mock(CqlSession.class);
Metadata metadata = mock(Metadata.class);
Node unhealthyNode = mock(Node.class);
given(unhealthyNode.getState()).willReturn(NodeState.DOWN);
given(session.getMetadata()).willReturn(metadata);
given(metadata.getNodes()).willReturn(createNodesMap(unhealthyNode));
CassandraDriverReactiveHealthIndicator healthIndicator = new CassandraDriverReactiveHealthIndicator(session);
Mono<Health> health = healthIndicator.health();
StepVerifier.create(health).consumeNextWith((h) -> assertThat(h.getStatus()).isEqualTo(Status.DOWN))
.verifyComplete();
}
@Test
void oneUnknownNodeShouldReturnDown() {
CqlSession session = mock(CqlSession.class);
Metadata metadata = mock(Metadata.class);
Node unknownNode = mock(Node.class);
given(unknownNode.getState()).willReturn(NodeState.UNKNOWN);
given(session.getMetadata()).willReturn(metadata);
given(metadata.getNodes()).willReturn(createNodesMap(unknownNode));
CassandraDriverReactiveHealthIndicator healthIndicator = new CassandraDriverReactiveHealthIndicator(session);
Mono<Health> health = healthIndicator.health();
StepVerifier.create(health).consumeNextWith((h) -> assertThat(h.getStatus()).isEqualTo(Status.DOWN))
.verifyComplete();
}
@Test
void oneForcedDownNodeShouldReturnDown() {
CqlSession session = mock(CqlSession.class);
Metadata metadata = mock(Metadata.class);
Node forcedDownNode = mock(Node.class);
given(forcedDownNode.getState()).willReturn(NodeState.FORCED_DOWN);
given(session.getMetadata()).willReturn(metadata);
given(metadata.getNodes()).willReturn(createNodesMap(forcedDownNode));
CassandraDriverReactiveHealthIndicator healthIndicator = new CassandraDriverReactiveHealthIndicator(session);
Mono<Health> health = healthIndicator.health();
StepVerifier.create(health).consumeNextWith((h) -> assertThat(h.getStatus()).isEqualTo(Status.DOWN))
.verifyComplete();
}
@Test
void oneHealthyNodeAndOneUnhealthyNodeShouldReturnUp() {
CqlSession session = mock(CqlSession.class);
Metadata metadata = mock(Metadata.class);
Node healthyNode = mock(Node.class);
Node unhealthyNode = mock(Node.class);
given(healthyNode.getState()).willReturn(NodeState.UP);
given(unhealthyNode.getState()).willReturn(NodeState.DOWN);
given(session.getMetadata()).willReturn(metadata);
given(metadata.getNodes()).willReturn(createNodesMap(healthyNode, unhealthyNode));
CassandraDriverReactiveHealthIndicator healthIndicator = new CassandraDriverReactiveHealthIndicator(session);
Mono<Health> health = healthIndicator.health();
StepVerifier.create(health).consumeNextWith((h) -> assertThat(h.getStatus()).isEqualTo(Status.UP))
.verifyComplete();
}
@Test
void oneHealthyNodeAndOneUnknownNodeShouldReturnUp() {
CqlSession session = mock(CqlSession.class);
Metadata metadata = mock(Metadata.class);
Node healthyNode = mock(Node.class);
Node unknownNode = mock(Node.class);
given(healthyNode.getState()).willReturn(NodeState.UP);
given(unknownNode.getState()).willReturn(NodeState.UNKNOWN);
given(session.getMetadata()).willReturn(metadata);
given(metadata.getNodes()).willReturn(createNodesMap(healthyNode, unknownNode));
CassandraDriverReactiveHealthIndicator healthIndicator = new CassandraDriverReactiveHealthIndicator(session);
Mono<Health> health = healthIndicator.health();
StepVerifier.create(health).consumeNextWith((h) -> assertThat(h.getStatus()).isEqualTo(Status.UP))
.verifyComplete();
}
@Test
void oneHealthyNodeAndOneForcedDownNodeShouldReturnUp() {
CqlSession session = mock(CqlSession.class);
Metadata metadata = mock(Metadata.class);
Node healthyNode = mock(Node.class);
Node forcedDownNode = mock(Node.class);
given(healthyNode.getState()).willReturn(NodeState.UP);
given(forcedDownNode.getState()).willReturn(NodeState.FORCED_DOWN);
given(session.getMetadata()).willReturn(metadata);
given(metadata.getNodes()).willReturn(createNodesMap(healthyNode, forcedDownNode));
CassandraDriverReactiveHealthIndicator healthIndicator = new CassandraDriverReactiveHealthIndicator(session);
Mono<Health> health = healthIndicator.health();
StepVerifier.create(health).consumeNextWith((h) -> assertThat(h.getStatus()).isEqualTo(Status.UP))
.verifyComplete();
}
@Test
void addVersionToDetailsIfReportedNotNull() {
CqlSession session = mock(CqlSession.class);
Metadata metadata = mock(Metadata.class);
when(session.getMetadata()).thenReturn(metadata);
Node node = mock(Node.class);
when(node.getState()).thenReturn(NodeState.UP);
when(node.getCassandraVersion()).thenReturn(Version.V4_0_0);
when(metadata.getNodes()).thenReturn(createNodesMap(node));
CassandraDriverReactiveHealthIndicator healthIndicator = new CassandraDriverReactiveHealthIndicator(session);
Mono<Health> health = healthIndicator.health();
StepVerifier.create(health).consumeNextWith((h) -> { StepVerifier.create(health).consumeNextWith((h) -> {
assertThat(h.getStatus()).isEqualTo(Status.UP); assertThat(h.getStatus()).isEqualTo(Status.UP);
assertThat(h.getDetails()).containsOnlyKeys("version"); assertThat(h.getDetails()).containsOnlyKeys("version");
assertThat(h.getDetails().get("version")).isEqualTo("6.0.0"); assertThat(h.getDetails().get("version")).isEqualTo(Version.V4_0_0);
}).verifyComplete();
}
@Test
void doNotAddVersionToDetailsIfReportedNull() {
CqlSession session = mock(CqlSession.class);
Metadata metadata = mock(Metadata.class);
when(session.getMetadata()).thenReturn(metadata);
Node node = mock(Node.class);
when(node.getState()).thenReturn(NodeState.UP);
when(metadata.getNodes()).thenReturn(createNodesMap(node));
CassandraDriverReactiveHealthIndicator healthIndicator = new CassandraDriverReactiveHealthIndicator(session);
Mono<Health> health = healthIndicator.health();
StepVerifier.create(health).consumeNextWith((h) -> {
assertThat(h.getStatus()).isEqualTo(Status.UP);
assertThat(h.getDetails().get("version")).isNull();
}).verifyComplete(); }).verifyComplete();
} }
@Test @Test
void testCassandraIsDown() { void testCassandraIsDown() {
CqlSession session = mock(CqlSession.class); CqlSession session = mock(CqlSession.class);
given(session.executeReactive(any(SimpleStatement.class))) given(session.getMetadata()).willThrow(new DriverTimeoutException("Test Exception"));
.willThrow(new DriverTimeoutException("Test Exception"));
CassandraDriverReactiveHealthIndicator cassandraReactiveHealthIndicator = new CassandraDriverReactiveHealthIndicator( CassandraDriverReactiveHealthIndicator cassandraReactiveHealthIndicator = new CassandraDriverReactiveHealthIndicator(
session); session);
Mono<Health> health = cassandraReactiveHealthIndicator.health(); Mono<Health> health = cassandraReactiveHealthIndicator.health();
...@@ -84,23 +209,12 @@ class CassandraDriverReactiveHealthIndicatorTests { ...@@ -84,23 +209,12 @@ class CassandraDriverReactiveHealthIndicatorTests {
}).verifyComplete(); }).verifyComplete();
} }
private Answer<Void> mockReactiveResultSetBehavior(ReactiveRow row) { private static Map<UUID, Node> createNodesMap(Node... nodes) {
return (invocation) -> { Map<UUID, Node> nodesMap = new HashMap<>();
Subscriber<ReactiveRow> subscriber = invocation.getArgument(0); for (Node n : nodes) {
Subscription s = new Subscription() { nodesMap.put(UUID.randomUUID(), n);
@Override
public void request(long n) {
subscriber.onNext(row);
subscriber.onComplete();
}
@Override
public void cancel() {
} }
}; return nodesMap;
subscriber.onSubscribe(s);
return null;
};
} }
} }
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment