= Analytics Consumer
The `analytics-consumer` is a Java https://docs.oracle.com/javase/8/docs/api/java/util/function/Consumer.html[Consumer<Message<?>>] that computes analytics from the input data messages and publishes them as metrics to various monitoring systems.
It leverages the https://micrometer.io[micrometer library] for providing a uniform programming experience across the most popular https://micrometer.io/docs[monitoring systems] and uses https://docs.spring.io/spring-integration/reference/html/spel.html#spel[Spring Expression Language (SpEL)] for defining how the metric names, values and tags are computed from the input data.
The analytics-consumer can produce two metrics types:
- https://micrometer.io/docs/concepts#_counters[Counter] - reports a single metric, a count, that increments by a fixed, positive amount. Counters can be used for computing the rates of how the data changes in time.
- https://micrometer.io/docs/concepts#_gauges[Gauge] - reports the current value. Typical examples for gauges would be the size of a collection or map or number of threads in a running state.
A https://micrometer.io/docs/concepts#_meters[Meter] (e.g. Counter or Gauge) is uniquely identified by its `name` and `dimensions` (the term dimensions and tags is used interchangeably). Dimensions allow a particular named metric to be sliced to drill down and reason about the data.
NOTE: As a metrics is uniquely identified by its `name` and `dimensions`, you can assign multiple tags (e.g. key/value pairs) to every metric, but you cannot randomly change those tags afterward!
Monitoring systems such as Prometheus will complain if a metric with the same name has different sets of tags.
== Beans for injection
Add the analytics-consumer dependency to your POM:
[source,xml]
----
<dependency>
<groupId>org.springframework.cloud.fn</groupId>
<artifactId>analytics-consumer</artifactId>
<version>1.0.0-SNAPSHOT</version>
</dependency>
----
The `AnalyticsConsumerConfiguration` auto-configures the following consumer bean:
[source,java]
----
Consumer<Message<?>> analyticsConsumer
----
For every input https://docs.spring.io/spring-integration/reference/html/message.html[Message] the `analyticsConsumer` computes the defined metrics and eventually, with the help of the micrometer library, publishes them to the backend monitoring systems. The https://docs.spring.io/spring-integration/reference/html/message.html[Message] is a generic container for data. Each Message instance includes a payload and headers containing user-extensible properties as key-value pairs. Any object can be provided as the payload.
The https://docs.spring.io/spring-integration/reference/html/message.html#message-builder[MessageBuilder] helps to create a Message instance from any payload content and assign any key/value as a header:
[source,java]
----
Message<String> myMessage = MessageBuilder
.withPayload("My message text")
.setHeader("kind", "CUSTOM")
.setHeader("foo", "bar")
.build();
----
The `SpEL` expressions use the `headers` and `payload` keywords to access message’s headers and payload values. For example a counter metrics can have a value amount computed from the size of the input message payload add a `my_tag` tag, extracted from the `kind` header value:
[source,properties]
----
analytics.amount-expression=payload.lenght()
analytics.tag.expression.my_tag=headers['kind']
----
Review the https://github.com/spring-cloud/stream-applications/blob/master/functions/consumer/analytics-consumer/src/main/java/org/springframework/cloud/fn/consumer/analytics/AnalyticsConsumerProperties.java[AnalyticsConsumerProperties] javadocs for further details how to use the SpEL properties.
By default, Micrometer is packed with a `SimpleMeterRegistry` that holds the latest value of each meter in memory and doesn't export the data anywhere.
To enable support for another monitoring system you have to add the spring-boot-starter-actuator dependency and the micrometer dependency of the monitoring system of choice:
[source,xml]
----
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-actuator</artifactId>
</dependency>
<dependency>
<groupId>io.micrometer</groupId>
<artifactId>micrometer-registry-[MONITORING SYSTEM NAME]</artifactId>
</dependency>
----
Follow the https://docs.spring.io/spring-boot/docs/current/reference/htmlsingle/#actuator.metrics[configuration instructions] for the selected monitoring system.
All monitoring configuration properties start with a prefix: `management.metrics.export`.
== Configuration Options
All `analytics-consumer` configuration properties use the `analytics` prefix.
For more information on the various options available, please see link:src/main/java/org/springframework/cloud/fn/consumer/analytics/AnalyticsConsumerProperties.java[AnalyticsConsumerProperties].
All monitoring configuration properties start with a prefix `management.metrics.export`.
=== Sample Configuration
Following examples show how to configure `counter` and `gauge` metrics over a series of stock-exchange messages like this:
[source,json]
----
{
"data": {
"symbol": "AAPL",
"exchange": "XNAS",
"open": 318.66,
"close": 316.85,
"volume": 25672211.0
}
}
----
The following configuration will create a `counter` metrics called `stockrates` with two tags: `symbol` and `exchange` computed from the json fields:
.Counter Metrics Configuration - count stock transactions
|===
|Property |Description
|`analytics.meter-type=counter`
|Counter meter type (default)
|`analytics.name=stockrates`
|Metrics name
|`analytics.tag.expression.symbol=#jsonPath(payload,'$.data.symbol')`
|Add tag `symbol` equal to the `date.symbol` field in the json messages.
|`analytics.tag.expression.exchange=#jsonPath(payload,'$.data.exchange')`
|Add tag `exchange` equal to the `date.exchange` field in the json messages.
|===
Now you can use the `stockrates` metrics to measure the rates at which the stock transactions occur over a given time interval.
Furthermore, you can aggregate those rates by the `symbol` and `exchange` tags.
To measure the transaction volumes contained in the `data.volume` JSON fields, you can build a GAUGE metrics like this:
.Gauge Metrics Configuration - compute stock volumes
|===
|Property |Description
|`analytics.meter-type=gauge`
|Gauge meter type
|`analytics.name=stockvolumes`
|Metrics name
|`analytics.tag.expression.symbol=#jsonPath(payload,'$.data.symbol')`
|Add tag `symbol` equal to the `date.symbol` field in the json messages.
|`analytics.tag.expression.exchange=#jsonPath(payload,'$.data.exchange')`
|Add tag `exchange` equal to the `date.exchange` field in the json messages.
|`analytics.tag.amount-expression=#jsonPath(payload,'$.data.volume')`
|Set the Gauge to the `data/volume` field values.
|===
Then use the `stockvolumes` metrics to graph, in real-time, the transaction volumes changes over time. You can aggregate those volumes by the `symbol` and `exchange` tags.
WARNING: Micrometer implements the Gauges for the purpose of data sampling!
There is no information about what might have occurred between two consecutive samples.
Any intermediate values set on a gauge are lost by the time the gauge value is reported to a metrics backend.
To enable one or more https://micrometer.io/docs[supported monitoring systems] you need to add a configuration like this:
.Wavefront Configuration.
|===
|Property |Description
|`management.metrics.export.wavefront.enabled=true`
|Enable or disable the monitoring system. (enabled by default).
|`management.metrics.export.wavefront.uri=YOUR_WAVEFRONT_SERVER_URI`
|UIR of your Wavefront server or Wavefront Proxy.
|`management.metrics.export.wavefront.api-token=YOUR_API_TOKEN`
|Wavefront access token.
|`management.metrics.export.wavefront.source=stock-exchange-demo`
|The `source` is used to distinct your metrics on the Wavefront server.
|===
== Tests
See this link:src/test/java/org/springframework/cloud/fn/consumer/analytics[test suite] for the various ways, this consumer is used.
== Other usage
* See the https://github.com/spring-cloud/stream-applications/blob/master/applications/sink/analytics-sink/README.adoc[Analytics Sink README] where this consumer is used to create a Spring Cloud Stream application where it makes a Counter sink.
* https://docs.google.com/document/d/1BHBjgMmg4a1ue2wr-dmPTfgaN0so4ufw2XkG541Ac9Q/edit?usp=sharing[Stock Exchange Sample].