Skip to content

Commit

Permalink
Infer types in Prometheus remote_write (#19944) (#20314)
Browse files Browse the repository at this point in the history
(cherry picked from commit b797a7e)
  • Loading branch information
ChrsMark authored Jul 29, 2020
1 parent 081f638 commit f7c59d5
Show file tree
Hide file tree
Showing 22 changed files with 1,865 additions and 75 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -681,6 +681,7 @@ field. You can revert this change by configuring tags for the module and omittin
- Added `performance` and `query` metricsets to `mysql` module. {pull}18955[18955]
- The `elasticsearch-xpack/index` metricset now reports hidden indices as such. {issue}18639[18639] {pull}18706[18706]
- Adds support for app insights metrics in the azure module. {issue}18570[18570] {pull}18940[18940]
- Infer types in Prometheus remote_write. {pull}19944[19944]
- Added cache and connection_errors metrics to status metricset of MySQL module {issue}16955[16955] {pull}19844[19844]
- Update MySQL dashboard with connection errors and cache metrics {pull}19913[19913] {issue}16955[16955]
- Add cloud.instance.name into aws ec2 metricset. {pull}20077[20077]
Expand Down
29 changes: 29 additions & 0 deletions metricbeat/helper/prometheus/prometheus.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"io"
"io/ioutil"
"net/http"
"regexp"

"github.com/pkg/errors"
dto "github.com/prometheus/client_model/go"
Expand Down Expand Up @@ -284,3 +285,31 @@ func getLabels(metric *dto.Metric) common.MapStr {
}
return labels
}

// CompilePatternList compiles every regular expression in patterns and
// returns the compiled expressions in their original order.
//
// A nil patterns pointer, or a pointer to an empty list, yields an empty,
// non-nil slice and a nil error. The first pattern that fails to compile
// aborts the whole operation: a nil slice is returned together with an error
// that wraps the compile failure and names the offending pattern.
func CompilePatternList(patterns *[]string) ([]*regexp.Regexp, error) {
	compiled := []*regexp.Regexp{}
	if patterns == nil {
		return compiled, nil
	}

	for _, expr := range *patterns {
		re, err := regexp.Compile(expr)
		if err != nil {
			return nil, errors.Wrapf(err, "compiling pattern '%s'", expr)
		}
		compiled = append(compiled, re)
	}
	return compiled, nil
}

// MatchMetricFamily reports whether the given family/metric name is matched
// by at least one of the compiled patterns in matchMetrics. Patterns are
// tried in order and the search stops at the first match. A nil or empty
// pattern list never matches, so the result is false.
func MatchMetricFamily(family string, matchMetrics []*regexp.Regexp) bool {
	for i := range matchMetrics {
		if matchMetrics[i].MatchString(family) {
			return true
		}
	}
	return false
}
8 changes: 4 additions & 4 deletions metricbeat/module/prometheus/collector/_meta/docs.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -40,10 +40,10 @@ metricbeat.modules:
rate_counters: false
-------------------------------------------------------------------------------------

`use_types` paramater (default: false) enables a different layout for metrics storage, leveraging Elasticsearch
`use_types` parameter (default: false) enables a different layout for metrics storage, leveraging Elasticsearch
types, including https://www.elastic.co/guide/en/elasticsearch/reference/current/histogram.html[histograms].

`rate_counters` paramater (default: false) enables calculating a rate out of Prometheus counters. When enabled, Metricbeat stores
`rate_counters` parameter (default: false) enables calculating a rate out of Prometheus counters. When enabled, Metricbeat stores
the counter increment since the last collection. This metric should make some aggregations easier and with better
performance. This parameter can only be enabled in combination with `use_types`.

Expand Down Expand Up @@ -122,8 +122,8 @@ The configuration above will include only metrics that match `node_filesystem_*`

To keep only specific metrics, anchor the start and the end of the regexp of each metric:

- the caret ^ matches the beginning of a text or line,
- the dollar sign $ matches the end of a text.
- the caret `^` matches the beginning of a text or line,
- the dollar sign `$` matches the end of a text.

