Commit d49cc20e authored by Andy Wilkinson's avatar Andy Wilkinson

Rework CouchbaseReactiveHealthIndicator to use DiagnosticsReport

Closes gh-14799
parent 80fdd473
/*
* Copyright 2012-2018 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.boot.actuate.couchbase;
import java.util.HashMap;
import java.util.Map;
import java.util.stream.Collectors;
import com.couchbase.client.core.message.internal.DiagnosticsReport;
import com.couchbase.client.core.message.internal.EndpointHealth;
import com.couchbase.client.core.state.LifecycleState;
import org.springframework.boot.actuate.health.Health.Builder;
/**
* Details of Couchbase's health.
*
* @author Andy Wilkinson
*/
class CouchbaseHealth {
private final DiagnosticsReport diagnostics;
CouchbaseHealth(DiagnosticsReport diagnostics) {
this.diagnostics = diagnostics;
}
void applyTo(Builder builder) {
builder = isCouchbaseUp(this.diagnostics) ? builder.up() : builder.down();
builder.withDetail("sdk", this.diagnostics.sdk());
builder.withDetail("endpoints", this.diagnostics.endpoints().stream()
.map(this::describe).collect(Collectors.toList()));
}
private boolean isCouchbaseUp(DiagnosticsReport diagnostics) {
for (EndpointHealth health : diagnostics.endpoints()) {
LifecycleState state = health.state();
if (state != LifecycleState.CONNECTED && state != LifecycleState.IDLE) {
return false;
}
}
return true;
}
private Map<String, Object> describe(EndpointHealth endpointHealth) {
Map<String, Object> map = new HashMap<>();
map.put("id", endpointHealth.id());
map.put("lastActivity", endpointHealth.lastActivity());
map.put("local", endpointHealth.local().toString());
map.put("remote", endpointHealth.remote().toString());
map.put("state", endpointHealth.state());
map.put("type", endpointHealth.type());
return map;
}
}
...@@ -16,13 +16,7 @@ ...@@ -16,13 +16,7 @@
package org.springframework.boot.actuate.couchbase; package org.springframework.boot.actuate.couchbase;
import java.util.HashMap;
import java.util.Map;
import java.util.stream.Collectors;
import com.couchbase.client.core.message.internal.DiagnosticsReport; import com.couchbase.client.core.message.internal.DiagnosticsReport;
import com.couchbase.client.core.message.internal.EndpointHealth;
import com.couchbase.client.core.state.LifecycleState;
import com.couchbase.client.java.Cluster; import com.couchbase.client.java.Cluster;
import org.springframework.boot.actuate.health.AbstractHealthIndicator; import org.springframework.boot.actuate.health.AbstractHealthIndicator;
...@@ -55,31 +49,7 @@ public class CouchbaseHealthIndicator extends AbstractHealthIndicator { ...@@ -55,31 +49,7 @@ public class CouchbaseHealthIndicator extends AbstractHealthIndicator {
@Override @Override
protected void doHealthCheck(Health.Builder builder) throws Exception { protected void doHealthCheck(Health.Builder builder) throws Exception {
DiagnosticsReport diagnostics = this.cluster.diagnostics(); DiagnosticsReport diagnostics = this.cluster.diagnostics();
builder = isCouchbaseUp(diagnostics) ? builder.up() : builder.down(); new CouchbaseHealth(diagnostics).applyTo(builder);
builder.withDetail("sdk", diagnostics.sdk());
builder.withDetail("endpoints", diagnostics.endpoints().stream()
.map(this::describe).collect(Collectors.toList()));
}
private boolean isCouchbaseUp(DiagnosticsReport diagnostics) {
for (EndpointHealth health : diagnostics.endpoints()) {
LifecycleState state = health.state();
if (state != LifecycleState.CONNECTED && state != LifecycleState.IDLE) {
return false;
}
}
return true;
}
private Map<String, Object> describe(EndpointHealth endpointHealth) {
Map<String, Object> map = new HashMap<>();
map.put("id", endpointHealth.id());
map.put("lastActivity", endpointHealth.lastActivity());
map.put("local", endpointHealth.local().toString());
map.put("remote", endpointHealth.remote().toString());
map.put("state", endpointHealth.state());
map.put("type", endpointHealth.type());
return map;
} }
} }
...@@ -15,18 +15,13 @@ ...@@ -15,18 +15,13 @@
*/ */
package org.springframework.boot.actuate.couchbase; package org.springframework.boot.actuate.couchbase;
import com.couchbase.client.java.bucket.BucketInfo; import com.couchbase.client.core.message.internal.DiagnosticsReport;
import com.couchbase.client.java.cluster.ClusterInfo; import com.couchbase.client.java.Cluster;
import reactor.core.publisher.Mono; import reactor.core.publisher.Mono;
import rx.Observable;
import rx.RxReactiveStreams;
import rx.Single;
import org.springframework.boot.actuate.health.AbstractReactiveHealthIndicator; import org.springframework.boot.actuate.health.AbstractReactiveHealthIndicator;
import org.springframework.boot.actuate.health.Health; import org.springframework.boot.actuate.health.Health;
import org.springframework.boot.actuate.health.ReactiveHealthIndicator; import org.springframework.boot.actuate.health.ReactiveHealthIndicator;
import org.springframework.data.couchbase.core.RxJavaCouchbaseOperations;
import org.springframework.util.StringUtils;
/** /**
* A {@link ReactiveHealthIndicator} for Couchbase. * A {@link ReactiveHealthIndicator} for Couchbase.
...@@ -37,30 +32,21 @@ import org.springframework.util.StringUtils; ...@@ -37,30 +32,21 @@ import org.springframework.util.StringUtils;
*/ */
public class CouchbaseReactiveHealthIndicator extends AbstractReactiveHealthIndicator { public class CouchbaseReactiveHealthIndicator extends AbstractReactiveHealthIndicator {
private final RxJavaCouchbaseOperations couchbaseOperations; private final Cluster cluster;
/** /**
* Create a new {@link CouchbaseReactiveHealthIndicator} instance. * Create a new {@link CouchbaseReactiveHealthIndicator} instance.
* @param couchbaseOperations the reactive couchbase operations * @param cluster the Couchbase cluster
*/ */
public CouchbaseReactiveHealthIndicator( public CouchbaseReactiveHealthIndicator(Cluster cluster) {
RxJavaCouchbaseOperations couchbaseOperations) { this.cluster = cluster;
this.couchbaseOperations = couchbaseOperations;
} }
@Override @Override
protected Mono<Health> doHealthCheck(Health.Builder builder) { protected Mono<Health> doHealthCheck(Health.Builder builder) {
ClusterInfo cluster = this.couchbaseOperations.getCouchbaseClusterInfo(); DiagnosticsReport diagnostics = this.cluster.diagnostics();
String versions = StringUtils new CouchbaseHealth(diagnostics).applyTo(builder);
.collectionToCommaDelimitedString(cluster.getAllVersions()); return Mono.just(builder.build());
Observable<BucketInfo> bucket = this.couchbaseOperations.getCouchbaseBucket()
.bucketManager().async().info();
Single<Health> health = bucket.map(BucketInfo::nodeList)
.map(StringUtils::collectionToCommaDelimitedString)
.map((nodes) -> builder.up().withDetail("versions", versions)
.withDetail("nodes", nodes).build())
.toSingle();
return Mono.from(RxReactiveStreams.toPublisher(health));
} }
} }
...@@ -15,28 +15,25 @@ ...@@ -15,28 +15,25 @@
*/ */
package org.springframework.boot.actuate.couchbase; package org.springframework.boot.actuate.couchbase;
import java.net.InetAddress; import java.net.InetSocketAddress;
import java.util.Arrays; import java.util.Arrays;
import java.util.List;
import java.util.Map;
import com.couchbase.client.java.Bucket; import com.couchbase.client.core.message.internal.DiagnosticsReport;
import com.couchbase.client.java.bucket.AsyncBucketManager; import com.couchbase.client.core.message.internal.EndpointHealth;
import com.couchbase.client.java.bucket.BucketInfo; import com.couchbase.client.core.service.ServiceType;
import com.couchbase.client.java.bucket.BucketManager; import com.couchbase.client.core.state.LifecycleState;
import com.couchbase.client.java.cluster.ClusterInfo; import com.couchbase.client.java.Cluster;
import com.couchbase.client.java.error.TranscodingException;
import com.couchbase.client.java.util.features.Version;
import org.junit.Test; import org.junit.Test;
import reactor.core.publisher.Mono;
import reactor.test.StepVerifier;
import rx.Observable;
import org.springframework.boot.actuate.health.Health; import org.springframework.boot.actuate.health.Health;
import org.springframework.boot.actuate.health.Status; import org.springframework.boot.actuate.health.Status;
import org.springframework.data.couchbase.core.RxJavaCouchbaseOperations;
import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.BDDMockito.given; import static org.mockito.BDDMockito.given;
import static org.mockito.Mockito.mock; import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
/** /**
* Tests for {@link CouchbaseReactiveHealthIndicator}. * Tests for {@link CouchbaseReactiveHealthIndicator}.
...@@ -44,62 +41,49 @@ import static org.mockito.Mockito.mock; ...@@ -44,62 +41,49 @@ import static org.mockito.Mockito.mock;
public class CouchbaseReactiveHealthIndicatorTests { public class CouchbaseReactiveHealthIndicatorTests {
@Test @Test
public void couchbaseIsUp() { @SuppressWarnings("unchecked")
RxJavaCouchbaseOperations rxJavaCouchbaseOperations = mock( public void couchbaseClusterIsUp() {
RxJavaCouchbaseOperations.class); Cluster cluster = mock(Cluster.class);
AsyncBucketManager asyncBucketManager = mockAsyncBucketManager( CouchbaseReactiveHealthIndicator healthIndicator = new CouchbaseReactiveHealthIndicator(
rxJavaCouchbaseOperations); cluster);
BucketInfo info = mock(BucketInfo.class); List<EndpointHealth> endpoints = Arrays.asList(new EndpointHealth(
InetAddress node1Address = mock(InetAddress.class); ServiceType.BINARY, LifecycleState.CONNECTED, new InetSocketAddress(0),
InetAddress node2Address = mock(InetAddress.class); new InetSocketAddress(0), 1234, "endpoint-1"));
given(info.nodeList()).willReturn(Arrays.asList(node1Address, node2Address)); DiagnosticsReport diagnostics = new DiagnosticsReport(endpoints, "test-sdk",
given(node1Address.toString()).willReturn("127.0.0.1"); "test-id", null);
given(node2Address.toString()).willReturn("127.0.0.2"); given(cluster.diagnostics()).willReturn(diagnostics);
given(asyncBucketManager.info()).willReturn(Observable.just(info)); Health health = healthIndicator.health().block();
CouchbaseReactiveHealthIndicator couchbaseReactiveHealthIndicator = new CouchbaseReactiveHealthIndicator( assertThat(health.getStatus()).isEqualTo(Status.UP);
rxJavaCouchbaseOperations); assertThat(health.getDetails()).containsEntry("sdk", "test-sdk");
Mono<Health> health = couchbaseReactiveHealthIndicator.health(); assertThat(health.getDetails()).containsKey("endpoints");
StepVerifier.create(health).consumeNextWith((h) -> { assertThat((List<Map<String, Object>>) health.getDetails().get("endpoints"))
assertThat(h.getStatus()).isEqualTo(Status.UP); .hasSize(1);
assertThat(h.getDetails()).containsKeys("versions", "nodes"); verify(cluster).diagnostics();
assertThat(h.getDetails().get("versions")).isEqualTo("5.5.0,6.0.0");
assertThat(h.getDetails().get("nodes")).isEqualTo("127.0.0.1,127.0.0.2");
}).verifyComplete();
} }
@Test @Test
public void couchbaseIsDown() { @SuppressWarnings("unchecked")
RxJavaCouchbaseOperations rxJavaCouchbaseOperations = mock( public void couchbaseClusterIsDown() {
RxJavaCouchbaseOperations.class); Cluster cluster = mock(Cluster.class);
AsyncBucketManager asyncBucketManager = mockAsyncBucketManager( CouchbaseReactiveHealthIndicator healthIndicator = new CouchbaseReactiveHealthIndicator(
rxJavaCouchbaseOperations); cluster);
given(asyncBucketManager.info()) List<EndpointHealth> endpoints = Arrays.asList(
.willReturn(Observable.error(new TranscodingException("Failure"))); new EndpointHealth(ServiceType.BINARY, LifecycleState.CONNECTED,
CouchbaseReactiveHealthIndicator couchbaseReactiveHealthIndicator = new CouchbaseReactiveHealthIndicator( new InetSocketAddress(0), new InetSocketAddress(0), 1234,
rxJavaCouchbaseOperations); "endpoint-1"),
Mono<Health> health = couchbaseReactiveHealthIndicator.health(); new EndpointHealth(ServiceType.BINARY, LifecycleState.CONNECTING,
StepVerifier.create(health).consumeNextWith((h) -> { new InetSocketAddress(0), new InetSocketAddress(0), 1234,
assertThat(h.getStatus()).isEqualTo(Status.DOWN); "endpoint-2"));
assertThat(h.getDetails()).containsOnlyKeys("error"); DiagnosticsReport diagnostics = new DiagnosticsReport(endpoints, "test-sdk",
assertThat(h.getDetails().get("error")) "test-id", null);
.isEqualTo(TranscodingException.class.getName() + ": Failure"); given(cluster.diagnostics()).willReturn(diagnostics);
}).verifyComplete(); Health health = healthIndicator.health().block();
} assertThat(health.getStatus()).isEqualTo(Status.DOWN);
assertThat(health.getDetails()).containsEntry("sdk", "test-sdk");
private AsyncBucketManager mockAsyncBucketManager( assertThat(health.getDetails()).containsKey("endpoints");
RxJavaCouchbaseOperations rxJavaCouchbaseOperations) { assertThat((List<Map<String, Object>>) health.getDetails().get("endpoints"))
ClusterInfo clusterInfo = mock(ClusterInfo.class); .hasSize(2);
given(rxJavaCouchbaseOperations.getCouchbaseClusterInfo()) verify(cluster).diagnostics();
.willReturn(clusterInfo);
given(clusterInfo.getAllVersions())
.willReturn(Arrays.asList(new Version(5, 5, 0), new Version(6, 0, 0)));
Bucket bucket = mock(Bucket.class);
BucketManager bucketManager = mock(BucketManager.class);
AsyncBucketManager asyncBucketManager = mock(AsyncBucketManager.class);
given(rxJavaCouchbaseOperations.getCouchbaseBucket()).willReturn(bucket);
given(bucket.bucketManager()).willReturn(bucketManager);
given(bucketManager.async()).willReturn(asyncBucketManager);
return asyncBucketManager;
} }
} }
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment