Initial metrics implementation
This commit is contained in:
committed by
Mark Pollack
parent
8230a8b348
commit
e05d5e03bd
1
pom.xml
1
pom.xml
@@ -115,6 +115,7 @@
|
||||
<module>spring-cloud-stream-schema</module>
|
||||
<module>spring-cloud-stream-schema-server</module>
|
||||
<module>spring-cloud-stream-tools</module>
|
||||
<module>spring-cloud-stream-metrics</module>
|
||||
</modules>
|
||||
<build>
|
||||
<pluginManagement>
|
||||
|
||||
@@ -1,6 +1,6 @@
|
||||
[[spring-cloud-stream-reference]]
|
||||
= Spring Cloud Stream Reference Guide
|
||||
Sabby Anandan; Marius Bogoevici; Eric Bottard; Mark Fisher; Ilayaperumal Gopinathan; Gunnar Hillert; Mark Pollack; Patrick Peralta; Glenn Renfro; Thomas Risberg; Dave Syer; David Turanski; Janne Valkealahti; Benjamin Klein
|
||||
Sabby Anandan; Marius Bogoevici; Eric Bottard; Mark Fisher; Ilayaperumal Gopinathan; Gunnar Hillert; Mark Pollack; Patrick Peralta; Glenn Renfro; Thomas Risberg; Dave Syer; David Turanski; Janne Valkealahti; Benjamin Klein; Vinicius Carvalho
|
||||
:doctype: book
|
||||
:toc:
|
||||
:toclevels: 4
|
||||
|
||||
@@ -1813,6 +1813,50 @@ Once we have received the message, we can validate that the component functions
|
||||
Spring Cloud Stream provides a health indicator for binders.
|
||||
It is registered under the name of `binders` and can be enabled or disabled by setting the `management.health.binders.enabled` property.
|
||||
|
||||
== Metrics Emitter
|
||||
Spring Cloud Stream provides a module called `spring-cloud-stream-metrics` that can be used to emit any available metric from https://docs.spring.io/spring-boot/docs/current/reference/html/production-ready-metrics.html[Spring Boot metrics endpoint] to a named channel.
|
||||
This module allow operators to collect metrics from stream applications without relying on polling their endpoints.
|
||||
|
||||
HTTP polling can be challenging on cloud environments since applications could be on a private network or behind a Load balancer that prevents per instance access.
|
||||
|
||||
This module is included on the each of the out of box https://github.com/spring-cloud-stream-app-starters[application starters], if you want to emit metrics from your application, just import it on your pom.xml and follow the activation instructions.
|
||||
|
||||
The module is activated when you set the destination name for its channel, `spring.cloud.stream.bindings.streamMetrics.destination=<DESTINATION_NAME>`.
|
||||
|
||||
By default the module is configured to only send Spring Integration message channel metrics.
|
||||
|
||||
Available properties for customization using the prefix `spring.cloud.stream.metrics`.
|
||||
|
||||
delay-millis::
|
||||
The period in which metrics will be posted to the channel
|
||||
+
|
||||
Default: `5000`.
|
||||
+
|
||||
prefix::
|
||||
Prefix string to be prepended to the metrics name
|
||||
+
|
||||
Default: ``
|
||||
+
|
||||
includes::
|
||||
An array of strings containing regex patterns of metrics that should be included.
|
||||
+
|
||||
Default: `integration**`
|
||||
+
|
||||
excludes::
|
||||
An array of strings containing regex patterns of metrics that should be excluded.
|
||||
+
|
||||
Default: ``
|
||||
+
|
||||
properties::
|
||||
Just like the `includes` option, it allows white listing application properties that will be added to the metrics payload
|
||||
+
|
||||
Default: null.
|
||||
+
|
||||
spring.cloud.stream.bindings.streamMetrics.contentType::
|
||||
Content-Type of the message
|
||||
+
|
||||
Default: `application/json`
|
||||
|
||||
== Samples
|
||||
|
||||
For Spring Cloud Stream samples, please refer to the https://github.com/spring-cloud/spring-cloud-stream-samples[spring-cloud-stream-samples] repository on GitHub.
|
||||
|
||||
34
spring-cloud-stream-metrics/pom.xml
Normal file
34
spring-cloud-stream-metrics/pom.xml
Normal file
@@ -0,0 +1,34 @@
|
||||
<?xml version="1.0" encoding="UTF-8"?>
|
||||
|
||||
<project xmlns="http://maven.apache.org/POM/4.0.0"
|
||||
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
|
||||
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
|
||||
<modelVersion>4.0.0</modelVersion>
|
||||
|
||||
<groupId>org.springframework.cloud</groupId>
|
||||
<artifactId>spring-cloud-stream-metrics</artifactId>
|
||||
<description>Emitter module to publish boot metrics</description>
|
||||
<parent>
|
||||
<groupId>org.springframework.cloud</groupId>
|
||||
<artifactId>spring-cloud-stream-parent</artifactId>
|
||||
<version>1.2.0.BUILD-SNAPSHOT</version>
|
||||
</parent>
|
||||
|
||||
<dependencies>
|
||||
<dependency>
|
||||
<groupId>org.springframework.cloud</groupId>
|
||||
<artifactId>spring-cloud-stream</artifactId>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.springframework.boot</groupId>
|
||||
<artifactId>spring-boot-starter-test</artifactId>
|
||||
<scope>test</scope>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.springframework.cloud</groupId>
|
||||
<artifactId>spring-cloud-stream-test-support</artifactId>
|
||||
<scope>test</scope>
|
||||
</dependency>
|
||||
</dependencies>
|
||||
|
||||
</project>
|
||||
@@ -0,0 +1,53 @@
|
||||
/*
|
||||
* Copyright 2017 the original author or authors.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.springframework.cloud.stream.config.metrics;
|
||||
|
||||
import org.springframework.boot.actuate.endpoint.MetricsEndpoint;
|
||||
import org.springframework.boot.autoconfigure.condition.ConditionalOnClass;
|
||||
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
|
||||
import org.springframework.boot.context.properties.EnableConfigurationProperties;
|
||||
import org.springframework.cloud.stream.annotation.EnableBinding;
|
||||
import org.springframework.cloud.stream.binder.Binder;
|
||||
import org.springframework.cloud.stream.metrics.BinderMetricsEmitter;
|
||||
import org.springframework.cloud.stream.metrics.BootMetricJsonSerializer;
|
||||
import org.springframework.cloud.stream.metrics.Emitter;
|
||||
import org.springframework.context.annotation.Bean;
|
||||
import org.springframework.context.annotation.Configuration;
|
||||
import org.springframework.scheduling.annotation.EnableScheduling;
|
||||
|
||||
/**
|
||||
* @author Vinicius Carvalho
|
||||
*/
|
||||
@Configuration
|
||||
@ConditionalOnClass(Binder.class)
|
||||
@EnableScheduling
|
||||
@EnableBinding(Emitter.class)
|
||||
@EnableConfigurationProperties(StreamMetricsProperties.class)
|
||||
@ConditionalOnProperty("spring.cloud.stream.bindings."+ Emitter.METRICS_CHANNEL_NAME + ".destination")
|
||||
public class BinderMetricsAutoConfiguration {
|
||||
|
||||
@Bean
|
||||
public BinderMetricsEmitter binderMetricsExporter(MetricsEndpoint endpoint) {
|
||||
return new BinderMetricsEmitter(endpoint);
|
||||
}
|
||||
|
||||
@Bean
|
||||
public BootMetricJsonSerializer metricJsonSerializer() {
|
||||
return new BootMetricJsonSerializer();
|
||||
}
|
||||
|
||||
}
|
||||
@@ -0,0 +1,37 @@
|
||||
/*
|
||||
* Copyright 2017 the original author or authors.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.springframework.cloud.stream.config.metrics;
|
||||
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
|
||||
import org.springframework.boot.SpringApplication;
|
||||
import org.springframework.boot.env.EnvironmentPostProcessor;
|
||||
import org.springframework.core.env.ConfigurableEnvironment;
|
||||
import org.springframework.core.env.MapPropertySource;
|
||||
|
||||
/**
|
||||
* @author Vinicius Carvalho
|
||||
*/
|
||||
public class BinderMetricsEnvironmentPostProcessor implements EnvironmentPostProcessor {
|
||||
@Override
|
||||
public void postProcessEnvironment(ConfigurableEnvironment environment, SpringApplication application) {
|
||||
Map<String, Object> propertiesToAdd = new HashMap<>();
|
||||
propertiesToAdd.put("spring.cloud.stream.bindings.streamMetrics.contentType","application/json");
|
||||
environment.getPropertySources().addLast(new MapPropertySource("binderMetricsDefaultProperties",propertiesToAdd));
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,111 @@
|
||||
/*
|
||||
* Copyright 2017 the original author or authors.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.springframework.cloud.stream.config.metrics;
|
||||
|
||||
import org.springframework.beans.factory.annotation.Value;
|
||||
import org.springframework.boot.context.properties.ConfigurationProperties;
|
||||
import org.springframework.util.StringUtils;
|
||||
|
||||
/**
|
||||
* @author Vinicius Carvalho
|
||||
*/
|
||||
@ConfigurationProperties(prefix = "spring.cloud.stream.metrics")
|
||||
public class StreamMetricsProperties {
|
||||
|
||||
|
||||
private String prefix;
|
||||
|
||||
@Value("${spring.application.name:${vcap.application.name:${spring.config.name:application}}}")
|
||||
private String applicationName;
|
||||
|
||||
private String metricName;
|
||||
|
||||
private String[] includes = new String[]{"integration**"};
|
||||
|
||||
private String[] excludes;
|
||||
|
||||
private String[] properties;
|
||||
|
||||
private Long delayMillis;
|
||||
|
||||
public String[] getIncludes() {
|
||||
return includes;
|
||||
}
|
||||
|
||||
public void setIncludes(String[] includes) {
|
||||
this.includes = includes;
|
||||
}
|
||||
|
||||
public String[] getExcludes() {
|
||||
return excludes;
|
||||
}
|
||||
|
||||
public void setExcludes(String[] excludes) {
|
||||
this.excludes = excludes;
|
||||
}
|
||||
|
||||
public Long getDelayMillis() {
|
||||
return delayMillis;
|
||||
}
|
||||
|
||||
public void setDelayMillis(Long delayMillis) {
|
||||
this.delayMillis = delayMillis;
|
||||
}
|
||||
|
||||
public String getPrefix() {
|
||||
return prefix;
|
||||
}
|
||||
|
||||
public void setPrefix(String prefix) {
|
||||
this.prefix = prefix;
|
||||
}
|
||||
|
||||
public String getApplicationName() {
|
||||
return applicationName;
|
||||
}
|
||||
|
||||
public String[] getProperties() {
|
||||
return properties;
|
||||
}
|
||||
|
||||
public void setProperties(String[] properties) {
|
||||
this.properties = properties;
|
||||
}
|
||||
|
||||
public String getMetricName() {
|
||||
if(this.metricName == null){
|
||||
this.metricName = resolveMetricName();
|
||||
}
|
||||
return metricName;
|
||||
}
|
||||
|
||||
private String resolveMetricName(){
|
||||
StringBuffer name = new StringBuffer(this.applicationName);
|
||||
if(!StringUtils.isEmpty(this.prefix)){
|
||||
String prefix = this.prefix;
|
||||
if(prefix.lastIndexOf(".") == -1){
|
||||
prefix += ".";
|
||||
}
|
||||
name.insert(0,prefix);
|
||||
}
|
||||
return name.toString();
|
||||
}
|
||||
|
||||
public void setApplicationName(String applicationName) {
|
||||
this.applicationName = applicationName;
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,91 @@
|
||||
/*
|
||||
* Copyright 2017 the original author or authors.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.springframework.cloud.stream.metrics;
|
||||
|
||||
import java.util.Collection;
|
||||
import java.util.Date;
|
||||
import java.util.Map;
|
||||
|
||||
import com.fasterxml.jackson.annotation.JsonCreator;
|
||||
import com.fasterxml.jackson.annotation.JsonFormat;
|
||||
import com.fasterxml.jackson.annotation.JsonProperty;
|
||||
|
||||
import org.springframework.boot.actuate.metrics.Metric;
|
||||
|
||||
/**
|
||||
* @author Vinicius Carvalho
|
||||
*/
|
||||
public class ApplicationMetrics {
|
||||
|
||||
private String name;
|
||||
|
||||
private int instanceIndex;
|
||||
|
||||
@JsonFormat(shape = JsonFormat.Shape.STRING, pattern = "yyyy-MM-dd'T'HH:mm:ss.SSS'Z'", timezone = "UTC")
|
||||
private final Date createdTime;
|
||||
|
||||
private Collection<Metric> metrics;
|
||||
|
||||
private Map<String,Object> properties;
|
||||
|
||||
|
||||
@JsonCreator
|
||||
public ApplicationMetrics(@JsonProperty("name") String name,
|
||||
@JsonProperty("instanceIndex") int instanceIndex,
|
||||
@JsonProperty("metrics") Collection<Metric> metrics) {
|
||||
this.name = name;
|
||||
this.instanceIndex = instanceIndex;
|
||||
this.metrics = metrics;
|
||||
this.createdTime = new Date();
|
||||
}
|
||||
|
||||
public String getName() {
|
||||
return name;
|
||||
}
|
||||
|
||||
public void setName(String name) {
|
||||
this.name = name;
|
||||
}
|
||||
|
||||
public int getInstanceIndex() {
|
||||
return instanceIndex;
|
||||
}
|
||||
|
||||
public void setInstanceIndex(int instanceIndex) {
|
||||
this.instanceIndex = instanceIndex;
|
||||
}
|
||||
|
||||
public Collection<Metric> getMetrics() {
|
||||
return metrics;
|
||||
}
|
||||
|
||||
public void setMetrics(Collection<Metric> metrics) {
|
||||
this.metrics = metrics;
|
||||
}
|
||||
|
||||
public Date getCreatedTime() {
|
||||
return createdTime;
|
||||
}
|
||||
|
||||
public Map<String, Object> getProperties() {
|
||||
return properties;
|
||||
}
|
||||
|
||||
public void setProperties(Map<String, Object> properties) {
|
||||
this.properties = properties;
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,140 @@
|
||||
/*
|
||||
* Copyright 2017 the original author or authors.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.springframework.cloud.stream.metrics;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collection;
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
|
||||
import org.springframework.beans.BeansException;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.boot.actuate.endpoint.MetricsEndpoint;
|
||||
import org.springframework.boot.actuate.endpoint.MetricsEndpointMetricReader;
|
||||
import org.springframework.boot.actuate.metrics.Metric;
|
||||
import org.springframework.boot.actuate.metrics.export.MetricCopyExporter;
|
||||
import org.springframework.boot.bind.RelaxedNames;
|
||||
import org.springframework.cloud.stream.config.BindingServiceProperties;
|
||||
import org.springframework.cloud.stream.config.metrics.StreamMetricsProperties;
|
||||
import org.springframework.context.ApplicationContext;
|
||||
import org.springframework.context.ApplicationContextAware;
|
||||
import org.springframework.context.ApplicationListener;
|
||||
import org.springframework.context.ConfigurableApplicationContext;
|
||||
import org.springframework.context.event.ContextRefreshedEvent;
|
||||
import org.springframework.core.env.EnumerablePropertySource;
|
||||
import org.springframework.core.env.PropertySource;
|
||||
import org.springframework.integration.support.MessageBuilder;
|
||||
import org.springframework.scheduling.annotation.Scheduled;
|
||||
import org.springframework.util.ObjectUtils;
|
||||
import org.springframework.util.PatternMatchUtils;
|
||||
|
||||
/**
|
||||
* @author Vinicius Carvalho
|
||||
*
|
||||
* Component that sends metrics from {@link MetricsEndpointMetricReader} downstream via the configured metrics channel.
|
||||
*
|
||||
* It uses {@link Scheduled} support to periodially emit messages polled from the endpoint.
|
||||
*/
|
||||
public class BinderMetricsEmitter implements ApplicationListener<ContextRefreshedEvent>, ApplicationContextAware {
|
||||
|
||||
@Autowired
|
||||
private Emitter source;
|
||||
|
||||
@Autowired
|
||||
private StreamMetricsProperties properties;
|
||||
|
||||
@Autowired
|
||||
private BindingServiceProperties bindingServiceProperties;
|
||||
|
||||
private MetricsEndpointMetricReader metricsReader;
|
||||
|
||||
private ApplicationContext applicationContext;
|
||||
|
||||
/**
|
||||
* List of properties that are going to be appended to each message.
|
||||
* This gets populate by onApplicationEvent, once the context refreshes to avoid overhead of doing per message basis.
|
||||
*/
|
||||
private Map<String,Object> whitelistedProperties;
|
||||
|
||||
public BinderMetricsEmitter(MetricsEndpoint endpoint){
|
||||
this.metricsReader = new MetricsEndpointMetricReader(endpoint);
|
||||
this.whitelistedProperties = new HashMap<>();
|
||||
}
|
||||
|
||||
@Scheduled(fixedRateString = "${spring.cloud.stream.metrics.delay-millis:5000}")
|
||||
public void sendMetrics(){
|
||||
ApplicationMetrics appMetrics = new ApplicationMetrics(this.properties.getMetricName(),
|
||||
this.bindingServiceProperties.getInstanceIndex(),
|
||||
filter());
|
||||
appMetrics.setProperties(whitelistedProperties);
|
||||
source.metrics().send(MessageBuilder.withPayload(appMetrics).build());
|
||||
}
|
||||
|
||||
/**
|
||||
* Shameless copied from {@link MetricCopyExporter}
|
||||
* @return
|
||||
*/
|
||||
protected Collection<Metric> filter(){
|
||||
Collection<Metric> result = new ArrayList<>();
|
||||
Iterable<Metric<?>> metrics = metricsReader.findAll();
|
||||
for(Metric metric : metrics){
|
||||
if(isMatch(metric.getName(),this.properties.getIncludes(),this.properties.getExcludes())){
|
||||
result.add(metric);
|
||||
}
|
||||
}
|
||||
return result;
|
||||
}
|
||||
|
||||
/**
|
||||
* Shameless copied from {@link MetricCopyExporter}
|
||||
* @return
|
||||
*/
|
||||
private boolean isMatch(String name, String[] includes, String[] excludes) {
|
||||
if (ObjectUtils.isEmpty(includes)
|
||||
|| PatternMatchUtils.simpleMatch(includes, name)) {
|
||||
return !PatternMatchUtils.simpleMatch(excludes, name);
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
@Override
|
||||
/**
|
||||
* Iterates over all property sources from this application context and copies the ones listed in {@link StreamMetricsProperties} includes
|
||||
*/
|
||||
public void onApplicationEvent(ContextRefreshedEvent event) {
|
||||
ConfigurableApplicationContext ctx = (ConfigurableApplicationContext) event.getSource();
|
||||
if (!ObjectUtils.isEmpty(this.properties.getProperties())) {
|
||||
for (PropertySource source : ctx.getEnvironment().getPropertySources()) {
|
||||
if (source instanceof EnumerablePropertySource) {
|
||||
EnumerablePropertySource e = (EnumerablePropertySource) source;
|
||||
for (String propertyName : e.getPropertyNames()) {
|
||||
for (String relaxedPropertyName : new RelaxedNames(propertyName)) {
|
||||
if (isMatch(relaxedPropertyName, this.properties.getProperties(), null)) {
|
||||
whitelistedProperties.put(relaxedPropertyName, source.getProperty(propertyName));
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
|
||||
this.applicationContext = applicationContext;
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,78 @@
|
||||
/*
|
||||
* Copyright 2017 the original author or authors.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.springframework.cloud.stream.metrics;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.text.DateFormat;
|
||||
import java.text.ParseException;
|
||||
import java.text.SimpleDateFormat;
|
||||
import java.util.Date;
|
||||
|
||||
import com.fasterxml.jackson.core.JsonGenerator;
|
||||
import com.fasterxml.jackson.core.JsonParser;
|
||||
import com.fasterxml.jackson.core.JsonProcessingException;
|
||||
import com.fasterxml.jackson.databind.DeserializationContext;
|
||||
import com.fasterxml.jackson.databind.JsonDeserializer;
|
||||
import com.fasterxml.jackson.databind.JsonNode;
|
||||
import com.fasterxml.jackson.databind.JsonSerializer;
|
||||
import com.fasterxml.jackson.databind.SerializerProvider;
|
||||
|
||||
import org.springframework.boot.actuate.metrics.Metric;
|
||||
import org.springframework.boot.jackson.JsonComponent;
|
||||
|
||||
/**
|
||||
* @author Vinicius Carvalho
|
||||
*/
|
||||
@JsonComponent
|
||||
public class BootMetricJsonSerializer {
|
||||
|
||||
final static DateFormat df = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSS'Z'");
|
||||
|
||||
public static class Serializer extends JsonSerializer<Metric> {
|
||||
|
||||
@Override
|
||||
public void serialize(Metric metric, JsonGenerator json, SerializerProvider serializerProvider) throws IOException {
|
||||
json.writeStartObject();
|
||||
json.writeStringField("name",metric.getName());
|
||||
json.writeNumberField("value",metric.getValue().doubleValue());
|
||||
json.writeStringField("timestamp",df.format(metric.getTimestamp()));
|
||||
json.writeEndObject();
|
||||
}
|
||||
}
|
||||
|
||||
public static class Deserializer extends JsonDeserializer<Metric> {
|
||||
|
||||
|
||||
@Override
|
||||
public Metric deserialize(JsonParser p, DeserializationContext ctxt) throws IOException, JsonProcessingException {
|
||||
JsonNode node = p.getCodec().readTree(p);
|
||||
String name = node.get("name").asText();
|
||||
Number value = node.get("value").asDouble();
|
||||
Date timestamp = null;
|
||||
|
||||
try {
|
||||
timestamp = df.parse(node.get("timestamp").asText());
|
||||
}
|
||||
catch (ParseException e) {
|
||||
}
|
||||
Metric<Number> metric = new Metric(name,value,timestamp);
|
||||
|
||||
return metric;
|
||||
}
|
||||
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,32 @@
|
||||
/*
|
||||
* Copyright 2017 the original author or authors.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.springframework.cloud.stream.metrics;
|
||||
|
||||
import org.springframework.cloud.stream.annotation.Output;
|
||||
import org.springframework.messaging.MessageChannel;
|
||||
|
||||
/**
|
||||
* @author Vinicius Carvalho
|
||||
*/
|
||||
public interface Emitter {
|
||||
|
||||
String METRICS_CHANNEL_NAME = "streamMetrics";
|
||||
|
||||
@Output(METRICS_CHANNEL_NAME)
|
||||
MessageChannel metrics();
|
||||
|
||||
}
|
||||
@@ -0,0 +1,4 @@
|
||||
org.springframework.boot.autoconfigure.EnableAutoConfiguration=\
|
||||
org.springframework.cloud.stream.config.metrics.BinderMetricsAutoConfiguration
|
||||
org.springframework.boot.env.EnvironmentPostProcessor=\
|
||||
org.springframework.cloud.stream.config.metrics.BinderMetricsEnvironmentPostProcessor
|
||||
@@ -0,0 +1,198 @@
|
||||
/*
|
||||
* Copyright 2017 the original author or authors.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.springframework.cloud.stream.metrics;
|
||||
|
||||
import java.util.Collection;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
import org.junit.AfterClass;
|
||||
import org.junit.Assert;
|
||||
import org.junit.BeforeClass;
|
||||
import org.junit.Test;
|
||||
|
||||
import org.springframework.beans.factory.NoSuchBeanDefinitionException;
|
||||
import org.springframework.boot.SpringApplication;
|
||||
import org.springframework.boot.actuate.metrics.Metric;
|
||||
import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
|
||||
import org.springframework.cloud.stream.test.binder.MessageCollector;
|
||||
import org.springframework.context.ConfigurableApplicationContext;
|
||||
import org.springframework.messaging.Message;
|
||||
import org.springframework.util.CollectionUtils;
|
||||
|
||||
/**
|
||||
* @author Vinicius Carvalho
|
||||
*/
|
||||
public class BinderMetricsEmitterTests {
|
||||
|
||||
|
||||
|
||||
@BeforeClass
|
||||
public static void setSystemProps() {
|
||||
System.setProperty("SPRING_TEST_ENV_SYNTAX","testing");
|
||||
}
|
||||
|
||||
@AfterClass
|
||||
public static void unsetSystemProps() {
|
||||
System.clearProperty("SPRING_TEST_ENV_SYNTAX");
|
||||
}
|
||||
|
||||
|
||||
@Test(expected = NoSuchBeanDefinitionException.class)
|
||||
public void checkDisabledConfiguration() throws Exception {
|
||||
ConfigurableApplicationContext applicationContext = SpringApplication.run(BinderExporterApplication.class,
|
||||
"--server.port=0",
|
||||
"--spring.jmx.enabled=false",
|
||||
"--spring.cloud.stream.metrics.delay-millis=500"
|
||||
);
|
||||
try {
|
||||
applicationContext.getBean(Emitter.class);
|
||||
} catch (Exception e){
|
||||
throw e;
|
||||
}
|
||||
finally {
|
||||
applicationContext.close();
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@Test
|
||||
public void defaultIncludes() throws Exception {
|
||||
ConfigurableApplicationContext applicationContext = SpringApplication.run(BinderExporterApplication.class,
|
||||
"--server.port=0",
|
||||
"--spring.jmx.enabled=false",
|
||||
"--spring.cloud.stream.metrics.delay-millis=500",
|
||||
"--spring.cloud.stream.bindings.streamMetrics.destination=foo");
|
||||
Emitter emitterSource = applicationContext.getBean(Emitter.class);
|
||||
MessageCollector collector = applicationContext.getBean(MessageCollector.class);
|
||||
Message message = collector.forChannel(emitterSource.metrics()).poll(1000, TimeUnit.MILLISECONDS);
|
||||
Assert.assertNotNull(message);
|
||||
ObjectMapper mapper = applicationContext.getBean(ObjectMapper.class);
|
||||
ApplicationMetrics applicationMetrics = mapper.readValue((String)message.getPayload(),ApplicationMetrics.class);
|
||||
Assert.assertTrue(contains("integration.channel.errorChannel.errorRate.mean", applicationMetrics.getMetrics()));
|
||||
Assert.assertFalse(contains("mem",applicationMetrics.getMetrics()));
|
||||
Assert.assertEquals("application",applicationMetrics.getName());
|
||||
Assert.assertEquals(0,applicationMetrics.getInstanceIndex());
|
||||
Assert.assertTrue(CollectionUtils.isEmpty(applicationMetrics.getProperties()));
|
||||
applicationContext.close();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void customAppNameAndIndex() throws Exception{
|
||||
ConfigurableApplicationContext applicationContext = SpringApplication.run(BinderExporterApplication.class,
|
||||
"--server.port=0",
|
||||
"--spring.jmx.enabled=false",
|
||||
"--spring.cloud.stream.metrics.delay-millis=500",
|
||||
"--spring.application.name=foo",
|
||||
"--spring.cloud.stream.instanceIndex=1",
|
||||
"--spring.cloud.stream.bindings.streamMetrics.destination=foo");
|
||||
Emitter emitterSource = applicationContext.getBean(Emitter.class);
|
||||
MessageCollector collector = applicationContext.getBean(MessageCollector.class);
|
||||
Message message = collector.forChannel(emitterSource.metrics()).poll(1000, TimeUnit.MILLISECONDS);
|
||||
Assert.assertNotNull(message);
|
||||
ObjectMapper mapper = applicationContext.getBean(ObjectMapper.class);
|
||||
ApplicationMetrics applicationMetrics = mapper.readValue((String)message.getPayload(),ApplicationMetrics.class);
|
||||
Assert.assertTrue(contains("integration.channel.errorChannel.errorRate.mean", applicationMetrics.getMetrics()));
|
||||
Assert.assertFalse(contains("mem",applicationMetrics.getMetrics()));
|
||||
Assert.assertEquals("foo",applicationMetrics.getName());
|
||||
Assert.assertEquals(1,applicationMetrics.getInstanceIndex());
|
||||
Assert.assertTrue(CollectionUtils.isEmpty(applicationMetrics.getProperties()));
|
||||
applicationContext.close();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void usingPrefix() throws Exception {
|
||||
ConfigurableApplicationContext applicationContext = SpringApplication.run(BinderExporterApplication.class,
|
||||
"--server.port=0",
|
||||
"--spring.jmx.enabled=false",
|
||||
"--spring.cloud.stream.metrics.delay-millis=500",
|
||||
"--spring.cloud.stream.metrics.prefix=foo",
|
||||
"--spring.cloud.stream.instanceIndex=1",
|
||||
"--spring.cloud.stream.bindings.streamMetrics.destination=foo");
|
||||
Emitter emitterSource = applicationContext.getBean(Emitter.class);
|
||||
MessageCollector collector = applicationContext.getBean(MessageCollector.class);
|
||||
Message message = collector.forChannel(emitterSource.metrics()).poll(1000, TimeUnit.MILLISECONDS);
|
||||
Assert.assertNotNull(message);
|
||||
ObjectMapper mapper = applicationContext.getBean(ObjectMapper.class);
|
||||
ApplicationMetrics applicationMetrics = mapper.readValue((String)message.getPayload(),ApplicationMetrics.class);
|
||||
Assert.assertTrue(contains("integration.channel.errorChannel.errorRate.mean", applicationMetrics.getMetrics()));
|
||||
Assert.assertFalse(contains("mem",applicationMetrics.getMetrics()));
|
||||
Assert.assertEquals("foo.application",applicationMetrics.getName());
|
||||
Assert.assertEquals(1,applicationMetrics.getInstanceIndex());
|
||||
Assert.assertTrue(CollectionUtils.isEmpty(applicationMetrics.getProperties()));
|
||||
applicationContext.close();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void includesExcludes() throws Exception {
|
||||
ConfigurableApplicationContext applicationContext = SpringApplication.run(BinderExporterApplication.class,
|
||||
"--server.port=0",
|
||||
"--spring.jmx.enabled=false",
|
||||
"--spring.cloud.stream.metrics.delay-millis=500",
|
||||
"--spring.cloud.stream.bindings.streamMetrics.destination=foo",
|
||||
"--spring.cloud.stream.metrics.includes=mem**",
|
||||
"--spring.cloud.stream.metrics.excludes=integration**");
|
||||
Emitter emitterSource = applicationContext.getBean(Emitter.class);
|
||||
MessageCollector collector = applicationContext.getBean(MessageCollector.class);
|
||||
Message message = collector.forChannel(emitterSource.metrics()).poll(1000, TimeUnit.MILLISECONDS);
|
||||
Assert.assertNotNull(message);
|
||||
ObjectMapper mapper = applicationContext.getBean(ObjectMapper.class);
|
||||
ApplicationMetrics applicationMetrics = mapper.readValue((String)message.getPayload(),ApplicationMetrics.class);
|
||||
Assert.assertFalse(contains("integration.channel.errorChannel.errorRate.mean", applicationMetrics.getMetrics()));
|
||||
Assert.assertTrue(contains("mem",applicationMetrics.getMetrics()));
|
||||
Assert.assertTrue(CollectionUtils.isEmpty(applicationMetrics.getProperties()));
|
||||
applicationContext.close();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void includesExcludesWithProperties() throws Exception {
|
||||
ConfigurableApplicationContext applicationContext = SpringApplication.run(BinderExporterApplication.class,
|
||||
"--server.port=0",
|
||||
"--spring.jmx.enabled=false",
|
||||
"--spring.cloud.stream.metrics.delay-millis=500",
|
||||
"--spring.cloud.stream.bindings.streamMetrics.destination=foo",
|
||||
"--spring.cloud.stream.metrics.includes=integration**",
|
||||
"--spring.cloud.stream.metrics.properties=java**,spring.test.env**");
|
||||
Emitter emitterSource = applicationContext.getBean(Emitter.class);
|
||||
MessageCollector collector = applicationContext.getBean(MessageCollector.class);
|
||||
Message message = collector.forChannel(emitterSource.metrics()).poll(1000, TimeUnit.MILLISECONDS);
|
||||
Assert.assertNotNull(message);
|
||||
ObjectMapper mapper = applicationContext.getBean(ObjectMapper.class);
|
||||
ApplicationMetrics applicationMetrics = mapper.readValue((String)message.getPayload(),ApplicationMetrics.class);
|
||||
Assert.assertFalse(contains("mem", applicationMetrics.getMetrics()));
|
||||
Assert.assertTrue(contains("integration.channel.errorChannel.errorRate.mean",applicationMetrics.getMetrics()));
|
||||
Assert.assertFalse(CollectionUtils.isEmpty(applicationMetrics.getProperties()));
|
||||
Assert.assertTrue(applicationMetrics.getProperties().get("spring.test.env.syntax").equals("testing"));
|
||||
applicationContext.close();
|
||||
}
|
||||
|
||||
private boolean contains(String metric, Collection<Metric> metrics){
|
||||
boolean contains = false;
|
||||
for(Metric entry : metrics){
|
||||
contains = entry.getName().equals(metric);
|
||||
if(contains){
|
||||
break;
|
||||
}
|
||||
}
|
||||
return contains;
|
||||
}
|
||||
|
||||
@EnableAutoConfiguration
|
||||
public static class BinderExporterApplication {
|
||||
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user