[source,yaml]
-------------------------------------------------------------------------------------
Expand Down
34 changes: 4 additions & 30 deletions metricbeat/module/prometheus/collector/collector.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,11 +111,11 @@ func MetricSetBuilder(namespace string, genFactory PromEventsGeneratorFactory) f
}
// store host here to use it as a pointer when building `up` metric
ms.host = ms.Host()
ms.excludeMetrics, err = compilePatternList(config.MetricsFilters.ExcludeMetrics)
ms.excludeMetrics, err = p.CompilePatternList(config.MetricsFilters.ExcludeMetrics)
if err != nil {
return nil, errors.Wrapf(err, "unable to compile exclude patterns")
}
ms.includeMetrics, err = compilePatternList(config.MetricsFilters.IncludeMetrics)
ms.includeMetrics, err = p.CompilePatternList(config.MetricsFilters.IncludeMetrics)
if err != nil {
return nil, errors.Wrapf(err, "unable to compile include patterns")
}
Expand Down Expand Up @@ -237,39 +237,13 @@ func (m *MetricSet) skipFamilyName(family string) bool {

// if include_metrics are defined, check if this metric should be included
if len(m.includeMetrics) > 0 {
if !matchMetricFamily(family, m.includeMetrics) {
if !p.MatchMetricFamily(family, m.includeMetrics) {
return true
}
}
// now exclude the metric if it matches any of the given patterns
if len(m.excludeMetrics) > 0 {
if matchMetricFamily(family, m.excludeMetrics) {
return true
}
}
return false
}

// compilePatternList compiles each regular expression in patterns, keeping
// the input order. A nil pointer or an empty list produces an empty, non-nil
// slice. On the first pattern that does not compile it returns a nil slice
// and an error wrapping the compile failure with the offending pattern.
func compilePatternList(patterns *[]string) ([]*regexp.Regexp, error) {
	result := []*regexp.Regexp{}
	if patterns == nil {
		return result, nil
	}

	for _, raw := range *patterns {
		rx, err := regexp.Compile(raw)
		if err != nil {
			return nil, errors.Wrapf(err, "compiling pattern '%s'", raw)
		}
		result = append(result, rx)
	}
	return result, nil
}

func matchMetricFamily(family string, matchMetrics []*regexp.Regexp) bool {
for _, checkMetric := range matchMetrics {
matched := checkMetric.MatchString(family)
if matched {
if p.MatchMetricFamily(family, m.excludeMetrics) {
return true
}
}
Expand Down
17 changes: 9 additions & 8 deletions metricbeat/module/prometheus/collector/collector_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (
"github.com/stretchr/testify/assert"

"github.com/elastic/beats/v7/libbeat/common"
p "github.com/elastic/beats/v7/metricbeat/helper/prometheus"
mbtest "github.com/elastic/beats/v7/metricbeat/mb/testing"

_ "github.com/elastic/beats/v7/metricbeat/module/prometheus"
Expand Down Expand Up @@ -330,8 +331,8 @@ func TestSkipMetricFamily(t *testing.T) {
}

// test with no filters
ms.includeMetrics, _ = compilePatternList(&[]string{})
ms.excludeMetrics, _ = compilePatternList(&[]string{})
ms.includeMetrics, _ = p.CompilePatternList(&[]string{})
ms.excludeMetrics, _ = p.CompilePatternList(&[]string{})
metricsToKeep := 0
for _, testFamily := range testFamilies {
if !ms.skipFamily(testFamily) {
Expand All @@ -341,8 +342,8 @@ func TestSkipMetricFamily(t *testing.T) {
assert.Equal(t, metricsToKeep, len(testFamilies))

// test with only one include filter
ms.includeMetrics, _ = compilePatternList(&[]string{"http_request_duration_microseconds_a_*"})
ms.excludeMetrics, _ = compilePatternList(&[]string{})
ms.includeMetrics, _ = p.CompilePatternList(&[]string{"http_request_duration_microseconds_a_*"})
ms.excludeMetrics, _ = p.CompilePatternList(&[]string{})
metricsToKeep = 0
for _, testFamily := range testFamilies {
if !ms.skipFamily(testFamily) {
Expand All @@ -352,8 +353,8 @@ func TestSkipMetricFamily(t *testing.T) {
assert.Equal(t, metricsToKeep, 2)

// test with only one exclude filter
ms.includeMetrics, _ = compilePatternList(&[]string{""})
ms.excludeMetrics, _ = compilePatternList(&[]string{"http_request_duration_microseconds_a_*"})
ms.includeMetrics, _ = p.CompilePatternList(&[]string{""})
ms.excludeMetrics, _ = p.CompilePatternList(&[]string{"http_request_duration_microseconds_a_*"})
metricsToKeep = 0
for _, testFamily := range testFamilies {
if !ms.skipFamily(testFamily) {
Expand All @@ -363,8 +364,8 @@ func TestSkipMetricFamily(t *testing.T) {
assert.Equal(t, len(testFamilies)-2, metricsToKeep)

// test with one include and one exclude
ms.includeMetrics, _ = compilePatternList(&[]string{"http_request_duration_microseconds_a_*"})
ms.excludeMetrics, _ = compilePatternList(&[]string{"http_request_duration_microseconds_a_b_*"})
ms.includeMetrics, _ = p.CompilePatternList(&[]string{"http_request_duration_microseconds_a_*"})
ms.excludeMetrics, _ = p.CompilePatternList(&[]string{"http_request_duration_microseconds_a_b_*"})
metricsToKeep = 0
for _, testFamily := range testFamilies {
if !ms.skipFamily(testFamily) {
Expand Down
106 changes: 106 additions & 0 deletions metricbeat/module/prometheus/remote_write/_meta/docs.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -61,3 +61,109 @@ remote_write:
# Disable validation of the server certificate.
#insecure_skip_verify: true
------------------------------------------------------------------------------


[float]
[role="xpack"]
=== Histograms and types

beta[]

[source,yaml]
-------------------------------------------------------------------------------------
metricbeat.modules:
- module: prometheus
metricsets: ["remote_write"]
host: "localhost"
port: "9201"
use_types: true
rate_counters: true
-------------------------------------------------------------------------------------

`use_types` parameter (default: false) enables a different layout for metrics storage, leveraging Elasticsearch
types, including https://www.elastic.co/guide/en/elasticsearch/reference/current/histogram.html[histograms].

`rate_counters` parameter (default: false) enables calculating a rate out of Prometheus counters. When enabled, Metricbeat stores
the counter increment since the last collection. This metric should make some aggregations easier and with better
performance. This parameter can only be enabled in combination with `use_types`.

When `use_types` and `rate_counters` are enabled, metrics are stored like this:

[source,json]
----
{
"prometheus": {
"labels": {
"instance": "172.27.0.2:9090",
"job": "prometheus"
},
"prometheus_target_interval_length_seconds_count": {
"counter": 1,
"rate": 0
},
"prometheus_target_interval_length_seconds_sum": {
"counter": 15.000401344,
"rate": 0
},
"prometheus_tsdb_compaction_chunk_range_seconds_bucket": {
"histogram": {
"values": [50, 300, 1000, 4000, 16000],
"counts": [10, 2, 34, 7]
}
}
}
}
----


[float]
==== Types' patterns

Unlike the `collector` metricset, `remote_write` receives metrics in raw format from the Prometheus server.
Because of this, the module has to internally use a heuristic to efficiently identify the type of each raw metric.
For this purpose, name patterns are used to identify the type of each metric.
The default patterns are the following:

. `_total` suffix: the metric is of Counter type
. `_sum` suffix: the metric is of Counter type
. `_count` suffix: the metric is of Counter type
. `_bucket` suffix and `le` in labels: the metric is of Histogram type

Everything else is handled as a Gauge. In addition, there is no special handling for Summaries, so a
Summary's quantiles are handled as Gauges and its sum and count as Counters.

Users have the flexibility to add their own patterns using the following configuration:

[source,yaml]
-------------------------------------------------------------------------------------
metricbeat.modules:
- module: prometheus
metricsets: ["remote_write"]
host: "localhost"
port: "9201"
types_patterns:
counter_patterns: ["_my_counter_suffix"]
histogram_patterns: ["_my_histogram_suffix"]
-------------------------------------------------------------------------------------

The configuration above will consider metrics with names that match `_my_counter_suffix` as Counters
and those that match `_my_histogram_suffix` (and have `le` in their labels) as Histograms.


To match only specific metrics, anchor the start and the end of the regexp of each metric:

- the caret `^` matches the beginning of a text or line,
- the dollar sign `$` matches the end of a text.

[source,yaml]
-------------------------------------------------------------------------------------
metricbeat.modules:
- module: prometheus
metricsets: ["remote_write"]
host: "localhost"
port: "9201"
types_patterns:
histogram_patterns: ["^my_histogram_metric$"]
-------------------------------------------------------------------------------------

Note that when using `types_patterns`, the provided patterns have higher priority than the default patterns.
For instance if `_histogram_total` is a defined histogram pattern, then a metric like `network_bytes_histogram_total`
will be handled as a histogram even if it has the suffix `_total`, which is a default pattern for counters.
56 changes: 34 additions & 22 deletions metricbeat/module/prometheus/remote_write/data.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,17 @@ import (
"github.com/elastic/beats/v7/metricbeat/mb"
)

func samplesToEvents(metrics model.Samples) map[string]mb.Event {
// DefaultRemoteWriteEventsGeneratorFactory returns the default prometheus events generator.
// It hands back a pointer to a fresh remoteWriteEventGenerator together with a
// nil error; the ms argument is not used by this implementation.
func DefaultRemoteWriteEventsGeneratorFactory(ms mb.BaseMetricSet) (RemoteWriteEventsGenerator, error) {
	return &remoteWriteEventGenerator{}, nil
}

// remoteWriteEventGenerator is the events generator returned by
// DefaultRemoteWriteEventsGeneratorFactory. It has no fields.
type remoteWriteEventGenerator struct{}

// Start is a no-op for this generator.
func (p *remoteWriteEventGenerator) Start() {}

// Stop is a no-op for this generator.
func (p *remoteWriteEventGenerator) Stop() {}

func (p *remoteWriteEventGenerator) GenerateEvents(metrics model.Samples) map[string]mb.Event {
eventList := map[string]mb.Event{}

for _, metric := range metrics {
Expand All @@ -35,38 +45,40 @@ func samplesToEvents(metrics model.Samples) map[string]mb.Event {
if metric == nil {
continue
}
val := float64(metric.Value)
if math.IsNaN(val) || math.IsInf(val, 0) {
continue
}

name := string(metric.Metric["__name__"])
delete(metric.Metric, "__name__")

for k, v := range metric.Metric {
labels[string(k)] = v
}

val := float64(metric.Value)
if !math.IsNaN(val) && !math.IsInf(val, 0) {
// join metrics with same labels in a single event
labelsHash := labels.String()
if _, ok := eventList[labelsHash]; !ok {
eventList[labelsHash] = mb.Event{
ModuleFields: common.MapStr{
"metrics": common.MapStr{},
},
}

// Add labels
if len(labels) > 0 {
eventList[labelsHash].ModuleFields["labels"] = labels
}
// join metrics with same labels in a single event
labelsHash := labels.String()
if _, ok := eventList[labelsHash]; !ok {
eventList[labelsHash] = mb.Event{
ModuleFields: common.MapStr{
"metrics": common.MapStr{},
},
}

// Not checking anything here because we create these maps some lines before
e := eventList[labelsHash]
e.Timestamp = metric.Timestamp.Time()
data := common.MapStr{
name: val,
// Add labels
if len(labels) > 0 {
eventList[labelsHash].ModuleFields["labels"] = labels
}
e.ModuleFields["metrics"].(common.MapStr).Update(data)
}

// Not checking anything here because we create these maps some lines before
e := eventList[labelsHash]
e.Timestamp = metric.Timestamp.Time()
data := common.MapStr{
name: val,
}
e.ModuleFields["metrics"].(common.MapStr).Update(data)
}

return eventList
Expand Down
Loading

0 comments on commit f7c59d5

Please sign in to comment.