Commit 3b587449 authored by Scott Frederick's avatar Scott Frederick

Provide cluster info in RedisReactiveHealthIndicator

This commit changes the information provided by
RedisReactiveHealthIndicator to include cluster details when Spring
Data Redis detects that Redis is running in a clustered configuration.
This brings the reactive and non-reactive Redis health indicators
into alignment.

Fixes gh-21514
parent dba8ca28
...@@ -34,12 +34,19 @@ import org.springframework.util.Assert; ...@@ -34,12 +34,19 @@ import org.springframework.util.Assert;
* *
* @author Christian Dupuis * @author Christian Dupuis
* @author Richard Santana * @author Richard Santana
* @author Scott Frederick
* @since 2.0.0 * @since 2.0.0
*/ */
public class RedisHealthIndicator extends AbstractHealthIndicator { public class RedisHealthIndicator extends AbstractHealthIndicator {
static final String VERSION = "version"; static final String VERSION = "version";
static final String CLUSTER_SIZE = "cluster_size";
static final String SLOTS_UP = "slots_up";
static final String SLOTS_FAIL = "slots_fail";
static final String REDIS_VERSION = "redis_version"; static final String REDIS_VERSION = "redis_version";
private final RedisConnectionFactory redisConnectionFactory; private final RedisConnectionFactory redisConnectionFactory;
...@@ -56,9 +63,9 @@ public class RedisHealthIndicator extends AbstractHealthIndicator { ...@@ -56,9 +63,9 @@ public class RedisHealthIndicator extends AbstractHealthIndicator {
try { try {
if (connection instanceof RedisClusterConnection) { if (connection instanceof RedisClusterConnection) {
ClusterInfo clusterInfo = ((RedisClusterConnection) connection).clusterGetClusterInfo(); ClusterInfo clusterInfo = ((RedisClusterConnection) connection).clusterGetClusterInfo();
builder.up().withDetail("cluster_size", clusterInfo.getClusterSize()) builder.up().withDetail(CLUSTER_SIZE, clusterInfo.getClusterSize())
.withDetail("slots_up", clusterInfo.getSlotsOk()) .withDetail(SLOTS_UP, clusterInfo.getSlotsOk())
.withDetail("slots_fail", clusterInfo.getSlotsFail()); .withDetail(SLOTS_FAIL, clusterInfo.getSlotsFail());
} }
else { else {
Properties info = connection.info(); Properties info = connection.info();
......
...@@ -24,6 +24,7 @@ import reactor.core.scheduler.Schedulers; ...@@ -24,6 +24,7 @@ import reactor.core.scheduler.Schedulers;
import org.springframework.boot.actuate.health.AbstractReactiveHealthIndicator; import org.springframework.boot.actuate.health.AbstractReactiveHealthIndicator;
import org.springframework.boot.actuate.health.Health; import org.springframework.boot.actuate.health.Health;
import org.springframework.boot.actuate.health.ReactiveHealthIndicator; import org.springframework.boot.actuate.health.ReactiveHealthIndicator;
import org.springframework.data.redis.connection.ClusterInfo;
import org.springframework.data.redis.connection.ReactiveRedisClusterConnection; import org.springframework.data.redis.connection.ReactiveRedisClusterConnection;
import org.springframework.data.redis.connection.ReactiveRedisConnection; import org.springframework.data.redis.connection.ReactiveRedisConnection;
import org.springframework.data.redis.connection.ReactiveRedisConnectionFactory; import org.springframework.data.redis.connection.ReactiveRedisConnectionFactory;
...@@ -52,10 +53,17 @@ public class RedisReactiveHealthIndicator extends AbstractReactiveHealthIndicato ...@@ -52,10 +53,17 @@ public class RedisReactiveHealthIndicator extends AbstractReactiveHealthIndicato
} }
private Mono<Health> doHealthCheck(Health.Builder builder, ReactiveRedisConnection connection) { private Mono<Health> doHealthCheck(Health.Builder builder, ReactiveRedisConnection connection) {
return connection.serverCommands().info() if (connection instanceof ReactiveRedisClusterConnection) {
.map((info) -> up(builder, info, (connection instanceof ReactiveRedisClusterConnection))) ReactiveRedisClusterConnection clusterConnection = (ReactiveRedisClusterConnection) connection;
.onErrorResume((ex) -> Mono.just(down(builder, ex))) return clusterConnection.clusterGetClusterInfo().map((info) -> up(builder, info))
.flatMap((health) -> connection.closeLater().thenReturn(health)); .onErrorResume((ex) -> Mono.just(down(builder, ex)))
.flatMap((health) -> clusterConnection.closeLater().thenReturn(health));
}
else {
return connection.serverCommands().info().map((info) -> up(builder, info))
.onErrorResume((ex) -> Mono.just(down(builder, ex)))
.flatMap((health) -> connection.closeLater().thenReturn(health));
}
} }
private Mono<ReactiveRedisConnection> getConnection() { private Mono<ReactiveRedisConnection> getConnection() {
...@@ -63,21 +71,15 @@ public class RedisReactiveHealthIndicator extends AbstractReactiveHealthIndicato ...@@ -63,21 +71,15 @@ public class RedisReactiveHealthIndicator extends AbstractReactiveHealthIndicato
.subscribeOn(Schedulers.boundedElastic()); .subscribeOn(Schedulers.boundedElastic());
} }
private Health up(Health.Builder builder, Properties info, boolean isClusterConnection) { private Health up(Health.Builder builder, Properties info) {
if (isClusterConnection) { return builder.up()
return builder.up().withDetail(RedisHealthIndicator.VERSION, getClusterVersionProperty(info)).build(); .withDetail(RedisHealthIndicator.VERSION, info.getProperty(RedisHealthIndicator.REDIS_VERSION)).build();
}
else {
return builder.up()
.withDetail(RedisHealthIndicator.VERSION, info.getProperty(RedisHealthIndicator.REDIS_VERSION))
.build();
}
} }
private Object getClusterVersionProperty(Properties info) { private Health up(Health.Builder builder, ClusterInfo clusterInfo) {
return info.keySet().stream().map(String.class::cast) return builder.up().withDetail(RedisHealthIndicator.CLUSTER_SIZE, clusterInfo.getClusterSize())
.filter((key) -> key.endsWith(RedisHealthIndicator.REDIS_VERSION)).findFirst().map(info::get) .withDetail(RedisHealthIndicator.SLOTS_UP, clusterInfo.getSlotsOk())
.orElse(""); .withDetail(RedisHealthIndicator.SLOTS_FAIL, clusterInfo.getSlotsFail()).build();
} }
private Health down(Health.Builder builder, Throwable cause) { private Health down(Health.Builder builder, Throwable cause) {
......
...@@ -26,7 +26,7 @@ import reactor.test.StepVerifier; ...@@ -26,7 +26,7 @@ import reactor.test.StepVerifier;
import org.springframework.boot.actuate.health.Health; import org.springframework.boot.actuate.health.Health;
import org.springframework.boot.actuate.health.Status; import org.springframework.boot.actuate.health.Status;
import org.springframework.data.redis.RedisConnectionFailureException; import org.springframework.data.redis.RedisConnectionFailureException;
import org.springframework.data.redis.connection.ReactiveClusterServerCommands; import org.springframework.data.redis.connection.ClusterInfo;
import org.springframework.data.redis.connection.ReactiveRedisClusterConnection; import org.springframework.data.redis.connection.ReactiveRedisClusterConnection;
import org.springframework.data.redis.connection.ReactiveRedisConnection; import org.springframework.data.redis.connection.ReactiveRedisConnection;
import org.springframework.data.redis.connection.ReactiveRedisConnectionFactory; import org.springframework.data.redis.connection.ReactiveRedisConnectionFactory;
...@@ -68,18 +68,22 @@ class RedisReactiveHealthIndicatorTests { ...@@ -68,18 +68,22 @@ class RedisReactiveHealthIndicatorTests {
@Test @Test
void redisClusterIsUp() { void redisClusterIsUp() {
Properties info = new Properties(); Properties clusterProperties = new Properties();
info.put("127.0.0.1:7002.redis_version", "2.8.9"); clusterProperties.setProperty("cluster_size", "4");
ReactiveRedisConnection redisConnection = mock(ReactiveRedisClusterConnection.class); clusterProperties.setProperty("cluster_slots_ok", "4");
clusterProperties.setProperty("cluster_slots_fail", "0");
ReactiveRedisClusterConnection redisConnection = mock(ReactiveRedisClusterConnection.class);
given(redisConnection.closeLater()).willReturn(Mono.empty()); given(redisConnection.closeLater()).willReturn(Mono.empty());
ReactiveClusterServerCommands commands = mock(ReactiveClusterServerCommands.class); given(redisConnection.clusterGetClusterInfo()).willReturn(Mono.just(new ClusterInfo(clusterProperties)));
given(commands.info()).willReturn(Mono.just(info)); ReactiveRedisConnectionFactory redisConnectionFactory = mock(ReactiveRedisConnectionFactory.class);
RedisReactiveHealthIndicator healthIndicator = createHealthIndicator(redisConnection, commands); given(redisConnectionFactory.getReactiveConnection()).willReturn(redisConnection);
RedisReactiveHealthIndicator healthIndicator = new RedisReactiveHealthIndicator(redisConnectionFactory);
Mono<Health> health = healthIndicator.health(); Mono<Health> health = healthIndicator.health();
StepVerifier.create(health).consumeNextWith((h) -> { StepVerifier.create(health).consumeNextWith((h) -> {
assertThat(h.getStatus()).isEqualTo(Status.UP); assertThat(h.getStatus()).isEqualTo(Status.UP);
assertThat(h.getDetails()).containsOnlyKeys("version"); assertThat(h.getDetails().get("cluster_size")).isEqualTo(4L);
assertThat(h.getDetails().get("version")).isEqualTo("2.8.9"); assertThat(h.getDetails().get("slots_up")).isEqualTo(4L);
assertThat(h.getDetails().get("slots_fail")).isEqualTo(0L);
}).verifyComplete(); }).verifyComplete();
verify(redisConnection).closeLater(); verify(redisConnection).closeLater();
} }
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment