Commit 0dbd9429 authored by Juan Camilo Rada's avatar Juan Camilo Rada Committed by Stephane Nicoll

Add Kafka health indicator

See gh-11515
parent 76a450df
......@@ -193,6 +193,11 @@
<artifactId>elasticsearch</artifactId>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.flywaydb</groupId>
<artifactId>flyway-core</artifactId>
......
/*
* Copyright 2012-2018 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.boot.actuate.autoconfigure.kafka;
import java.time.Duration;
import java.util.Map;
import org.springframework.boot.actuate.autoconfigure.health.CompositeHealthIndicatorConfiguration;
import org.springframework.boot.actuate.autoconfigure.health.ConditionalOnEnabledHealthIndicator;
import org.springframework.boot.actuate.autoconfigure.health.HealthIndicatorAutoConfiguration;
import org.springframework.boot.actuate.health.HealthIndicator;
import org.springframework.boot.actuate.kafka.KafkaHealthIndicator;
import org.springframework.boot.autoconfigure.AutoConfigureAfter;
import org.springframework.boot.autoconfigure.AutoConfigureBefore;
import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
import org.springframework.boot.autoconfigure.condition.ConditionalOnBean;
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
import org.springframework.boot.autoconfigure.kafka.KafkaAutoConfiguration;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.KafkaAdmin;
/**
* {@link EnableAutoConfiguration Auto-configuration} for {@link KafkaHealthIndicator}.
*
* @author Juan Rada
*/
@Configuration
@ConditionalOnEnabledHealthIndicator("kafka")
@AutoConfigureBefore(HealthIndicatorAutoConfiguration.class)
@AutoConfigureAfter(KafkaAutoConfiguration.class)
public class KafkaHealthIndicatorAutoConfiguration {
@Configuration
@ConditionalOnBean(KafkaAdmin.class)
@EnableConfigurationProperties(KafkaHealthIndicatorProperties.class)
static class KafkaClientHealthIndicatorConfiguration extends
CompositeHealthIndicatorConfiguration<KafkaHealthIndicator, KafkaAdmin> {
private final Map<String, KafkaAdmin> admins;
private final KafkaHealthIndicatorProperties properties;
KafkaClientHealthIndicatorConfiguration(Map<String, KafkaAdmin> admins,
KafkaHealthIndicatorProperties properties) {
this.admins = admins;
this.properties = properties;
}
@Bean
@ConditionalOnMissingBean(name = "kafkaHealthIndicator")
public HealthIndicator kafkaHealthIndicator() {
return createHealthIndicator(this.admins);
}
@Override
protected KafkaHealthIndicator createHealthIndicator(KafkaAdmin source) {
Duration responseTimeout = this.properties.getResponseTimeout();
return new KafkaHealthIndicator(source,
responseTimeout == null ? 100L : responseTimeout.toMillis());
}
}
}
/*
* Copyright 2012-2018 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.boot.actuate.autoconfigure.kafka;
import java.time.Duration;
import org.springframework.boot.actuate.kafka.KafkaHealthIndicator;
import org.springframework.boot.context.properties.ConfigurationProperties;
/**
* Configuration properties for {@link KafkaHealthIndicator}.
*
* @author Juan Rada
*/
@ConfigurationProperties(prefix = "management.health.kafka", ignoreUnknownFields = false)
public class KafkaHealthIndicatorProperties {
/**
* Time to wait for a response from the cluster description operation.
*/
private Duration responseTimeout = Duration.ofMillis(100);
public Duration getResponseTimeout() {
return this.responseTimeout;
}
public void setResponseTimeout(Duration responseTimeout) {
this.responseTimeout = responseTimeout;
}
}
/*
* Copyright 2012-2018 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
/**
* Auto-configuration for actuator kafka support.
*/
package org.springframework.boot.actuate.autoconfigure.kafka;
......@@ -145,6 +145,12 @@
"description": "Whether to enable Neo4j health check.",
"defaultValue": true
},
{
"name": "management.health.kafka.enabled",
"type": "java.lang.Boolean",
"description": "Whether to enable kafka health check.",
"defaultValue": true
},
{
"name": "management.info.build.enabled",
"type": "java.lang.Boolean",
......
......@@ -11,6 +11,7 @@ org.springframework.boot.actuate.autoconfigure.context.properties.ConfigurationP
org.springframework.boot.actuate.autoconfigure.context.ShutdownEndpointAutoConfiguration,\
org.springframework.boot.actuate.autoconfigure.couchbase.CouchbaseHealthIndicatorAutoConfiguration,\
org.springframework.boot.actuate.autoconfigure.elasticsearch.ElasticsearchHealthIndicatorAutoConfiguration,\
org.springframework.boot.actuate.autoconfigure.kafka.KafkaHealthIndicatorAutoConfiguration,\
org.springframework.boot.actuate.autoconfigure.endpoint.EndpointAutoConfiguration,\
org.springframework.boot.actuate.autoconfigure.endpoint.jmx.JmxEndpointAutoConfiguration,\
org.springframework.boot.actuate.autoconfigure.endpoint.web.WebEndpointAutoConfiguration,\
......
/*
* Copyright 2012-2018 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.boot.actuate.autoconfigure.kafka;
import org.junit.Test;
import org.springframework.boot.actuate.autoconfigure.health.HealthIndicatorAutoConfiguration;
import org.springframework.boot.actuate.health.ApplicationHealthIndicator;
import org.springframework.boot.actuate.kafka.KafkaHealthIndicator;
import org.springframework.boot.autoconfigure.AutoConfigurations;
import org.springframework.boot.autoconfigure.kafka.KafkaAutoConfiguration;
import org.springframework.boot.test.context.runner.ApplicationContextRunner;
import static org.assertj.core.api.Assertions.assertThat;
/**
* Tests for {@link KafkaHealthIndicatorAutoConfiguration}.
*
* @author Juan Rada
*/
public class KafkaHealthIndicatorAutoConfigurationTests {
private ApplicationContextRunner contextRunner = new ApplicationContextRunner()
.withConfiguration(AutoConfigurations.of(KafkaAutoConfiguration.class,
KafkaHealthIndicatorAutoConfiguration.class,
HealthIndicatorAutoConfiguration.class));
@Test
public void runShouldCreateIndicator() {
this.contextRunner.run((context) -> assertThat(context)
.hasSingleBean(KafkaHealthIndicator.class)
.doesNotHaveBean(ApplicationHealthIndicator.class));
}
@Test
public void runWhenDisabledShouldNotCreateIndicator() {
this.contextRunner.withPropertyValues("management.health.kafka.enabled:false")
.run((context) -> assertThat(context)
.doesNotHaveBean(KafkaHealthIndicator.class)
.hasSingleBean(ApplicationHealthIndicator.class));
}
}
......@@ -172,6 +172,11 @@
<artifactId>spring-rabbit</artifactId>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.springframework.data</groupId>
<artifactId>spring-data-cassandra</artifactId>
......@@ -257,6 +262,11 @@
<artifactId>spring-boot-test-support</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-autoconfigure</artifactId>
......@@ -267,6 +277,11 @@
<artifactId>log4j-slf4j-impl</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>log4j-over-slf4j</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-api</artifactId>
......
/*
* Copyright 2012-2018 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.boot.actuate.kafka;
import java.util.Collections;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.Config;
import org.apache.kafka.clients.admin.DescribeClusterOptions;
import org.apache.kafka.clients.admin.DescribeClusterResult;
import org.apache.kafka.common.config.ConfigResource;
import org.apache.kafka.common.config.ConfigResource.Type;
import org.springframework.boot.actuate.health.AbstractHealthIndicator;
import org.springframework.boot.actuate.health.Health.Builder;
import org.springframework.boot.actuate.health.HealthIndicator;
import org.springframework.kafka.core.KafkaAdmin;
import org.springframework.util.Assert;
/**
* {@link HealthIndicator} for Kafka cluster.
*
* @author Juan Rada
*/
public class KafkaHealthIndicator extends AbstractHealthIndicator {
static final String REPLICATION_PROPERTY = "transaction.state.log.replication.factor";
private final KafkaAdmin kafkaAdmin;
private final DescribeClusterOptions describeOptions;
/**
* Create a new {@link KafkaHealthIndicator} instance.
*
* @param kafkaAdmin the kafka admin
* @param responseTimeout the describe cluster request timeout in milliseconds
*/
public KafkaHealthIndicator(KafkaAdmin kafkaAdmin, long responseTimeout) {
Assert.notNull(kafkaAdmin, "KafkaAdmin must not be null");
this.kafkaAdmin = kafkaAdmin;
this.describeOptions = new DescribeClusterOptions()
.timeoutMs((int) responseTimeout);
}
@Override
protected void doHealthCheck(Builder builder) throws Exception {
try (AdminClient adminClient = AdminClient.create(this.kafkaAdmin.getConfig())) {
DescribeClusterResult result = adminClient.describeCluster(this.describeOptions);
String brokerId = result.controller().get().idString();
int replicationFactor = getReplicationFactor(brokerId, adminClient);
int nodes = result.nodes().get().size();
if (nodes >= replicationFactor) {
builder.up();
}
else {
builder.down();
}
builder.withDetail("clusterId", result.clusterId().get());
builder.withDetail("brokerId", brokerId);
builder.withDetail("nodes", nodes);
}
}
private int getReplicationFactor(String brokerId,
AdminClient adminClient) throws ExecutionException, InterruptedException {
ConfigResource configResource = new ConfigResource(Type.BROKER, brokerId);
Map<ConfigResource, Config> kafkaConfig = adminClient
.describeConfigs(Collections.singletonList(configResource)).all().get();
Config brokerConfig = kafkaConfig.get(configResource);
return Integer.parseInt(brokerConfig.get(REPLICATION_PROPERTY).value());
}
}
/*
* Copyright 2012-2018 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
/**
* Actuator support for Kafka.
*/
package org.springframework.boot.actuate.kafka;
/*
* Copyright 2012-2018 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.boot.actuate.kafka;
import java.util.Collections;
import java.util.Map;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.junit.Test;
import org.springframework.boot.actuate.health.Health;
import org.springframework.boot.actuate.health.Status;
import org.springframework.kafka.core.KafkaAdmin;
import org.springframework.kafka.test.rule.KafkaEmbedded;
import static org.assertj.core.api.Assertions.assertThat;
/**
* Test for {@link KafkaHealthIndicator}
*
* @author Juan Rada
*/
public class KafkaHealthIndicatorTests {
private static final Long RESPONSE_TIME = 1000L;
private KafkaEmbedded kafkaEmbedded;
private KafkaAdmin kafkaAdmin;
private void startKafka(int replicationFactor) throws Exception {
this.kafkaEmbedded = new KafkaEmbedded(1, true);
this.kafkaEmbedded.brokerProperties(Collections.singletonMap(
KafkaHealthIndicator.REPLICATION_PROPERTY,
String.valueOf(replicationFactor)));
this.kafkaEmbedded.before();
this.kafkaAdmin = new KafkaAdmin(Collections.singletonMap(
ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
this.kafkaEmbedded.getBrokersAsString()));
}
private void shutdownKafka() throws Exception {
this.kafkaEmbedded.destroy();
}
@Test
public void kafkaIsUp() throws Exception {
startKafka(1);
KafkaHealthIndicator healthIndicator =
new KafkaHealthIndicator(this.kafkaAdmin, RESPONSE_TIME);
Health health = healthIndicator.health();
assertThat(health.getStatus()).isEqualTo(Status.UP);
assertDetails(health.getDetails());
shutdownKafka();
}
private void assertDetails(Map<String, Object> details) {
assertThat(details).containsEntry("brokerId", "0");
assertThat(details).containsKey("clusterId");
assertThat(details).containsEntry("nodes", 1);
}
@Test
public void notEnoughNodesForReplicationFactor() throws Exception {
startKafka(2);
KafkaHealthIndicator healthIndicator =
new KafkaHealthIndicator(this.kafkaAdmin, RESPONSE_TIME);
Health health = healthIndicator.health();
assertThat(health.getStatus()).isEqualTo(Status.DOWN);
assertDetails(health.getDetails());
shutdownKafka();
}
@Test
public void kafkaIsDown() throws Exception {
this.kafkaAdmin = new KafkaAdmin(Collections.singletonMap(
ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:34987"));
KafkaHealthIndicator healthIndicator =
new KafkaHealthIndicator(this.kafkaAdmin, RESPONSE_TIME);
Health health = healthIndicator.health();
assertThat(health.getStatus()).isEqualTo(Status.DOWN);
assertThat((String) health.getDetails().get("error")).isNotEmpty();
}
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment