diff --git a/pom.xml b/pom.xml
index d0b2e159a..a5fd6ea85 100644
--- a/pom.xml
+++ b/pom.xml
@@ -115,6 +115,7 @@
spring-cloud-stream-schema
spring-cloud-stream-schema-server
spring-cloud-stream-tools
+ spring-cloud-stream-metrics
diff --git a/spring-cloud-stream-core-docs/src/main/asciidoc/index.adoc b/spring-cloud-stream-core-docs/src/main/asciidoc/index.adoc
index fbdf9f590..d7e739ad9 100644
--- a/spring-cloud-stream-core-docs/src/main/asciidoc/index.adoc
+++ b/spring-cloud-stream-core-docs/src/main/asciidoc/index.adoc
@@ -1,6 +1,6 @@
[[spring-cloud-stream-reference]]
= Spring Cloud Stream Reference Guide
-Sabby Anandan; Marius Bogoevici; Eric Bottard; Mark Fisher; Ilayaperumal Gopinathan; Gunnar Hillert; Mark Pollack; Patrick Peralta; Glenn Renfro; Thomas Risberg; Dave Syer; David Turanski; Janne Valkealahti; Benjamin Klein
+Sabby Anandan; Marius Bogoevici; Eric Bottard; Mark Fisher; Ilayaperumal Gopinathan; Gunnar Hillert; Mark Pollack; Patrick Peralta; Glenn Renfro; Thomas Risberg; Dave Syer; David Turanski; Janne Valkealahti; Benjamin Klein; Vinicius Carvalho
:doctype: book
:toc:
:toclevels: 4
diff --git a/spring-cloud-stream-core-docs/src/main/asciidoc/spring-cloud-stream-overview.adoc b/spring-cloud-stream-core-docs/src/main/asciidoc/spring-cloud-stream-overview.adoc
index 134315b54..9dca51bdd 100644
--- a/spring-cloud-stream-core-docs/src/main/asciidoc/spring-cloud-stream-overview.adoc
+++ b/spring-cloud-stream-core-docs/src/main/asciidoc/spring-cloud-stream-overview.adoc
@@ -1813,6 +1813,50 @@ Once we have received the message, we can validate that the component functions
Spring Cloud Stream provides a health indicator for binders.
It is registered under the name of `binders` and can be enabled or disabled by setting the `management.health.binders.enabled` property.
+== Metrics Emitter
+Spring Cloud Stream provides a module called `spring-cloud-stream-metrics` that can be used to emit any available metric from https://docs.spring.io/spring-boot/docs/current/reference/html/production-ready-metrics.html[Spring Boot metrics endpoint] to a named channel.
+This module allow operators to collect metrics from stream applications without relying on polling their endpoints.
+
+HTTP polling can be challenging on cloud environments since applications could be on a private network or behind a Load balancer that prevents per instance access.
+
+This module is included on the each of the out of box https://github.com/spring-cloud-stream-app-starters[application starters], if you want to emit metrics from your application, just import it on your pom.xml and follow the activation instructions.
+
+The module is activated when you set the destination name for its channel, `spring.cloud.stream.bindings.streamMetrics.destination=`.
+
+By default the module is configured to only send Spring Integration message channel metrics.
+
+Available properties for customization using the prefix `spring.cloud.stream.metrics`.
+
+delay-millis::
+ The period in which metrics will be posted to the channel
++
+Default: `5000`.
++
+prefix::
+ Prefix string to be prepended to the metrics name
++
+Default: ``
++
+includes::
+ An array of strings containing regex patterns of metrics that should be included.
++
+Default: `integration**`
++
+excludes::
+ An array of strings containing regex patterns of metrics that should be excluded.
++
+Default: ``
++
+properties::
+ Just like the `includes` option, it allows white listing application properties that will be added to the metrics payload
++
+Default: null.
++
+spring.cloud.stream.bindings.streamMetrics.contentType::
+ Content-Type of the message
++
+Default: `application/json`
+
== Samples
For Spring Cloud Stream samples, please refer to the https://github.com/spring-cloud/spring-cloud-stream-samples[spring-cloud-stream-samples] repository on GitHub.
diff --git a/spring-cloud-stream-metrics/pom.xml b/spring-cloud-stream-metrics/pom.xml
new file mode 100644
index 000000000..037b3410f
--- /dev/null
+++ b/spring-cloud-stream-metrics/pom.xml
@@ -0,0 +1,34 @@
+
+
+
+ 4.0.0
+
+ org.springframework.cloud
+ spring-cloud-stream-metrics
+ Emitter module to publish boot metrics
+
+ org.springframework.cloud
+ spring-cloud-stream-parent
+ 1.2.0.BUILD-SNAPSHOT
+
+
+
+
+ org.springframework.cloud
+ spring-cloud-stream
+
+
+ org.springframework.boot
+ spring-boot-starter-test
+ test
+
+
+ org.springframework.cloud
+ spring-cloud-stream-test-support
+ test
+
+
+
+
diff --git a/spring-cloud-stream-metrics/src/main/java/org/springframework/cloud/stream/config/metrics/BinderMetricsAutoConfiguration.java b/spring-cloud-stream-metrics/src/main/java/org/springframework/cloud/stream/config/metrics/BinderMetricsAutoConfiguration.java
new file mode 100644
index 000000000..e8dcea219
--- /dev/null
+++ b/spring-cloud-stream-metrics/src/main/java/org/springframework/cloud/stream/config/metrics/BinderMetricsAutoConfiguration.java
@@ -0,0 +1,53 @@
+/*
+ * Copyright 2017 the original author or authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.springframework.cloud.stream.config.metrics;
+
+import org.springframework.boot.actuate.endpoint.MetricsEndpoint;
+import org.springframework.boot.autoconfigure.condition.ConditionalOnClass;
+import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
+import org.springframework.boot.context.properties.EnableConfigurationProperties;
+import org.springframework.cloud.stream.annotation.EnableBinding;
+import org.springframework.cloud.stream.binder.Binder;
+import org.springframework.cloud.stream.metrics.BinderMetricsEmitter;
+import org.springframework.cloud.stream.metrics.BootMetricJsonSerializer;
+import org.springframework.cloud.stream.metrics.Emitter;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+import org.springframework.scheduling.annotation.EnableScheduling;
+
+/**
+ * @author Vinicius Carvalho
+ */
+@Configuration
+@ConditionalOnClass(Binder.class)
+@EnableScheduling
+@EnableBinding(Emitter.class)
+@EnableConfigurationProperties(StreamMetricsProperties.class)
+@ConditionalOnProperty("spring.cloud.stream.bindings."+ Emitter.METRICS_CHANNEL_NAME + ".destination")
+public class BinderMetricsAutoConfiguration {
+
+ @Bean
+ public BinderMetricsEmitter binderMetricsExporter(MetricsEndpoint endpoint) {
+ return new BinderMetricsEmitter(endpoint);
+ }
+
+ @Bean
+ public BootMetricJsonSerializer metricJsonSerializer() {
+ return new BootMetricJsonSerializer();
+ }
+
+}
diff --git a/spring-cloud-stream-metrics/src/main/java/org/springframework/cloud/stream/config/metrics/BinderMetricsEnvironmentPostProcessor.java b/spring-cloud-stream-metrics/src/main/java/org/springframework/cloud/stream/config/metrics/BinderMetricsEnvironmentPostProcessor.java
new file mode 100644
index 000000000..4424f0a2d
--- /dev/null
+++ b/spring-cloud-stream-metrics/src/main/java/org/springframework/cloud/stream/config/metrics/BinderMetricsEnvironmentPostProcessor.java
@@ -0,0 +1,37 @@
+/*
+ * Copyright 2017 the original author or authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.springframework.cloud.stream.config.metrics;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.springframework.boot.SpringApplication;
+import org.springframework.boot.env.EnvironmentPostProcessor;
+import org.springframework.core.env.ConfigurableEnvironment;
+import org.springframework.core.env.MapPropertySource;
+
+/**
+ * @author Vinicius Carvalho
+ */
+public class BinderMetricsEnvironmentPostProcessor implements EnvironmentPostProcessor {
+ @Override
+ public void postProcessEnvironment(ConfigurableEnvironment environment, SpringApplication application) {
+ Map propertiesToAdd = new HashMap<>();
+ propertiesToAdd.put("spring.cloud.stream.bindings.streamMetrics.contentType","application/json");
+ environment.getPropertySources().addLast(new MapPropertySource("binderMetricsDefaultProperties",propertiesToAdd));
+ }
+}
diff --git a/spring-cloud-stream-metrics/src/main/java/org/springframework/cloud/stream/config/metrics/StreamMetricsProperties.java b/spring-cloud-stream-metrics/src/main/java/org/springframework/cloud/stream/config/metrics/StreamMetricsProperties.java
new file mode 100644
index 000000000..a320707b8
--- /dev/null
+++ b/spring-cloud-stream-metrics/src/main/java/org/springframework/cloud/stream/config/metrics/StreamMetricsProperties.java
@@ -0,0 +1,111 @@
+/*
+ * Copyright 2017 the original author or authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.springframework.cloud.stream.config.metrics;
+
+import org.springframework.beans.factory.annotation.Value;
+import org.springframework.boot.context.properties.ConfigurationProperties;
+import org.springframework.util.StringUtils;
+
+/**
+ * @author Vinicius Carvalho
+ */
+@ConfigurationProperties(prefix = "spring.cloud.stream.metrics")
+public class StreamMetricsProperties {
+
+
+ private String prefix;
+
+ @Value("${spring.application.name:${vcap.application.name:${spring.config.name:application}}}")
+ private String applicationName;
+
+ private String metricName;
+
+ private String[] includes = new String[]{"integration**"};
+
+ private String[] excludes;
+
+ private String[] properties;
+
+ private Long delayMillis;
+
+ public String[] getIncludes() {
+ return includes;
+ }
+
+ public void setIncludes(String[] includes) {
+ this.includes = includes;
+ }
+
+ public String[] getExcludes() {
+ return excludes;
+ }
+
+ public void setExcludes(String[] excludes) {
+ this.excludes = excludes;
+ }
+
+ public Long getDelayMillis() {
+ return delayMillis;
+ }
+
+ public void setDelayMillis(Long delayMillis) {
+ this.delayMillis = delayMillis;
+ }
+
+ public String getPrefix() {
+ return prefix;
+ }
+
+ public void setPrefix(String prefix) {
+ this.prefix = prefix;
+ }
+
+ public String getApplicationName() {
+ return applicationName;
+ }
+
+ public String[] getProperties() {
+ return properties;
+ }
+
+ public void setProperties(String[] properties) {
+ this.properties = properties;
+ }
+
+ public String getMetricName() {
+ if(this.metricName == null){
+ this.metricName = resolveMetricName();
+ }
+ return metricName;
+ }
+
+ private String resolveMetricName(){
+ StringBuffer name = new StringBuffer(this.applicationName);
+ if(!StringUtils.isEmpty(this.prefix)){
+ String prefix = this.prefix;
+ if(prefix.lastIndexOf(".") == -1){
+ prefix += ".";
+ }
+ name.insert(0,prefix);
+ }
+ return name.toString();
+ }
+
+ public void setApplicationName(String applicationName) {
+ this.applicationName = applicationName;
+ }
+}
diff --git a/spring-cloud-stream-metrics/src/main/java/org/springframework/cloud/stream/metrics/ApplicationMetrics.java b/spring-cloud-stream-metrics/src/main/java/org/springframework/cloud/stream/metrics/ApplicationMetrics.java
new file mode 100644
index 000000000..38782e4f2
--- /dev/null
+++ b/spring-cloud-stream-metrics/src/main/java/org/springframework/cloud/stream/metrics/ApplicationMetrics.java
@@ -0,0 +1,91 @@
+/*
+ * Copyright 2017 the original author or authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.springframework.cloud.stream.metrics;
+
+import java.util.Collection;
+import java.util.Date;
+import java.util.Map;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonFormat;
+import com.fasterxml.jackson.annotation.JsonProperty;
+
+import org.springframework.boot.actuate.metrics.Metric;
+
+/**
+ * @author Vinicius Carvalho
+ */
+public class ApplicationMetrics {
+
+ private String name;
+
+ private int instanceIndex;
+
+ @JsonFormat(shape = JsonFormat.Shape.STRING, pattern = "yyyy-MM-dd'T'HH:mm:ss.SSS'Z'", timezone = "UTC")
+ private final Date createdTime;
+
+ private Collection metrics;
+
+ private Map properties;
+
+
+ @JsonCreator
+ public ApplicationMetrics(@JsonProperty("name") String name,
+ @JsonProperty("instanceIndex") int instanceIndex,
+ @JsonProperty("metrics") Collection metrics) {
+ this.name = name;
+ this.instanceIndex = instanceIndex;
+ this.metrics = metrics;
+ this.createdTime = new Date();
+ }
+
+ public String getName() {
+ return name;
+ }
+
+ public void setName(String name) {
+ this.name = name;
+ }
+
+ public int getInstanceIndex() {
+ return instanceIndex;
+ }
+
+ public void setInstanceIndex(int instanceIndex) {
+ this.instanceIndex = instanceIndex;
+ }
+
+ public Collection getMetrics() {
+ return metrics;
+ }
+
+ public void setMetrics(Collection metrics) {
+ this.metrics = metrics;
+ }
+
+ public Date getCreatedTime() {
+ return createdTime;
+ }
+
+ public Map getProperties() {
+ return properties;
+ }
+
+ public void setProperties(Map properties) {
+ this.properties = properties;
+ }
+}
diff --git a/spring-cloud-stream-metrics/src/main/java/org/springframework/cloud/stream/metrics/BinderMetricsEmitter.java b/spring-cloud-stream-metrics/src/main/java/org/springframework/cloud/stream/metrics/BinderMetricsEmitter.java
new file mode 100644
index 000000000..942866eeb
--- /dev/null
+++ b/spring-cloud-stream-metrics/src/main/java/org/springframework/cloud/stream/metrics/BinderMetricsEmitter.java
@@ -0,0 +1,140 @@
+/*
+ * Copyright 2017 the original author or authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.springframework.cloud.stream.metrics;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.springframework.beans.BeansException;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.boot.actuate.endpoint.MetricsEndpoint;
+import org.springframework.boot.actuate.endpoint.MetricsEndpointMetricReader;
+import org.springframework.boot.actuate.metrics.Metric;
+import org.springframework.boot.actuate.metrics.export.MetricCopyExporter;
+import org.springframework.boot.bind.RelaxedNames;
+import org.springframework.cloud.stream.config.BindingServiceProperties;
+import org.springframework.cloud.stream.config.metrics.StreamMetricsProperties;
+import org.springframework.context.ApplicationContext;
+import org.springframework.context.ApplicationContextAware;
+import org.springframework.context.ApplicationListener;
+import org.springframework.context.ConfigurableApplicationContext;
+import org.springframework.context.event.ContextRefreshedEvent;
+import org.springframework.core.env.EnumerablePropertySource;
+import org.springframework.core.env.PropertySource;
+import org.springframework.integration.support.MessageBuilder;
+import org.springframework.scheduling.annotation.Scheduled;
+import org.springframework.util.ObjectUtils;
+import org.springframework.util.PatternMatchUtils;
+
+/**
+ * @author Vinicius Carvalho
+ *
+ * Component that sends metrics from {@link MetricsEndpointMetricReader} downstream via the configured metrics channel.
+ *
+ * It uses {@link Scheduled} support to periodially emit messages polled from the endpoint.
+ */
+public class BinderMetricsEmitter implements ApplicationListener, ApplicationContextAware {
+
+ @Autowired
+ private Emitter source;
+
+ @Autowired
+ private StreamMetricsProperties properties;
+
+ @Autowired
+ private BindingServiceProperties bindingServiceProperties;
+
+ private MetricsEndpointMetricReader metricsReader;
+
+ private ApplicationContext applicationContext;
+
+ /**
+ * List of properties that are going to be appended to each message.
+ * This gets populate by onApplicationEvent, once the context refreshes to avoid overhead of doing per message basis.
+ */
+ private Map whitelistedProperties;
+
+ public BinderMetricsEmitter(MetricsEndpoint endpoint){
+ this.metricsReader = new MetricsEndpointMetricReader(endpoint);
+ this.whitelistedProperties = new HashMap<>();
+ }
+
+ @Scheduled(fixedRateString = "${spring.cloud.stream.metrics.delay-millis:5000}")
+ public void sendMetrics(){
+ ApplicationMetrics appMetrics = new ApplicationMetrics(this.properties.getMetricName(),
+ this.bindingServiceProperties.getInstanceIndex(),
+ filter());
+ appMetrics.setProperties(whitelistedProperties);
+ source.metrics().send(MessageBuilder.withPayload(appMetrics).build());
+ }
+
+ /**
+ * Shameless copied from {@link MetricCopyExporter}
+ * @return
+ */
+ protected Collection filter(){
+ Collection result = new ArrayList<>();
+ Iterable> metrics = metricsReader.findAll();
+ for(Metric metric : metrics){
+ if(isMatch(metric.getName(),this.properties.getIncludes(),this.properties.getExcludes())){
+ result.add(metric);
+ }
+ }
+ return result;
+ }
+
+ /**
+ * Shameless copied from {@link MetricCopyExporter}
+ * @return
+ */
+ private boolean isMatch(String name, String[] includes, String[] excludes) {
+ if (ObjectUtils.isEmpty(includes)
+ || PatternMatchUtils.simpleMatch(includes, name)) {
+ return !PatternMatchUtils.simpleMatch(excludes, name);
+ }
+ return false;
+ }
+
+ @Override
+ /**
+ * Iterates over all property sources from this application context and copies the ones listed in {@link StreamMetricsProperties} includes
+ */
+ public void onApplicationEvent(ContextRefreshedEvent event) {
+ ConfigurableApplicationContext ctx = (ConfigurableApplicationContext) event.getSource();
+ if (!ObjectUtils.isEmpty(this.properties.getProperties())) {
+ for (PropertySource source : ctx.getEnvironment().getPropertySources()) {
+ if (source instanceof EnumerablePropertySource) {
+ EnumerablePropertySource e = (EnumerablePropertySource) source;
+ for (String propertyName : e.getPropertyNames()) {
+ for (String relaxedPropertyName : new RelaxedNames(propertyName)) {
+ if (isMatch(relaxedPropertyName, this.properties.getProperties(), null)) {
+ whitelistedProperties.put(relaxedPropertyName, source.getProperty(propertyName));
+ }
+ }
+ }
+ }
+ }
+ }
+ }
+
+ @Override
+ public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
+ this.applicationContext = applicationContext;
+ }
+}
diff --git a/spring-cloud-stream-metrics/src/main/java/org/springframework/cloud/stream/metrics/BootMetricJsonSerializer.java b/spring-cloud-stream-metrics/src/main/java/org/springframework/cloud/stream/metrics/BootMetricJsonSerializer.java
new file mode 100644
index 000000000..c3e787859
--- /dev/null
+++ b/spring-cloud-stream-metrics/src/main/java/org/springframework/cloud/stream/metrics/BootMetricJsonSerializer.java
@@ -0,0 +1,78 @@
+/*
+ * Copyright 2017 the original author or authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.springframework.cloud.stream.metrics;
+
+import java.io.IOException;
+import java.text.DateFormat;
+import java.text.ParseException;
+import java.text.SimpleDateFormat;
+import java.util.Date;
+
+import com.fasterxml.jackson.core.JsonGenerator;
+import com.fasterxml.jackson.core.JsonParser;
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.DeserializationContext;
+import com.fasterxml.jackson.databind.JsonDeserializer;
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.JsonSerializer;
+import com.fasterxml.jackson.databind.SerializerProvider;
+
+import org.springframework.boot.actuate.metrics.Metric;
+import org.springframework.boot.jackson.JsonComponent;
+
+/**
+ * @author Vinicius Carvalho
+ */
+@JsonComponent
+public class BootMetricJsonSerializer {
+
+ final static DateFormat df = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSS'Z'");
+
+ public static class Serializer extends JsonSerializer {
+
+ @Override
+ public void serialize(Metric metric, JsonGenerator json, SerializerProvider serializerProvider) throws IOException {
+ json.writeStartObject();
+ json.writeStringField("name",metric.getName());
+ json.writeNumberField("value",metric.getValue().doubleValue());
+ json.writeStringField("timestamp",df.format(metric.getTimestamp()));
+ json.writeEndObject();
+ }
+ }
+
+ public static class Deserializer extends JsonDeserializer {
+
+
+ @Override
+ public Metric deserialize(JsonParser p, DeserializationContext ctxt) throws IOException, JsonProcessingException {
+ JsonNode node = p.getCodec().readTree(p);
+ String name = node.get("name").asText();
+ Number value = node.get("value").asDouble();
+ Date timestamp = null;
+
+ try {
+ timestamp = df.parse(node.get("timestamp").asText());
+ }
+ catch (ParseException e) {
+ }
+ Metric metric = new Metric(name,value,timestamp);
+
+ return metric;
+ }
+
+ }
+}
diff --git a/spring-cloud-stream-metrics/src/main/java/org/springframework/cloud/stream/metrics/Emitter.java b/spring-cloud-stream-metrics/src/main/java/org/springframework/cloud/stream/metrics/Emitter.java
new file mode 100644
index 000000000..cc19c1614
--- /dev/null
+++ b/spring-cloud-stream-metrics/src/main/java/org/springframework/cloud/stream/metrics/Emitter.java
@@ -0,0 +1,32 @@
+/*
+ * Copyright 2017 the original author or authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.springframework.cloud.stream.metrics;
+
+import org.springframework.cloud.stream.annotation.Output;
+import org.springframework.messaging.MessageChannel;
+
+/**
+ * @author Vinicius Carvalho
+ */
+public interface Emitter {
+
+ String METRICS_CHANNEL_NAME = "streamMetrics";
+
+ @Output(METRICS_CHANNEL_NAME)
+ MessageChannel metrics();
+
+}
diff --git a/spring-cloud-stream-metrics/src/main/resources/META-INF/spring.factories b/spring-cloud-stream-metrics/src/main/resources/META-INF/spring.factories
new file mode 100644
index 000000000..2555b7f54
--- /dev/null
+++ b/spring-cloud-stream-metrics/src/main/resources/META-INF/spring.factories
@@ -0,0 +1,4 @@
+org.springframework.boot.autoconfigure.EnableAutoConfiguration=\
+ org.springframework.cloud.stream.config.metrics.BinderMetricsAutoConfiguration
+org.springframework.boot.env.EnvironmentPostProcessor=\
+ org.springframework.cloud.stream.config.metrics.BinderMetricsEnvironmentPostProcessor
\ No newline at end of file
diff --git a/spring-cloud-stream-metrics/src/test/java/org/springframework/cloud/stream/metrics/BinderMetricsEmitterTests.java b/spring-cloud-stream-metrics/src/test/java/org/springframework/cloud/stream/metrics/BinderMetricsEmitterTests.java
new file mode 100644
index 000000000..25bf92177
--- /dev/null
+++ b/spring-cloud-stream-metrics/src/test/java/org/springframework/cloud/stream/metrics/BinderMetricsEmitterTests.java
@@ -0,0 +1,198 @@
+/*
+ * Copyright 2017 the original author or authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.springframework.cloud.stream.metrics;
+
+import java.util.Collection;
+import java.util.concurrent.TimeUnit;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import org.springframework.beans.factory.NoSuchBeanDefinitionException;
+import org.springframework.boot.SpringApplication;
+import org.springframework.boot.actuate.metrics.Metric;
+import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
+import org.springframework.cloud.stream.test.binder.MessageCollector;
+import org.springframework.context.ConfigurableApplicationContext;
+import org.springframework.messaging.Message;
+import org.springframework.util.CollectionUtils;
+
+/**
+ * @author Vinicius Carvalho
+ */
+public class BinderMetricsEmitterTests {
+
+
+
+ @BeforeClass
+ public static void setSystemProps() {
+ System.setProperty("SPRING_TEST_ENV_SYNTAX","testing");
+ }
+
+ @AfterClass
+ public static void unsetSystemProps() {
+ System.clearProperty("SPRING_TEST_ENV_SYNTAX");
+ }
+
+
+ @Test(expected = NoSuchBeanDefinitionException.class)
+ public void checkDisabledConfiguration() throws Exception {
+ ConfigurableApplicationContext applicationContext = SpringApplication.run(BinderExporterApplication.class,
+ "--server.port=0",
+ "--spring.jmx.enabled=false",
+ "--spring.cloud.stream.metrics.delay-millis=500"
+ );
+ try {
+ applicationContext.getBean(Emitter.class);
+ } catch (Exception e){
+ throw e;
+ }
+ finally {
+ applicationContext.close();
+ }
+
+ }
+
+ @Test
+ public void defaultIncludes() throws Exception {
+ ConfigurableApplicationContext applicationContext = SpringApplication.run(BinderExporterApplication.class,
+ "--server.port=0",
+ "--spring.jmx.enabled=false",
+ "--spring.cloud.stream.metrics.delay-millis=500",
+ "--spring.cloud.stream.bindings.streamMetrics.destination=foo");
+ Emitter emitterSource = applicationContext.getBean(Emitter.class);
+ MessageCollector collector = applicationContext.getBean(MessageCollector.class);
+ Message message = collector.forChannel(emitterSource.metrics()).poll(1000, TimeUnit.MILLISECONDS);
+ Assert.assertNotNull(message);
+ ObjectMapper mapper = applicationContext.getBean(ObjectMapper.class);
+ ApplicationMetrics applicationMetrics = mapper.readValue((String)message.getPayload(),ApplicationMetrics.class);
+ Assert.assertTrue(contains("integration.channel.errorChannel.errorRate.mean", applicationMetrics.getMetrics()));
+ Assert.assertFalse(contains("mem",applicationMetrics.getMetrics()));
+ Assert.assertEquals("application",applicationMetrics.getName());
+ Assert.assertEquals(0,applicationMetrics.getInstanceIndex());
+ Assert.assertTrue(CollectionUtils.isEmpty(applicationMetrics.getProperties()));
+ applicationContext.close();
+ }
+
+ @Test
+ public void customAppNameAndIndex() throws Exception{
+ ConfigurableApplicationContext applicationContext = SpringApplication.run(BinderExporterApplication.class,
+ "--server.port=0",
+ "--spring.jmx.enabled=false",
+ "--spring.cloud.stream.metrics.delay-millis=500",
+ "--spring.application.name=foo",
+ "--spring.cloud.stream.instanceIndex=1",
+ "--spring.cloud.stream.bindings.streamMetrics.destination=foo");
+ Emitter emitterSource = applicationContext.getBean(Emitter.class);
+ MessageCollector collector = applicationContext.getBean(MessageCollector.class);
+ Message message = collector.forChannel(emitterSource.metrics()).poll(1000, TimeUnit.MILLISECONDS);
+ Assert.assertNotNull(message);
+ ObjectMapper mapper = applicationContext.getBean(ObjectMapper.class);
+ ApplicationMetrics applicationMetrics = mapper.readValue((String)message.getPayload(),ApplicationMetrics.class);
+ Assert.assertTrue(contains("integration.channel.errorChannel.errorRate.mean", applicationMetrics.getMetrics()));
+ Assert.assertFalse(contains("mem",applicationMetrics.getMetrics()));
+ Assert.assertEquals("foo",applicationMetrics.getName());
+ Assert.assertEquals(1,applicationMetrics.getInstanceIndex());
+ Assert.assertTrue(CollectionUtils.isEmpty(applicationMetrics.getProperties()));
+ applicationContext.close();
+ }
+
+ @Test
+ public void usingPrefix() throws Exception {
+ ConfigurableApplicationContext applicationContext = SpringApplication.run(BinderExporterApplication.class,
+ "--server.port=0",
+ "--spring.jmx.enabled=false",
+ "--spring.cloud.stream.metrics.delay-millis=500",
+ "--spring.cloud.stream.metrics.prefix=foo",
+ "--spring.cloud.stream.instanceIndex=1",
+ "--spring.cloud.stream.bindings.streamMetrics.destination=foo");
+ Emitter emitterSource = applicationContext.getBean(Emitter.class);
+ MessageCollector collector = applicationContext.getBean(MessageCollector.class);
+ Message message = collector.forChannel(emitterSource.metrics()).poll(1000, TimeUnit.MILLISECONDS);
+ Assert.assertNotNull(message);
+ ObjectMapper mapper = applicationContext.getBean(ObjectMapper.class);
+ ApplicationMetrics applicationMetrics = mapper.readValue((String)message.getPayload(),ApplicationMetrics.class);
+ Assert.assertTrue(contains("integration.channel.errorChannel.errorRate.mean", applicationMetrics.getMetrics()));
+ Assert.assertFalse(contains("mem",applicationMetrics.getMetrics()));
+ Assert.assertEquals("foo.application",applicationMetrics.getName());
+ Assert.assertEquals(1,applicationMetrics.getInstanceIndex());
+ Assert.assertTrue(CollectionUtils.isEmpty(applicationMetrics.getProperties()));
+ applicationContext.close();
+ }
+
+ @Test
+ public void includesExcludes() throws Exception {
+ ConfigurableApplicationContext applicationContext = SpringApplication.run(BinderExporterApplication.class,
+ "--server.port=0",
+ "--spring.jmx.enabled=false",
+ "--spring.cloud.stream.metrics.delay-millis=500",
+ "--spring.cloud.stream.bindings.streamMetrics.destination=foo",
+ "--spring.cloud.stream.metrics.includes=mem**",
+ "--spring.cloud.stream.metrics.excludes=integration**");
+ Emitter emitterSource = applicationContext.getBean(Emitter.class);
+ MessageCollector collector = applicationContext.getBean(MessageCollector.class);
+ Message message = collector.forChannel(emitterSource.metrics()).poll(1000, TimeUnit.MILLISECONDS);
+ Assert.assertNotNull(message);
+ ObjectMapper mapper = applicationContext.getBean(ObjectMapper.class);
+ ApplicationMetrics applicationMetrics = mapper.readValue((String)message.getPayload(),ApplicationMetrics.class);
+ Assert.assertFalse(contains("integration.channel.errorChannel.errorRate.mean", applicationMetrics.getMetrics()));
+ Assert.assertTrue(contains("mem",applicationMetrics.getMetrics()));
+ Assert.assertTrue(CollectionUtils.isEmpty(applicationMetrics.getProperties()));
+ applicationContext.close();
+ }
+
+ @Test
+ public void includesExcludesWithProperties() throws Exception {
+ ConfigurableApplicationContext applicationContext = SpringApplication.run(BinderExporterApplication.class,
+ "--server.port=0",
+ "--spring.jmx.enabled=false",
+ "--spring.cloud.stream.metrics.delay-millis=500",
+ "--spring.cloud.stream.bindings.streamMetrics.destination=foo",
+ "--spring.cloud.stream.metrics.includes=integration**",
+ "--spring.cloud.stream.metrics.properties=java**,spring.test.env**");
+ Emitter emitterSource = applicationContext.getBean(Emitter.class);
+ MessageCollector collector = applicationContext.getBean(MessageCollector.class);
+ Message message = collector.forChannel(emitterSource.metrics()).poll(1000, TimeUnit.MILLISECONDS);
+ Assert.assertNotNull(message);
+ ObjectMapper mapper = applicationContext.getBean(ObjectMapper.class);
+ ApplicationMetrics applicationMetrics = mapper.readValue((String)message.getPayload(),ApplicationMetrics.class);
+ Assert.assertFalse(contains("mem", applicationMetrics.getMetrics()));
+ Assert.assertTrue(contains("integration.channel.errorChannel.errorRate.mean",applicationMetrics.getMetrics()));
+ Assert.assertFalse(CollectionUtils.isEmpty(applicationMetrics.getProperties()));
+ Assert.assertTrue(applicationMetrics.getProperties().get("spring.test.env.syntax").equals("testing"));
+ applicationContext.close();
+ }
+
+ private boolean contains(String metric, Collection metrics){
+ boolean contains = false;
+ for(Metric entry : metrics){
+ contains = entry.getName().equals(metric);
+ if(contains){
+ break;
+ }
+ }
+ return contains;
+ }
+
+ @EnableAutoConfiguration
+ public static class BinderExporterApplication {
+
+ }
+}