diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc index a46e58ce92e..327e6aaa8ce 100644 --- a/CHANGELOG.next.asciidoc +++ b/CHANGELOG.next.asciidoc @@ -600,6 +600,7 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d - Added `performance` and `query` metricsets to `mysql` module. {pull}18955[18955] - The `elasticsearch-xpack/index` metricset now reports hidden indices as such. {issue}18639[18639] {pull}18706[18706] - Adds support for app insights metrics in the azure module. {issue}18570[18570] {pull}18940[18940] +- Infer types in Prometheus remote_write. {pull}19944[19944] - Added cache and connection_errors metrics to status metricset of MySQL module {issue}16955[16955] {pull}19844[19844] - Update MySQL dashboard with connection errors and cache metrics {pull}19913[19913] {issue}16955[16955] - Add cloud.instance.name into aws ec2 metricset. {pull}20077[20077] diff --git a/metricbeat/helper/prometheus/prometheus.go b/metricbeat/helper/prometheus/prometheus.go index 2859178d98f..c520460109d 100644 --- a/metricbeat/helper/prometheus/prometheus.go +++ b/metricbeat/helper/prometheus/prometheus.go @@ -22,6 +22,7 @@ import ( "io" "io/ioutil" "net/http" + "regexp" "github.com/pkg/errors" dto "github.com/prometheus/client_model/go" @@ -284,3 +285,31 @@ func getLabels(metric *dto.Metric) common.MapStr { } return labels } + +// CompilePatternList compiles a pattern list and returns the list of the compiled patterns +func CompilePatternList(patterns *[]string) ([]*regexp.Regexp, error) { + var compiledPatterns []*regexp.Regexp + compiledPatterns = []*regexp.Regexp{} + if patterns != nil { + for _, pattern := range *patterns { + r, err := regexp.Compile(pattern) + if err != nil { + return nil, errors.Wrapf(err, "compiling pattern '%s'", pattern) + } + compiledPatterns = append(compiledPatterns, r) + } + return compiledPatterns, nil + } + return []*regexp.Regexp{}, nil +} + +// MatchMetricFamily checks if the given family/metric name matches any of the given patterns +func MatchMetricFamily(family string, matchMetrics []*regexp.Regexp) bool { + for _, checkMetric := range matchMetrics { + matched := checkMetric.MatchString(family) + if matched { + return true + } + } + return false +} diff --git a/metricbeat/module/prometheus/collector/_meta/docs.asciidoc b/metricbeat/module/prometheus/collector/_meta/docs.asciidoc index c3609b083dd..022b6172b61 100644 --- a/metricbeat/module/prometheus/collector/_meta/docs.asciidoc +++ b/metricbeat/module/prometheus/collector/_meta/docs.asciidoc @@ -40,10 +40,10 @@ metricbeat.modules: rate_counters: false ------------------------------------------------------------------------------------- -`use_types` paramater (default: false) enables a different layout for metrics storage, leveraging Elasticsearch +`use_types` parameter (default: false) enables a different layout for metrics storage, leveraging Elasticsearch types, including https://www.elastic.co/guide/en/elasticsearch/reference/current/histogram.html[histograms]. -`rate_counters` paramater (default: false) enables calculating a rate out of Prometheus counters. When enabled, Metricbeat stores +`rate_counters` parameter (default: false) enables calculating a rate out of Prometheus counters. When enabled, Metricbeat stores the counter increment since the last collection. This metric should make some aggregations easier and with better performance. This parameter can only be enabled in combination with `use_types`. 
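For reference, the exported `CompilePatternList` and `MatchMetricFamily` helpers added to `metricbeat/helper/prometheus/prometheus.go` above can be exercised on their own. The following is a minimal, hypothetical sketch; the include pattern and metric name are invented for illustration and are not part of this patch:

[source,go]
----
package main

import (
	"fmt"
	"log"

	prometheus "github.com/elastic/beats/v7/metricbeat/helper/prometheus"
)

func main() {
	// Hypothetical include list; in the metricsets this would come from the
	// include/exclude metrics filters in the module configuration.
	include := []string{"^node_filesystem_.*$"}

	// CompilePatternList compiles every pattern, returning an error on the
	// first invalid regexp.
	compiled, err := prometheus.CompilePatternList(&include)
	if err != nil {
		log.Fatalf("compiling include patterns: %v", err)
	}

	// MatchMetricFamily reports whether the family name matches any compiled pattern.
	fmt.Println(prometheus.MatchMetricFamily("node_filesystem_free_bytes", compiled)) // true
}
----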
@@ -122,8 +122,8 @@ The configuration above will include only metrics that match `node_filesystem_*` To keep only specific metrics, anchor the start and the end of the regexp of each metric: -- the caret ^ matches the beginning of a text or line, -- the dollar sign $ matches the end of a text. +- the caret `^` matches the beginning of a text or line, +- the dollar sign `$` matches the end of a text. [source,yaml] ------------------------------------------------------------------------------------- diff --git a/metricbeat/module/prometheus/collector/collector.go b/metricbeat/module/prometheus/collector/collector.go index 6941f30bd8a..ce3cee8cb60 100644 --- a/metricbeat/module/prometheus/collector/collector.go +++ b/metricbeat/module/prometheus/collector/collector.go @@ -111,11 +111,11 @@ func MetricSetBuilder(namespace string, genFactory PromEventsGeneratorFactory) f } // store host here to use it as a pointer when building `up` metric ms.host = ms.Host() - ms.excludeMetrics, err = compilePatternList(config.MetricsFilters.ExcludeMetrics) + ms.excludeMetrics, err = p.CompilePatternList(config.MetricsFilters.ExcludeMetrics) if err != nil { return nil, errors.Wrapf(err, "unable to compile exclude patterns") } - ms.includeMetrics, err = compilePatternList(config.MetricsFilters.IncludeMetrics) + ms.includeMetrics, err = p.CompilePatternList(config.MetricsFilters.IncludeMetrics) if err != nil { return nil, errors.Wrapf(err, "unable to compile include patterns") } @@ -237,39 +237,13 @@ func (m *MetricSet) skipFamilyName(family string) bool { // if include_metrics are defined, check if this metric should be included if len(m.includeMetrics) > 0 { - if !matchMetricFamily(family, m.includeMetrics) { + if !p.MatchMetricFamily(family, m.includeMetrics) { return true } } // now exclude the metric if it matches any of the given patterns if len(m.excludeMetrics) > 0 { - if matchMetricFamily(family, m.excludeMetrics) { - return true - } - } - return false -} - -func compilePatternList(patterns *[]string) ([]*regexp.Regexp, error) { - var compiledPatterns []*regexp.Regexp - compiledPatterns = []*regexp.Regexp{} - if patterns != nil { - for _, pattern := range *patterns { - r, err := regexp.Compile(pattern) - if err != nil { - return nil, errors.Wrapf(err, "compiling pattern '%s'", pattern) - } - compiledPatterns = append(compiledPatterns, r) - } - return compiledPatterns, nil - } - return []*regexp.Regexp{}, nil -} - -func matchMetricFamily(family string, matchMetrics []*regexp.Regexp) bool { - for _, checkMetric := range matchMetrics { - matched := checkMetric.MatchString(family) - if matched { + if p.MatchMetricFamily(family, m.excludeMetrics) { return true } } diff --git a/metricbeat/module/prometheus/collector/collector_test.go b/metricbeat/module/prometheus/collector/collector_test.go index 94477a0aa2b..541b83b8f83 100644 --- a/metricbeat/module/prometheus/collector/collector_test.go +++ b/metricbeat/module/prometheus/collector/collector_test.go @@ -29,6 +29,7 @@ import ( "github.com/stretchr/testify/assert" "github.com/elastic/beats/v7/libbeat/common" + p "github.com/elastic/beats/v7/metricbeat/helper/prometheus" mbtest "github.com/elastic/beats/v7/metricbeat/mb/testing" _ "github.com/elastic/beats/v7/metricbeat/module/prometheus" @@ -330,8 +331,8 @@ func TestSkipMetricFamily(t *testing.T) { } // test with no filters - ms.includeMetrics, _ = compilePatternList(&[]string{}) - ms.excludeMetrics, _ = compilePatternList(&[]string{}) + ms.includeMetrics, _ = p.CompilePatternList(&[]string{}) + 
ms.excludeMetrics, _ = p.CompilePatternList(&[]string{}) metricsToKeep := 0 for _, testFamily := range testFamilies { if !ms.skipFamily(testFamily) { @@ -341,8 +342,8 @@ func TestSkipMetricFamily(t *testing.T) { assert.Equal(t, metricsToKeep, len(testFamilies)) // test with only one include filter - ms.includeMetrics, _ = compilePatternList(&[]string{"http_request_duration_microseconds_a_*"}) - ms.excludeMetrics, _ = compilePatternList(&[]string{}) + ms.includeMetrics, _ = p.CompilePatternList(&[]string{"http_request_duration_microseconds_a_*"}) + ms.excludeMetrics, _ = p.CompilePatternList(&[]string{}) metricsToKeep = 0 for _, testFamily := range testFamilies { if !ms.skipFamily(testFamily) { @@ -352,8 +353,8 @@ func TestSkipMetricFamily(t *testing.T) { assert.Equal(t, metricsToKeep, 2) // test with only one exclude filter - ms.includeMetrics, _ = compilePatternList(&[]string{""}) - ms.excludeMetrics, _ = compilePatternList(&[]string{"http_request_duration_microseconds_a_*"}) + ms.includeMetrics, _ = p.CompilePatternList(&[]string{""}) + ms.excludeMetrics, _ = p.CompilePatternList(&[]string{"http_request_duration_microseconds_a_*"}) metricsToKeep = 0 for _, testFamily := range testFamilies { if !ms.skipFamily(testFamily) { @@ -363,8 +364,8 @@ func TestSkipMetricFamily(t *testing.T) { assert.Equal(t, len(testFamilies)-2, metricsToKeep) // test with ine include and one exclude - ms.includeMetrics, _ = compilePatternList(&[]string{"http_request_duration_microseconds_a_*"}) - ms.excludeMetrics, _ = compilePatternList(&[]string{"http_request_duration_microseconds_a_b_*"}) + ms.includeMetrics, _ = p.CompilePatternList(&[]string{"http_request_duration_microseconds_a_*"}) + ms.excludeMetrics, _ = p.CompilePatternList(&[]string{"http_request_duration_microseconds_a_b_*"}) metricsToKeep = 0 for _, testFamily := range testFamilies { if !ms.skipFamily(testFamily) { diff --git a/metricbeat/module/prometheus/remote_write/_meta/docs.asciidoc b/metricbeat/module/prometheus/remote_write/_meta/docs.asciidoc index 99f5e120d1a..39522afbd13 100644 --- a/metricbeat/module/prometheus/remote_write/_meta/docs.asciidoc +++ b/metricbeat/module/prometheus/remote_write/_meta/docs.asciidoc @@ -61,3 +61,109 @@ remote_write: # Disable validation of the server certificate. #insecure_skip_verify: true ------------------------------------------------------------------------------ + + +[float] +[role="xpack"] +=== Histograms and types + +beta[] + +[source,yaml] +------------------------------------------------------------------------------------- +metricbeat.modules: +- module: prometheus + metricsets: ["remote_write"] + host: "localhost" + port: "9201" +------------------------------------------------------------------------------------- + +`use_types` parameter (default: false) enables a different layout for metrics storage, leveraging Elasticsearch +types, including https://www.elastic.co/guide/en/elasticsearch/reference/current/histogram.html[histograms]. + +`rate_counters` parameter (default: false) enables calculating a rate out of Prometheus counters. When enabled, Metricbeat stores +the counter increment since the last collection. This metric should make some aggregations easier and with better +performance. This parameter can only be enabled in combination with `use_types`. 
+
+When `use_types` and `rate_counters` are enabled, metrics are stored like this:
+
+[source,json]
+----
+{
+    "prometheus": {
+        "labels": {
+            "instance": "172.27.0.2:9090",
+            "job": "prometheus"
+        },
+        "prometheus_target_interval_length_seconds_count": {
+            "counter": 1,
+            "rate": 0
+        },
+        "prometheus_target_interval_length_seconds_sum": {
+            "counter": 15.000401344,
+            "rate": 0
+        },
+        "prometheus_tsdb_compaction_chunk_range_seconds_bucket": {
+            "histogram": {
+                "values": [50, 300, 1000, 4000, 16000],
+                "counts": [10, 2, 34, 7]
+            }
+        }
+    }
+}
+----
+
+
+[float]
+==== Types' patterns
+
+Unlike the `collector` metricset, `remote_write` receives metrics in raw format from the Prometheus server.
+Because of this, the module has to internally use a heuristic in order to efficiently identify the type of each raw metric.
+For this purpose, a set of name patterns is used to identify the type of each metric.
+The default patterns are the following:
+
+. `_total` suffix: the metric is of Counter type
+. `_sum` suffix: the metric is of Counter type
+. `_count` suffix: the metric is of Counter type
+. `_bucket` suffix and `le` in labels: the metric is of Histogram type
+
+Everything else is handled as a Gauge. In addition, there is no special handling for Summaries, so it is expected that
+a Summary's quantiles are handled as Gauges and its sum and count as Counters.
+
+Users have the flexibility to add their own patterns using the following configuration:
+
+[source,yaml]
+-------------------------------------------------------------------------------------
+metricbeat.modules:
+- module: prometheus
+  metricsets: ["remote_write"]
+  host: "localhost"
+  port: "9201"
+  types_patterns:
+    counter_patterns: ["_my_counter_suffix"]
+    histogram_patterns: ["_my_histogram_suffix"]
+-------------------------------------------------------------------------------------
+
+The configuration above will consider metrics with names that match `_my_counter_suffix` as Counters
+and those that match `_my_histogram_suffix` (and have `le` in their labels) as Histograms.
+
+
+To match only specific metrics, anchor the start and the end of the regexp of each metric:
+
+- the caret `^` matches the beginning of a text or line,
+- the dollar sign `$` matches the end of a text.
+
+[source,yaml]
+-------------------------------------------------------------------------------------
+metricbeat.modules:
+- module: prometheus
+  metricsets: ["remote_write"]
+  host: "localhost"
+  port: "9201"
+  types_patterns:
+    histogram_patterns: ["^my_histogram_metric$"]
+-------------------------------------------------------------------------------------
+
+Note that when using `types_patterns`, the provided patterns take priority over the default patterns.
+For instance, if `_histogram_total` is defined as a histogram pattern, then a metric like `network_bytes_histogram_total`
+will be handled as a histogram even if it has the suffix `_total`, which is a default pattern for counters.
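The default type detection described above boils down to a few string checks. The following standalone Go sketch restates those rules for illustration only; it is not the metricset's actual `findMetricType` implementation, which additionally consults the user-defined `types_patterns` first:

[source,go]
----
package main

import (
	"fmt"
	"strings"
)

// classify is a simplified, standalone version of the default heuristic:
// suffix-based Counter detection, `_bucket` plus an `le` label for
// Histograms, and Gauge for everything else.
func classify(name string, labels map[string]string) string {
	_, hasLE := labels["le"]

	switch {
	case strings.HasSuffix(name, "_total"),
		strings.HasSuffix(name, "_sum"),
		strings.HasSuffix(name, "_count"):
		return "counter"
	case strings.HasSuffix(name, "_bucket") && hasLE:
		return "histogram"
	default:
		return "gauge"
	}
}

func main() {
	fmt.Println(classify("node_network_transmit_packets_total", nil)) // counter
	fmt.Println(classify("http_request_duration_seconds_bucket",
		map[string]string{"le": "0.5"})) // histogram
	fmt.Println(classify("node_memory_MemFree_bytes", nil)) // gauge
}
----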
diff --git a/metricbeat/module/prometheus/remote_write/data.go b/metricbeat/module/prometheus/remote_write/data.go index 8dfa6072d6a..2eec6aefaa3 100644 --- a/metricbeat/module/prometheus/remote_write/data.go +++ b/metricbeat/module/prometheus/remote_write/data.go @@ -26,7 +26,17 @@ import ( "github.com/elastic/beats/v7/metricbeat/mb" ) -func samplesToEvents(metrics model.Samples) map[string]mb.Event { +// DefaultRemoteWriteEventsGeneratorFactory returns the default prometheus events generator +func DefaultRemoteWriteEventsGeneratorFactory(ms mb.BaseMetricSet) (RemoteWriteEventsGenerator, error) { + return &remoteWriteEventGenerator{}, nil +} + +type remoteWriteEventGenerator struct{} + +func (p *remoteWriteEventGenerator) Start() {} +func (p *remoteWriteEventGenerator) Stop() {} + +func (p *remoteWriteEventGenerator) GenerateEvents(metrics model.Samples) map[string]mb.Event { eventList := map[string]mb.Event{} for _, metric := range metrics { @@ -35,6 +45,11 @@ func samplesToEvents(metrics model.Samples) map[string]mb.Event { if metric == nil { continue } + val := float64(metric.Value) + if math.IsNaN(val) || math.IsInf(val, 0) { + continue + } + name := string(metric.Metric["__name__"]) delete(metric.Metric, "__name__") @@ -42,31 +57,28 @@ func samplesToEvents(metrics model.Samples) map[string]mb.Event { labels[string(k)] = v } - val := float64(metric.Value) - if !math.IsNaN(val) && !math.IsInf(val, 0) { - // join metrics with same labels in a single event - labelsHash := labels.String() - if _, ok := eventList[labelsHash]; !ok { - eventList[labelsHash] = mb.Event{ - ModuleFields: common.MapStr{ - "metrics": common.MapStr{}, - }, - } - - // Add labels - if len(labels) > 0 { - eventList[labelsHash].ModuleFields["labels"] = labels - } + // join metrics with same labels in a single event + labelsHash := labels.String() + if _, ok := eventList[labelsHash]; !ok { + eventList[labelsHash] = mb.Event{ + ModuleFields: common.MapStr{ + "metrics": common.MapStr{}, + }, } - // Not checking anything here because we create these maps some lines before - e := eventList[labelsHash] - e.Timestamp = metric.Timestamp.Time() - data := common.MapStr{ - name: val, + // Add labels + if len(labels) > 0 { + eventList[labelsHash].ModuleFields["labels"] = labels } - e.ModuleFields["metrics"].(common.MapStr).Update(data) } + + // Not checking anything here because we create these maps some lines before + e := eventList[labelsHash] + e.Timestamp = metric.Timestamp.Time() + data := common.MapStr{ + name: val, + } + e.ModuleFields["metrics"].(common.MapStr).Update(data) } return eventList diff --git a/metricbeat/module/prometheus/remote_write/remote_write.go b/metricbeat/module/prometheus/remote_write/remote_write.go index b0f22455eb9..72bd93185f6 100644 --- a/metricbeat/module/prometheus/remote_write/remote_write.go +++ b/metricbeat/module/prometheus/remote_write/remote_write.go @@ -33,15 +33,33 @@ import ( ) func init() { - mb.Registry.MustAddMetricSet("prometheus", "remote_write", New, + mb.Registry.MustAddMetricSet("prometheus", "remote_write", + MetricSetBuilder(DefaultRemoteWriteEventsGeneratorFactory), mb.WithHostParser(parse.EmptyHostParser), ) } +// RemoteWriteEventsGenerator converts Prometheus Samples to a map of mb.Event +type RemoteWriteEventsGenerator interface { + // Start must be called before using the generator + Start() + + // converts Prometheus Samples to a map of mb.Event + GenerateEvents(metrics model.Samples) map[string]mb.Event + + // Stop must be called when the generator won't be used 
anymore + Stop() +} + +// RemoteWriteEventsGeneratorFactory creates a RemoteWriteEventsGenerator when instanciating a metricset +type RemoteWriteEventsGeneratorFactory func(ms mb.BaseMetricSet) (RemoteWriteEventsGenerator, error) + type MetricSet struct { mb.BaseMetricSet - server serverhelper.Server - events chan mb.Event + server serverhelper.Server + events chan mb.Event + promEventsGen RemoteWriteEventsGenerator + eventGenStarted bool } func New(base mb.BaseMetricSet) (mb.MetricSet, error) { @@ -50,10 +68,19 @@ func New(base mb.BaseMetricSet) (mb.MetricSet, error) { if err != nil { return nil, err } + + promEventsGen, err := DefaultRemoteWriteEventsGeneratorFactory(base) + if err != nil { + return nil, err + } + m := &MetricSet{ - BaseMetricSet: base, - events: make(chan mb.Event), + BaseMetricSet: base, + events: make(chan mb.Event), + promEventsGen: promEventsGen, + eventGenStarted: false, } + svc, err := httpserver.NewHttpServerWithHandler(base, m.handleFunc) if err != nil { return nil, err @@ -62,6 +89,37 @@ func New(base mb.BaseMetricSet) (mb.MetricSet, error) { return m, nil } +// MetricSetBuilder returns a builder function for a new Prometheus remote_write metricset using +// the given namespace and event generator +func MetricSetBuilder(genFactory RemoteWriteEventsGeneratorFactory) func(base mb.BaseMetricSet) (mb.MetricSet, error) { + return func(base mb.BaseMetricSet) (mb.MetricSet, error) { + config := defaultConfig() + err := base.Module().UnpackConfig(&config) + if err != nil { + return nil, err + } + + promEventsGen, err := genFactory(base) + if err != nil { + return nil, err + } + + m := &MetricSet{ + BaseMetricSet: base, + events: make(chan mb.Event), + promEventsGen: promEventsGen, + eventGenStarted: false, + } + svc, err := httpserver.NewHttpServerWithHandler(base, m.handleFunc) + if err != nil { + return nil, err + } + m.server = svc + + return m, nil + } +} + func (m *MetricSet) Run(reporter mb.PushReporterV2) { // Start event watcher m.server.Start() @@ -77,7 +135,20 @@ func (m *MetricSet) Run(reporter mb.PushReporterV2) { } } +// Close stops the metricset +func (m *MetricSet) Close() error { + if m.eventGenStarted { + m.promEventsGen.Stop() + } + return nil +} + func (m *MetricSet) handleFunc(writer http.ResponseWriter, req *http.Request) { + if !m.eventGenStarted { + m.promEventsGen.Start() + m.eventGenStarted = true + } + compressed, err := ioutil.ReadAll(req.Body) if err != nil { m.Logger().Errorf("Read error %v", err) @@ -100,7 +171,7 @@ func (m *MetricSet) handleFunc(writer http.ResponseWriter, req *http.Request) { } samples := protoToSamples(&protoReq) - events := samplesToEvents(samples) + events := m.promEventsGen.GenerateEvents(samples) for _, e := range events { select { diff --git a/x-pack/metricbeat/include/list.go b/x-pack/metricbeat/include/list.go index f99ce41f0f0..108d14802aa 100644 --- a/x-pack/metricbeat/include/list.go +++ b/x-pack/metricbeat/include/list.go @@ -53,6 +53,7 @@ import ( _ "github.com/elastic/beats/v7/x-pack/metricbeat/module/oracle/tablespace" _ "github.com/elastic/beats/v7/x-pack/metricbeat/module/prometheus" _ "github.com/elastic/beats/v7/x-pack/metricbeat/module/prometheus/collector" + _ "github.com/elastic/beats/v7/x-pack/metricbeat/module/prometheus/remote_write" _ "github.com/elastic/beats/v7/x-pack/metricbeat/module/redisenterprise" _ "github.com/elastic/beats/v7/x-pack/metricbeat/module/sql" _ "github.com/elastic/beats/v7/x-pack/metricbeat/module/sql/query" diff --git a/x-pack/metricbeat/metricbeat.reference.yml 
b/x-pack/metricbeat/metricbeat.reference.yml index 45da035284f..141a1cc79b2 100644 --- a/x-pack/metricbeat/metricbeat.reference.yml +++ b/x-pack/metricbeat/metricbeat.reference.yml @@ -1097,6 +1097,19 @@ metricbeat.modules: # Store counter rates instead of original cumulative counters (experimental, default: false) #rate_counters: true + # Use Elasticsearch histogram type to store histograms (beta, default: false) + # This will change the default layout and put metric type in the field name + #use_types: true + + # Store counter rates instead of original cumulative counters (experimental, default: false) + #rate_counters: true + + # Define patterns for counter and histogram types so as to identify metrics' types according to these patterns + #types_patterns: + # counter_patterns: [] + # histogram_patterns: [] + + # Metrics sent by a Prometheus server using remote_write option #- module: prometheus # metricsets: ["remote_write"] diff --git a/x-pack/metricbeat/module/prometheus/_meta/config.yml b/x-pack/metricbeat/module/prometheus/_meta/config.yml index 6fd4e582c8e..cd54c01383a 100644 --- a/x-pack/metricbeat/module/prometheus/_meta/config.yml +++ b/x-pack/metricbeat/module/prometheus/_meta/config.yml @@ -20,6 +20,19 @@ # Store counter rates instead of original cumulative counters (experimental, default: false) #rate_counters: true + # Use Elasticsearch histogram type to store histograms (beta, default: false) + # This will change the default layout and put metric type in the field name + #use_types: true + + # Store counter rates instead of original cumulative counters (experimental, default: false) + #rate_counters: true + + # Define patterns for counter and histogram types so as to identify metrics' types according to these patterns + #types_patterns: + # counter_patterns: [] + # histogram_patterns: [] + + # Metrics sent by a Prometheus server using remote_write option #- module: prometheus # metricsets: ["remote_write"] diff --git a/x-pack/metricbeat/module/prometheus/collector/data.go b/x-pack/metricbeat/module/prometheus/collector/data.go index 23ef386291b..1dd83a82980 100644 --- a/x-pack/metricbeat/module/prometheus/collector/data.go +++ b/x-pack/metricbeat/module/prometheus/collector/data.go @@ -138,7 +138,7 @@ func (g *typedGenerator) GeneratePromEvents(mf *dto.MetricFamily) []collector.Pr events = append(events, collector.PromEvent{ Data: common.MapStr{ name: common.MapStr{ - "histogram": promHistogramToES(g.counterCache, name, labels, histogram), + "histogram": PromHistogramToES(g.counterCache, name, labels, histogram), }, }, Labels: labels, diff --git a/x-pack/metricbeat/module/prometheus/collector/histogram.go b/x-pack/metricbeat/module/prometheus/collector/histogram.go index 63ed3bf69ce..1d23264a2fb 100644 --- a/x-pack/metricbeat/module/prometheus/collector/histogram.go +++ b/x-pack/metricbeat/module/prometheus/collector/histogram.go @@ -13,7 +13,7 @@ import ( dto "github.com/prometheus/client_model/go" ) -// promHistogramToES takes a Prometheus histogram and converts it to an ES histogram: +// PromHistogramToES takes a Prometheus histogram and converts it to an ES histogram: // // ES histograms look like this: // @@ -27,7 +27,7 @@ import ( // - undoing counters accumulation for each bucket (counts) // // https://www.elastic.co/guide/en/elasticsearch/reference/master/histogram.html -func promHistogramToES(cc CounterCache, name string, labels common.MapStr, histogram *dto.Histogram) common.MapStr { +func PromHistogramToES(cc CounterCache, name string, labels common.MapStr, 
histogram *dto.Histogram) common.MapStr { var values []float64 var counts []uint64 diff --git a/x-pack/metricbeat/module/prometheus/collector/histogram_test.go b/x-pack/metricbeat/module/prometheus/collector/histogram_test.go index b0906068e76..460d0f3fffe 100644 --- a/x-pack/metricbeat/module/prometheus/collector/histogram_test.go +++ b/x-pack/metricbeat/module/prometheus/collector/histogram_test.go @@ -17,7 +17,7 @@ import ( "github.com/elastic/beats/v7/libbeat/common" ) -// TestPromHistogramToES tests that calling promHistogramToES multiple +// TestPromHistogramToES tests that calling PromHistogramToES multiple // times with the same cache produces each time the expected results. func TestPromHistogramToES(t *testing.T) { type sample struct { @@ -398,7 +398,7 @@ func TestPromHistogramToES(t *testing.T) { for i, s := range c.samples { t.Logf("#%d: %+v", i, s.histogram) - result := promHistogramToES(cache, metricName, labels, &s.histogram) + result := PromHistogramToES(cache, metricName, labels, &s.histogram) assert.EqualValues(t, s.expected, result) } }) diff --git a/x-pack/metricbeat/module/prometheus/remote_write/_meta/data.json b/x-pack/metricbeat/module/prometheus/remote_write/_meta/data.json new file mode 100644 index 00000000000..54a1b92e428 --- /dev/null +++ b/x-pack/metricbeat/module/prometheus/remote_write/_meta/data.json @@ -0,0 +1,24 @@ +{ + "@timestamp": "2020-07-17T08:23:53.958Z", + "service": { + "type": "prometheus" + }, + "event": { + "dataset": "prometheus.remote_write", + "module": "prometheus" + }, + "metricset": { + "name": "remote_write" + }, + "prometheus": { + "labels": { + "instance": "nodeexporter:9100", + "job": "nodeexporter", + "device": "eth0" + }, + "node_network_transmit_packets_total": { + "counter": 609, + "rate": 3 + } + } +} diff --git a/x-pack/metricbeat/module/prometheus/remote_write/_meta/docs.asciidoc b/x-pack/metricbeat/module/prometheus/remote_write/_meta/docs.asciidoc new file mode 100644 index 00000000000..e69de29bb2d diff --git a/x-pack/metricbeat/module/prometheus/remote_write/_meta/fields.yml b/x-pack/metricbeat/module/prometheus/remote_write/_meta/fields.yml new file mode 100644 index 00000000000..e69de29bb2d diff --git a/x-pack/metricbeat/module/prometheus/remote_write/config.go b/x-pack/metricbeat/module/prometheus/remote_write/config.go new file mode 100644 index 00000000000..8c5fe12a659 --- /dev/null +++ b/x-pack/metricbeat/module/prometheus/remote_write/config.go @@ -0,0 +1,32 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. 
+ +package remote_write + +import "errors" + +type config struct { + UseTypes bool `config:"use_types"` + RateCounters bool `config:"rate_counters"` + TypesPatterns TypesPatterns `config:"types_patterns" yaml:"types_patterns,omitempty"` +} + +type TypesPatterns struct { + CounterPatterns *[]string `config:"counter_patterns" yaml:"include,omitempty"` + HistogramPatterns *[]string `config:"histogram_patterns" yaml:"exclude,omitempty"` +} + +var defaultConfig = config{ + TypesPatterns: TypesPatterns{ + CounterPatterns: nil, + HistogramPatterns: nil}, +} + +func (c *config) Validate() error { + if c.RateCounters && !c.UseTypes { + return errors.New("'rate_counters' can only be enabled when `use_types` is also enabled") + } + + return nil +} diff --git a/x-pack/metricbeat/module/prometheus/remote_write/data.go b/x-pack/metricbeat/module/prometheus/remote_write/data.go new file mode 100644 index 00000000000..5d8a101fbdd --- /dev/null +++ b/x-pack/metricbeat/module/prometheus/remote_write/data.go @@ -0,0 +1,279 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. + +package remote_write + +import ( + "math" + "regexp" + "strconv" + "strings" + "time" + + "github.com/pkg/errors" + + dto "github.com/prometheus/client_model/go" + "github.com/prometheus/common/model" + + "github.com/elastic/beats/v7/libbeat/common" + "github.com/elastic/beats/v7/libbeat/common/cfgwarn" + "github.com/elastic/beats/v7/libbeat/logp" + p "github.com/elastic/beats/v7/metricbeat/helper/prometheus" + "github.com/elastic/beats/v7/metricbeat/mb" + "github.com/elastic/beats/v7/metricbeat/module/prometheus/remote_write" + "github.com/elastic/beats/v7/x-pack/metricbeat/module/prometheus/collector" +) + +const ( + counterType = "counter_type" + histogramType = "histogram_type" + otherType = "other_type" +) + +type histogram struct { + timestamp time.Time + buckets []*dto.Bucket + labels common.MapStr + metricName string +} + +func remoteWriteEventsGeneratorFactory(base mb.BaseMetricSet) (remote_write.RemoteWriteEventsGenerator, error) { + var err error + config := defaultConfig + if err = base.Module().UnpackConfig(&config); err != nil { + return nil, err + } + + if config.UseTypes { + // use a counter cache with a timeout of 5x the period, as a safe value + // to make sure that all counters are available between fetches + counters := collector.NewCounterCache(base.Module().Config().Period * 5) + + g := remoteWriteTypedGenerator{ + counterCache: counters, + rateCounters: config.RateCounters, + } + + g.counterPatterns, err = p.CompilePatternList(config.TypesPatterns.CounterPatterns) + if err != nil { + return nil, errors.Wrapf(err, "unable to compile counter patterns") + } + g.histogramPatterns, err = p.CompilePatternList(config.TypesPatterns.HistogramPatterns) + if err != nil { + return nil, errors.Wrapf(err, "unable to compile histogram patterns") + } + + return &g, nil + } + + return remote_write.DefaultRemoteWriteEventsGeneratorFactory(base) +} + +type remoteWriteTypedGenerator struct { + counterCache collector.CounterCache + rateCounters bool + counterPatterns []*regexp.Regexp + histogramPatterns []*regexp.Regexp +} + +func (g *remoteWriteTypedGenerator) Start() { + cfgwarn.Beta("Prometheus 'use_types' setting is beta") + + if g.rateCounters { + cfgwarn.Experimental("Prometheus 'rate_counters' setting is experimental") + } + + 
g.counterCache.Start() +} + +func (g *remoteWriteTypedGenerator) Stop() { + logp.Debug("prometheus.remote_write.cache", "stopping counterCache") + g.counterCache.Stop() +} + +// GenerateEvents receives a list of Sample and: +// 1. guess the type of the sample metric +// 2. handle it properly using "types" logic +// 3. if metrics of histogram type then it is converted to ES histogram +// 4. metrics with the same set of labels are grouped into same events +func (g remoteWriteTypedGenerator) GenerateEvents(metrics model.Samples) map[string]mb.Event { + var data common.MapStr + histograms := map[string]histogram{} + eventList := map[string]mb.Event{} + + for _, metric := range metrics { + labels := common.MapStr{} + + if metric == nil { + continue + } + val := float64(metric.Value) + if math.IsNaN(val) || math.IsInf(val, 0) { + continue + } + + name := string(metric.Metric["__name__"]) + delete(metric.Metric, "__name__") + + for k, v := range metric.Metric { + labels[string(k)] = v + } + + promType := g.findMetricType(name, labels) + + labelsHash := labels.String() + labelsClone := labels.Clone() + labelsClone.Delete("le") + if promType == histogramType { + labelsHash = labelsClone.String() + } + // join metrics with same labels in a single event + if _, ok := eventList[labelsHash]; !ok { + eventList[labelsHash] = mb.Event{ + ModuleFields: common.MapStr{}, + } + + // Add labels + if len(labels) > 0 { + if promType == histogramType { + eventList[labelsHash].ModuleFields["labels"] = labelsClone + } else { + eventList[labelsHash].ModuleFields["labels"] = labels + } + } + } + + e := eventList[labelsHash] + e.Timestamp = metric.Timestamp.Time() + switch promType { + case counterType: + data = common.MapStr{ + name: g.rateCounterFloat64(name, labels, val), + } + case otherType: + data = common.MapStr{ + name: common.MapStr{ + "value": val, + }, + } + case histogramType: + histKey := name + labelsClone.String() + + le, _ := labels.GetValue("le") + upperBound := string(le.(model.LabelValue)) + + bucket, err := strconv.ParseFloat(upperBound, 64) + if err != nil { + continue + } + v := uint64(val) + b := &dto.Bucket{ + CumulativeCount: &v, + UpperBound: &bucket, + } + hist, ok := histograms[histKey] + if !ok { + hist = histogram{} + } + hist.buckets = append(hist.buckets, b) + hist.timestamp = metric.Timestamp.Time() + hist.labels = labelsClone + hist.metricName = name + histograms[histKey] = hist + continue + } + e.ModuleFields.Update(data) + + } + + // process histograms together + g.processPromHistograms(eventList, histograms) + return eventList +} + +// rateCounterUint64 fills a counter value and optionally adds the rate if rate_counters is enabled +func (g *remoteWriteTypedGenerator) rateCounterUint64(name string, labels common.MapStr, value uint64) common.MapStr { + d := common.MapStr{ + "counter": value, + } + + if g.rateCounters { + d["rate"], _ = g.counterCache.RateUint64(name+labels.String(), value) + } + + return d +} + +// rateCounterFloat64 fills a counter value and optionally adds the rate if rate_counters is enabled +func (g *remoteWriteTypedGenerator) rateCounterFloat64(name string, labels common.MapStr, value float64) common.MapStr { + d := common.MapStr{ + "counter": value, + } + if g.rateCounters { + d["rate"], _ = g.counterCache.RateFloat64(name+labels.String(), value) + } + + return d +} + +// processPromHistograms receives a group of Histograms and converts each one to ES histogram +func (g *remoteWriteTypedGenerator) processPromHistograms(eventList map[string]mb.Event, histograms 
map[string]histogram) { + for _, histogram := range histograms { + labelsHash := histogram.labels.String() + if _, ok := eventList[labelsHash]; !ok { + eventList[labelsHash] = mb.Event{ + ModuleFields: common.MapStr{}, + } + + // Add labels + if len(histogram.labels) > 0 { + eventList[labelsHash].ModuleFields["labels"] = histogram.labels + } + } + + e := eventList[labelsHash] + e.Timestamp = histogram.timestamp + + hist := dto.Histogram{ + Bucket: histogram.buckets, + } + name := strings.TrimSuffix(histogram.metricName, "_bucket") + data := common.MapStr{ + name: common.MapStr{ + "histogram": collector.PromHistogramToES(g.counterCache, histogram.metricName, histogram.labels, &hist), + }, + } + e.ModuleFields.Update(data) + } +} + +// findMetricType evaluates the type of the metric by check the metricname format in order to handle it properly +func (g *remoteWriteTypedGenerator) findMetricType(metricName string, labels common.MapStr) string { + leLabel := false + if _, ok := labels["le"]; ok { + leLabel = true + } + + // handle user provided patterns + if len(g.counterPatterns) > 0 { + if p.MatchMetricFamily(metricName, g.counterPatterns) { + return counterType + } + } + if len(g.histogramPatterns) > 0 { + if p.MatchMetricFamily(metricName, g.histogramPatterns) && leLabel { + return histogramType + } + } + + // handle defaults + if strings.HasSuffix(metricName, "_total") || strings.HasSuffix(metricName, "_sum") || + strings.HasSuffix(metricName, "_count") { + return counterType + } else if strings.HasSuffix(metricName, "_bucket") && leLabel { + return histogramType + } + + return otherType +} diff --git a/x-pack/metricbeat/module/prometheus/remote_write/remote_write.go b/x-pack/metricbeat/module/prometheus/remote_write/remote_write.go new file mode 100644 index 00000000000..74eadff6d7b --- /dev/null +++ b/x-pack/metricbeat/module/prometheus/remote_write/remote_write.go @@ -0,0 +1,22 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. + +package remote_write + +import ( + "github.com/elastic/beats/v7/metricbeat/mb" + "github.com/elastic/beats/v7/metricbeat/mb/parse" + "github.com/elastic/beats/v7/metricbeat/module/prometheus/remote_write" +) + +func init() { + mb.Registry.MustAddMetricSet("prometheus", "remote_write", + remote_write.MetricSetBuilder(remoteWriteEventsGeneratorFactory), + mb.WithHostParser(parse.EmptyHostParser), + + // must replace ensures that we are replacing the oss implementation with this one + // so we can make use of ES histograms (basic only) when use_types is enabled + mb.MustReplace(), + ) +} diff --git a/x-pack/metricbeat/module/prometheus/remote_write/remote_write_test.go b/x-pack/metricbeat/module/prometheus/remote_write/remote_write_test.go new file mode 100644 index 00000000000..d5c07f0d2a9 --- /dev/null +++ b/x-pack/metricbeat/module/prometheus/remote_write/remote_write_test.go @@ -0,0 +1,1199 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. 
+ +// +build !integration + +package remote_write + +import ( + "testing" + "time" + + "github.com/prometheus/common/model" + "github.com/stretchr/testify/assert" + + "github.com/elastic/beats/v7/libbeat/common" + p "github.com/elastic/beats/v7/metricbeat/helper/prometheus" + xcollector "github.com/elastic/beats/v7/x-pack/metricbeat/module/prometheus/collector" +) + +// TestGenerateEventsCounter tests counter simple cases +func TestGenerateEventsCounter(t *testing.T) { + + counters := xcollector.NewCounterCache(1 * time.Second) + + g := remoteWriteTypedGenerator{ + counterCache: counters, + rateCounters: true, + } + g.counterCache.Start() + labels := common.MapStr{ + "listener_name": model.LabelValue("http"), + } + + // first fetch + metrics := model.Samples{ + &model.Sample{ + Metric: map[model.LabelName]model.LabelValue{ + "__name__": "net_conntrack_listener_conn_closed_total", + "listener_name": "http", + }, + Value: model.SampleValue(42), + Timestamp: model.Time(424242), + }, + } + events := g.GenerateEvents(metrics) + + expected := common.MapStr{ + "net_conntrack_listener_conn_closed_total": common.MapStr{ + "counter": float64(42), + "rate": float64(0), + }, + "labels": labels, + } + + assert.Equal(t, len(events), 1) + e := events[labels.String()] + assert.EqualValues(t, e.ModuleFields, expected) + + // repeat in order to test the rate + metrics = model.Samples{ + &model.Sample{ + Metric: map[model.LabelName]model.LabelValue{ + "__name__": "net_conntrack_listener_conn_closed_total", + "listener_name": "http", + }, + Value: model.SampleValue(45), + Timestamp: model.Time(424242), + }, + } + events = g.GenerateEvents(metrics) + + expected = common.MapStr{ + "net_conntrack_listener_conn_closed_total": common.MapStr{ + "counter": float64(45), + "rate": float64(3), + }, + "labels": labels, + } + + assert.Equal(t, len(events), 1) + e = events[labels.String()] + assert.EqualValues(t, e.ModuleFields, expected) + +} + +// TestGenerateEventsCounterSameLabels tests multiple counters with same labels +func TestGenerateEventsCounterSameLabels(t *testing.T) { + + counters := xcollector.NewCounterCache(1 * time.Second) + + g := remoteWriteTypedGenerator{ + counterCache: counters, + rateCounters: true, + } + g.counterCache.Start() + labels := common.MapStr{ + "listener_name": model.LabelValue("http"), + } + + // first fetch + metrics := model.Samples{ + &model.Sample{ + Metric: map[model.LabelName]model.LabelValue{ + "__name__": "net_conntrack_listener_conn_closed_total", + "listener_name": "http", + }, + Value: model.SampleValue(42), + Timestamp: model.Time(424242), + }, + &model.Sample{ + Metric: map[model.LabelName]model.LabelValue{ + "__name__": "net_conntrack_listener_conn_panic_total", + "listener_name": "http", + }, + Value: model.SampleValue(43), + Timestamp: model.Time(424242), + }, + } + events := g.GenerateEvents(metrics) + + expected := common.MapStr{ + "net_conntrack_listener_conn_closed_total": common.MapStr{ + "counter": float64(42), + "rate": float64(0), + }, + "net_conntrack_listener_conn_panic_total": common.MapStr{ + "counter": float64(43), + "rate": float64(0), + }, + "labels": labels, + } + + assert.Equal(t, len(events), 1) + e := events[labels.String()] + assert.EqualValues(t, e.ModuleFields, expected) + + // repeat in order to test the rate + metrics = model.Samples{ + &model.Sample{ + Metric: map[model.LabelName]model.LabelValue{ + "__name__": "net_conntrack_listener_conn_closed_total", + "listener_name": "http", + }, + Value: model.SampleValue(45), + Timestamp: 
model.Time(424242), + }, + &model.Sample{ + Metric: map[model.LabelName]model.LabelValue{ + "__name__": "net_conntrack_listener_conn_panic_total", + "listener_name": "http", + }, + Value: model.SampleValue(47), + Timestamp: model.Time(424242), + }, + } + events = g.GenerateEvents(metrics) + + expected = common.MapStr{ + "net_conntrack_listener_conn_closed_total": common.MapStr{ + "counter": float64(45), + "rate": float64(3), + }, + "net_conntrack_listener_conn_panic_total": common.MapStr{ + "counter": float64(47), + "rate": float64(4), + }, + "labels": labels, + } + + assert.Equal(t, len(events), 1) + e = events[labels.String()] + assert.EqualValues(t, e.ModuleFields, expected) + +} + +// TestGenerateEventsCounterDifferentLabels tests multiple counters with different labels +func TestGenerateEventsCounterDifferentLabels(t *testing.T) { + + counters := xcollector.NewCounterCache(1 * time.Second) + + g := remoteWriteTypedGenerator{ + counterCache: counters, + rateCounters: true, + } + g.counterCache.Start() + labels := common.MapStr{ + "listener_name": model.LabelValue("http"), + } + labels2 := common.MapStr{ + "listener_name": model.LabelValue("http"), + "device": model.LabelValue("eth0"), + } + + // first fetch + metrics := model.Samples{ + &model.Sample{ + Metric: map[model.LabelName]model.LabelValue{ + "__name__": "net_conntrack_listener_conn_closed_total", + "listener_name": "http", + }, + Value: model.SampleValue(42), + Timestamp: model.Time(424242), + }, + &model.Sample{ + Metric: map[model.LabelName]model.LabelValue{ + "__name__": "net_conntrack_listener_conn_panic_total", + "listener_name": "http", + }, + Value: model.SampleValue(43), + Timestamp: model.Time(424242), + }, + &model.Sample{ + Metric: map[model.LabelName]model.LabelValue{ + "__name__": "net_conntrack_listener_conn_panic_total", + "listener_name": "http", + "device": "eth0", + }, + Value: model.SampleValue(44), + Timestamp: model.Time(424242), + }, + } + events := g.GenerateEvents(metrics) + + expected1 := common.MapStr{ + "net_conntrack_listener_conn_closed_total": common.MapStr{ + "counter": float64(42), + "rate": float64(0), + }, + "net_conntrack_listener_conn_panic_total": common.MapStr{ + "counter": float64(43), + "rate": float64(0), + }, + "labels": labels, + } + expected2 := common.MapStr{ + "net_conntrack_listener_conn_panic_total": common.MapStr{ + "counter": float64(44), + "rate": float64(0), + }, + "labels": labels2, + } + + assert.Equal(t, len(events), 2) + e := events[labels.String()] + assert.EqualValues(t, e.ModuleFields, expected1) + e = events[labels2.String()] + assert.EqualValues(t, e.ModuleFields, expected2) + + // repeat in order to test the rate + metrics = model.Samples{ + &model.Sample{ + Metric: map[model.LabelName]model.LabelValue{ + "__name__": "net_conntrack_listener_conn_closed_total", + "listener_name": "http", + }, + Value: model.SampleValue(45), + Timestamp: model.Time(424242), + }, + &model.Sample{ + Metric: map[model.LabelName]model.LabelValue{ + "__name__": "net_conntrack_listener_conn_panic_total", + "listener_name": "http", + }, + Value: model.SampleValue(47), + Timestamp: model.Time(424242), + }, + &model.Sample{ + Metric: map[model.LabelName]model.LabelValue{ + "__name__": "net_conntrack_listener_conn_panic_total", + "listener_name": "http", + "device": "eth0", + }, + Value: model.SampleValue(50), + Timestamp: model.Time(424242), + }, + } + events = g.GenerateEvents(metrics) + + expected1 = common.MapStr{ + "net_conntrack_listener_conn_closed_total": common.MapStr{ + "counter": 
float64(45), + "rate": float64(3), + }, + "net_conntrack_listener_conn_panic_total": common.MapStr{ + "counter": float64(47), + "rate": float64(4), + }, + "labels": labels, + } + expected2 = common.MapStr{ + "net_conntrack_listener_conn_panic_total": common.MapStr{ + "counter": float64(50), + "rate": float64(6), + }, + "labels": labels2, + } + + assert.Equal(t, len(events), 2) + e = events[labels.String()] + assert.EqualValues(t, e.ModuleFields, expected1) + e = events[labels2.String()] + assert.EqualValues(t, e.ModuleFields, expected2) + +} + +// TestGenerateEventsGaugeDifferentLabels tests multiple gauges with different labels +func TestGenerateEventsGaugeDifferentLabels(t *testing.T) { + + counters := xcollector.NewCounterCache(1 * time.Second) + + g := remoteWriteTypedGenerator{ + counterCache: counters, + rateCounters: true, + } + g.counterCache.Start() + labels := common.MapStr{ + "listener_name": model.LabelValue("http"), + } + labels2 := common.MapStr{ + "listener_name": model.LabelValue("http"), + "device": model.LabelValue("eth0"), + } + + // first fetch + metrics := model.Samples{ + &model.Sample{ + Metric: map[model.LabelName]model.LabelValue{ + "__name__": "net_conntrack_listener_conn_closed_total", + "listener_name": "http", + }, + Value: model.SampleValue(42), + Timestamp: model.Time(424242), + }, + &model.Sample{ + Metric: map[model.LabelName]model.LabelValue{ + "__name__": "net_conntrack_listener_conn_panic_total", + "listener_name": "http", + }, + Value: model.SampleValue(43), + Timestamp: model.Time(424242), + }, + &model.Sample{ + Metric: map[model.LabelName]model.LabelValue{ + "__name__": "net_conntrack_listener_conn_panic_total", + "listener_name": "http", + "device": "eth0", + }, + Value: model.SampleValue(44), + Timestamp: model.Time(424242), + }, + &model.Sample{ + Metric: map[model.LabelName]model.LabelValue{ + "__name__": "net_conntrack_listener_conn_open", + "listener_name": "http", + "device": "eth0", + }, + Value: model.SampleValue(49), + Timestamp: model.Time(424242), + }, + } + events := g.GenerateEvents(metrics) + + expected1 := common.MapStr{ + "net_conntrack_listener_conn_closed_total": common.MapStr{ + "counter": float64(42), + "rate": float64(0), + }, + "net_conntrack_listener_conn_panic_total": common.MapStr{ + "counter": float64(43), + "rate": float64(0), + }, + "labels": labels, + } + expected2 := common.MapStr{ + "net_conntrack_listener_conn_panic_total": common.MapStr{ + "counter": float64(44), + "rate": float64(0), + }, + "net_conntrack_listener_conn_open": common.MapStr{ + "value": float64(49), + }, + "labels": labels2, + } + + assert.Equal(t, len(events), 2) + e := events[labels.String()] + assert.EqualValues(t, e.ModuleFields, expected1) + e = events[labels2.String()] + assert.EqualValues(t, e.ModuleFields, expected2) + + // repeat in order to test the rate + metrics = model.Samples{ + &model.Sample{ + Metric: map[model.LabelName]model.LabelValue{ + "__name__": "net_conntrack_listener_conn_closed_total", + "listener_name": "http", + }, + Value: model.SampleValue(45), + Timestamp: model.Time(424242), + }, + &model.Sample{ + Metric: map[model.LabelName]model.LabelValue{ + "__name__": "net_conntrack_listener_conn_panic_total", + "listener_name": "http", + }, + Value: model.SampleValue(47), + Timestamp: model.Time(424242), + }, + &model.Sample{ + Metric: map[model.LabelName]model.LabelValue{ + "__name__": "net_conntrack_listener_conn_panic_total", + "listener_name": "http", + "device": "eth0", + }, + Value: model.SampleValue(50), + Timestamp: 
model.Time(424242), + }, + &model.Sample{ + Metric: map[model.LabelName]model.LabelValue{ + "__name__": "net_conntrack_listener_conn_open", + "listener_name": "http", + "device": "eth0", + }, + Value: model.SampleValue(59), + Timestamp: model.Time(424242), + }, + } + events = g.GenerateEvents(metrics) + + expected1 = common.MapStr{ + "net_conntrack_listener_conn_closed_total": common.MapStr{ + "counter": float64(45), + "rate": float64(3), + }, + "net_conntrack_listener_conn_panic_total": common.MapStr{ + "counter": float64(47), + "rate": float64(4), + }, + "labels": labels, + } + expected2 = common.MapStr{ + "net_conntrack_listener_conn_panic_total": common.MapStr{ + "counter": float64(50), + "rate": float64(6), + }, + "net_conntrack_listener_conn_open": common.MapStr{ + "value": float64(59), + }, + "labels": labels2, + } + + assert.Equal(t, len(events), 2) + e = events[labels.String()] + assert.EqualValues(t, e.ModuleFields, expected1) + e = events[labels2.String()] + assert.EqualValues(t, e.ModuleFields, expected2) + +} + +// TestGenerateEventsQuantilesDifferentLabels tests summaries with different labels +func TestGenerateEventsQuantilesDifferentLabels(t *testing.T) { + + counters := xcollector.NewCounterCache(1 * time.Second) + + g := remoteWriteTypedGenerator{ + counterCache: counters, + rateCounters: true, + } + g.counterCache.Start() + labels := common.MapStr{ + "runtime": model.LabelValue("linux"), + "quantile": model.LabelValue("0.25"), + } + labels2 := common.MapStr{ + "runtime": model.LabelValue("linux"), + "quantile": model.LabelValue("0.50"), + } + labels3 := common.MapStr{ + "runtime": model.LabelValue("linux"), + } + + // first fetch + metrics := model.Samples{ + &model.Sample{ + Metric: map[model.LabelName]model.LabelValue{ + "__name__": "go_gc_duration_seconds", + "runtime": "linux", + "quantile": "0.25", + }, + Value: model.SampleValue(42), + Timestamp: model.Time(424242), + }, + &model.Sample{ + Metric: map[model.LabelName]model.LabelValue{ + "__name__": "go_gc_duration_seconds", + "runtime": "linux", + "quantile": "0.50", + }, + Value: model.SampleValue(43), + Timestamp: model.Time(424242), + }, + &model.Sample{ + Metric: map[model.LabelName]model.LabelValue{ + "__name__": "go_gc_duration_seconds_sum", + "runtime": "linux", + }, + Value: model.SampleValue(44), + Timestamp: model.Time(424242), + }, + &model.Sample{ + Metric: map[model.LabelName]model.LabelValue{ + "__name__": "go_gc_duration_seconds_count", + "runtime": "linux", + }, + Value: model.SampleValue(45), + Timestamp: model.Time(424242), + }, + &model.Sample{ + Metric: map[model.LabelName]model.LabelValue{ + "__name__": "go_gc_duration_seconds_2", + "runtime": "linux", + "quantile": "0.25", + }, + Value: model.SampleValue(46), + Timestamp: model.Time(424242), + }, + } + events := g.GenerateEvents(metrics) + + expected := common.MapStr{ + "go_gc_duration_seconds": common.MapStr{ + "value": float64(42), + }, + "go_gc_duration_seconds_2": common.MapStr{ + "value": float64(46), + }, + "labels": labels, + } + expected2 := common.MapStr{ + "go_gc_duration_seconds": common.MapStr{ + "value": float64(43), + }, + "labels": labels2, + } + expected3 := common.MapStr{ + "go_gc_duration_seconds_count": common.MapStr{ + "counter": float64(45), + "rate": float64(0), + }, + "go_gc_duration_seconds_sum": common.MapStr{ + "counter": float64(44), + "rate": float64(0), + }, + "labels": labels3, + } + + assert.Equal(t, len(events), 3) + e := events[labels.String()] + assert.EqualValues(t, e.ModuleFields, expected) + e = 
events[labels2.String()]
+	assert.EqualValues(t, e.ModuleFields, expected2)
+	e = events[labels3.String()]
+	assert.EqualValues(t, e.ModuleFields, expected3)
+
+	// repeat in order to test the rate
+	metrics = model.Samples{
+		&model.Sample{
+			Metric: map[model.LabelName]model.LabelValue{
+				"__name__": "go_gc_duration_seconds",
+				"runtime": "linux",
+				"quantile": "0.25",
+			},
+			Value: model.SampleValue(52),
+			Timestamp: model.Time(424242),
+		},
+		&model.Sample{
+			Metric: map[model.LabelName]model.LabelValue{
+				"__name__": "go_gc_duration_seconds",
+				"runtime": "linux",
+				"quantile": "0.50",
+			},
+			Value: model.SampleValue(53),
+			Timestamp: model.Time(424242),
+		},
+		&model.Sample{
+			Metric: map[model.LabelName]model.LabelValue{
+				"__name__": "go_gc_duration_seconds_sum",
+				"runtime": "linux",
+			},
+			Value: model.SampleValue(54),
+			Timestamp: model.Time(424242),
+		},
+		&model.Sample{
+			Metric: map[model.LabelName]model.LabelValue{
+				"__name__": "go_gc_duration_seconds_count",
+				"runtime": "linux",
+			},
+			Value: model.SampleValue(55),
+			Timestamp: model.Time(424242),
+		},
+		&model.Sample{
+			Metric: map[model.LabelName]model.LabelValue{
+				"__name__": "go_gc_duration_seconds_2",
+				"runtime": "linux",
+				"quantile": "0.25",
+			},
+			Value: model.SampleValue(56),
+			Timestamp: model.Time(424242),
+		},
+	}
+	events = g.GenerateEvents(metrics)
+
+	expected = common.MapStr{
+		"go_gc_duration_seconds": common.MapStr{
+			"value": float64(52),
+		},
+		"go_gc_duration_seconds_2": common.MapStr{
+			"value": float64(56),
+		},
+		"labels": labels,
+	}
+	expected2 = common.MapStr{
+		"go_gc_duration_seconds": common.MapStr{
+			"value": float64(53),
+		},
+		"labels": labels2,
+	}
+	expected3 = common.MapStr{
+		"go_gc_duration_seconds_count": common.MapStr{
+			"counter": float64(55),
+			"rate": float64(10),
+		},
+		"go_gc_duration_seconds_sum": common.MapStr{
+			"counter": float64(54),
+			"rate": float64(10),
+		},
+		"labels": labels3,
+	}
+
+	assert.Equal(t, len(events), 3)
+	e = events[labels.String()]
+	assert.EqualValues(t, e.ModuleFields, expected)
+	e = events[labels2.String()]
+	assert.EqualValues(t, e.ModuleFields, expected2)
+	e = events[labels3.String()]
+	assert.EqualValues(t, e.ModuleFields, expected3)
+
+}
+
+// TestGenerateEventsHistogramsDifferentLabels tests histograms with different labels
+func TestGenerateEventsHistogramsDifferentLabels(t *testing.T) {
+
+	counters := xcollector.NewCounterCache(1 * time.Second)
+
+	g := remoteWriteTypedGenerator{
+		counterCache: counters,
+		rateCounters: true,
+	}
+	g.counterCache.Start()
+	labels := common.MapStr{
+		"runtime": model.LabelValue("linux"),
+	}
+	labels2 := common.MapStr{
+		"runtime": model.LabelValue("darwin"),
+	}
+
+	// first fetch
+	metrics := model.Samples{
+		&model.Sample{
+			Metric: map[model.LabelName]model.LabelValue{
+				"__name__": "http_request_duration_seconds_bucket",
+				"runtime": "linux",
+				"le": "0.25",
+			},
+			Value: model.SampleValue(42),
+			Timestamp: model.Time(424242),
+		},
+		&model.Sample{
+			Metric: map[model.LabelName]model.LabelValue{
+				"__name__": "http_request_duration_seconds_bucket",
+				"runtime": "linux",
+				"le": "0.50",
+			},
+			Value: model.SampleValue(43),
+			Timestamp: model.Time(424242),
+		},
+		&model.Sample{
+			Metric: map[model.LabelName]model.LabelValue{
+				"__name__": "http_request_duration_seconds_bucket",
+				"runtime": "linux",
+				"le": "+Inf",
+			},
+			Value: model.SampleValue(44),
+			Timestamp: model.Time(424242),
+		},
+		&model.Sample{
+			Metric: map[model.LabelName]model.LabelValue{
+				"__name__": "http_request_duration_seconds_sum",
+				"runtime": "linux",
+			},
+			Value: model.SampleValue(45),
+			Timestamp: model.Time(424242),
+		},
+		&model.Sample{
+			Metric: map[model.LabelName]model.LabelValue{
+				"__name__": "http_request_duration_seconds_count",
+				"runtime": "linux",
+			},
+			Value: model.SampleValue(46),
+			Timestamp: model.Time(424242),
+		},
+		// second histogram same label
+		&model.Sample{
+			Metric: map[model.LabelName]model.LabelValue{
+				"__name__": "http_request_bytes_bucket",
+				"runtime": "linux",
+				"le": "0.25",
+			},
+			Value: model.SampleValue(52),
+			Timestamp: model.Time(424242),
+		},
+		&model.Sample{
+			Metric: map[model.LabelName]model.LabelValue{
+				"__name__": "http_request_bytes_bucket",
+				"runtime": "linux",
+				"le": "0.50",
+			},
+			Value: model.SampleValue(53),
+			Timestamp: model.Time(424242),
+		},
+		&model.Sample{
+			Metric: map[model.LabelName]model.LabelValue{
+				"__name__": "http_request_bytes_bucket",
+				"runtime": "linux",
+				"le": "+Inf",
+			},
+			Value: model.SampleValue(54),
+			Timestamp: model.Time(424242),
+		},
+		&model.Sample{
+			Metric: map[model.LabelName]model.LabelValue{
+				"__name__": "http_request_bytes_sum",
+				"runtime": "linux",
+			},
+			Value: model.SampleValue(55),
+			Timestamp: model.Time(424242),
+		},
+		&model.Sample{
+			Metric: map[model.LabelName]model.LabelValue{
+				"__name__": "http_request_bytes_count",
+				"runtime": "linux",
+			},
+			Value: model.SampleValue(56),
+			Timestamp: model.Time(424242),
+		},
+		// third histogram different label
+		&model.Sample{
+			Metric: map[model.LabelName]model.LabelValue{
+				"__name__": "http_request_bytes_bucket",
+				"runtime": "darwin",
+				"le": "0.25",
+			},
+			Value: model.SampleValue(62),
+			Timestamp: model.Time(424242),
+		},
+		&model.Sample{
+			Metric: map[model.LabelName]model.LabelValue{
+				"__name__": "http_request_bytes_bucket",
+				"runtime": "darwin",
+				"le": "0.50",
+			},
+			Value: model.SampleValue(63),
+			Timestamp: model.Time(424242),
+		},
+		&model.Sample{
+			Metric: map[model.LabelName]model.LabelValue{
+				"__name__": "http_request_bytes_bucket",
+				"runtime": "darwin",
+				"le": "+Inf",
+			},
+			Value: model.SampleValue(64),
+			Timestamp: model.Time(424242),
+		},
+		&model.Sample{
+			Metric: map[model.LabelName]model.LabelValue{
+				"__name__": "http_request_bytes_sum",
+				"runtime": "darwin",
+			},
+			Value: model.SampleValue(65),
+			Timestamp: model.Time(424242),
+		},
+		&model.Sample{
+			Metric: map[model.LabelName]model.LabelValue{
+				"__name__": "http_request_bytes_count",
+				"runtime": "darwin",
+			},
+			Value: model.SampleValue(66),
+			Timestamp: model.Time(424242),
+		},
+	}
+	events := g.GenerateEvents(metrics)
+
+	expected := common.MapStr{
+		"http_request_duration_seconds": common.MapStr{
+			"histogram": common.MapStr{
+				"values": []float64{float64(0.125), float64(0.375), float64(0.75)},
+				"counts": []uint64{uint64(0), uint64(0), uint64(0)},
+			},
+		},
+		"http_request_duration_seconds_sum": common.MapStr{
+			"counter": float64(45),
+			"rate": float64(0),
+		},
+		"http_request_duration_seconds_count": common.MapStr{
+			"counter": float64(46),
+			"rate": float64(0),
+		},
+		"http_request_bytes": common.MapStr{
+			"histogram": common.MapStr{
+				"values": []float64{float64(0.125), float64(0.375), float64(0.75)},
+				"counts": []uint64{uint64(0), uint64(0), uint64(0)},
+			},
+		},
+		"http_request_bytes_sum": common.MapStr{
+			"counter": float64(55),
+			"rate": float64(0),
+		},
+		"http_request_bytes_count": common.MapStr{
+			"counter": float64(56),
+			"rate": float64(0),
+		},
+		"labels": labels,
+	}
+	expected2 := common.MapStr{
+		"http_request_bytes": common.MapStr{
+			"histogram": common.MapStr{
+				"values": []float64{float64(0.125), float64(0.375), float64(0.75)},
+				"counts": []uint64{uint64(0), uint64(0), uint64(0)},
+			},
+		},
+		"http_request_bytes_sum": common.MapStr{
+			"counter": float64(65),
+			"rate": float64(0),
+		},
+		"http_request_bytes_count": common.MapStr{
+			"counter": float64(66),
+			"rate": float64(0),
+		},
+		"labels": labels2,
+	}
+
+	assert.Equal(t, 2, len(events))
+	e := events[labels.String()]
+	assert.EqualValues(t, e.ModuleFields, expected)
+	e = events[labels2.String()]
+	assert.EqualValues(t, e.ModuleFields, expected2)
+
+	// repeat in order to test the rate
+	metrics = model.Samples{
+		&model.Sample{
+			Metric: map[model.LabelName]model.LabelValue{
+				"__name__": "http_request_duration_seconds_bucket",
+				"runtime": "linux",
+				"le": "0.25",
+			},
+			Value: model.SampleValue(142),
+			Timestamp: model.Time(424242),
+		},
+		&model.Sample{
+			Metric: map[model.LabelName]model.LabelValue{
+				"__name__": "http_request_duration_seconds_bucket",
+				"runtime": "linux",
+				"le": "0.50",
+			},
+			Value: model.SampleValue(143),
+			Timestamp: model.Time(424242),
+		},
+		&model.Sample{
+			Metric: map[model.LabelName]model.LabelValue{
+				"__name__": "http_request_duration_seconds_bucket",
+				"runtime": "linux",
+				"le": "+Inf",
+			},
+			Value: model.SampleValue(144),
+			Timestamp: model.Time(424242),
+		},
+		&model.Sample{
+			Metric: map[model.LabelName]model.LabelValue{
+				"__name__": "http_request_duration_seconds_sum",
+				"runtime": "linux",
+			},
+			Value: model.SampleValue(145),
+			Timestamp: model.Time(424242),
+		},
+		&model.Sample{
+			Metric: map[model.LabelName]model.LabelValue{
+				"__name__": "http_request_duration_seconds_count",
+				"runtime": "linux",
+			},
+			Value: model.SampleValue(146),
+			Timestamp: model.Time(424242),
+		},
+		// second histogram same label
+		&model.Sample{
+			Metric: map[model.LabelName]model.LabelValue{
+				"__name__": "http_request_bytes_bucket",
+				"runtime": "linux",
+				"le": "0.25",
+			},
+			Value: model.SampleValue(252),
+			Timestamp: model.Time(424242),
+		},
+		&model.Sample{
+			Metric: map[model.LabelName]model.LabelValue{
+				"__name__": "http_request_bytes_bucket",
+				"runtime": "linux",
+				"le": "0.50",
+			},
+			Value: model.SampleValue(253),
+			Timestamp: model.Time(424242),
+		},
+		&model.Sample{
+			Metric: map[model.LabelName]model.LabelValue{
+				"__name__": "http_request_bytes_bucket",
+				"runtime": "linux",
+				"le": "+Inf",
+			},
+			Value: model.SampleValue(254),
+			Timestamp: model.Time(424242),
+		},
+		&model.Sample{
+			Metric: map[model.LabelName]model.LabelValue{
+				"__name__": "http_request_bytes_sum",
+				"runtime": "linux",
+			},
+			Value: model.SampleValue(255),
+			Timestamp: model.Time(424242),
+		},
+		&model.Sample{
+			Metric: map[model.LabelName]model.LabelValue{
+				"__name__": "http_request_bytes_count",
+				"runtime": "linux",
+			},
+			Value: model.SampleValue(256),
+			Timestamp: model.Time(424242),
+		},
+		// third histogram different label
+		&model.Sample{
+			Metric: map[model.LabelName]model.LabelValue{
+				"__name__": "http_request_bytes_bucket",
+				"runtime": "darwin",
+				"le": "0.25",
+			},
+			Value: model.SampleValue(362),
+			Timestamp: model.Time(424242),
+		},
+		&model.Sample{
+			Metric: map[model.LabelName]model.LabelValue{
+				"__name__": "http_request_bytes_bucket",
+				"runtime": "darwin",
+				"le": "0.50",
+			},
+			Value: model.SampleValue(363),
+			Timestamp: model.Time(424242),
+		},
+		&model.Sample{
+			Metric: map[model.LabelName]model.LabelValue{
+				"__name__": "http_request_bytes_bucket",
+				"runtime": "darwin",
+				"le": "+Inf",
+			},
+			Value: model.SampleValue(364),
+			Timestamp: model.Time(424242),
+		},
+		&model.Sample{
+			Metric: map[model.LabelName]model.LabelValue{
+				"__name__": "http_request_bytes_sum",
+				"runtime": "darwin",
+			},
+			Value: model.SampleValue(365),
+			Timestamp: model.Time(424242),
+		},
+		&model.Sample{
+			Metric: map[model.LabelName]model.LabelValue{
+				"__name__": "http_request_bytes_count",
+				"runtime": "darwin",
+			},
+			Value: model.SampleValue(366),
+			Timestamp: model.Time(424242),
+		},
+	}
+	events = g.GenerateEvents(metrics)
+
+	expected = common.MapStr{
+		"http_request_duration_seconds": common.MapStr{
+			"histogram": common.MapStr{
+				"values": []float64{float64(0.125), float64(0.375), float64(0.75)},
+				"counts": []uint64{uint64(100), uint64(0), uint64(0)},
+			},
+		},
+		"http_request_duration_seconds_sum": common.MapStr{
+			"counter": float64(145),
+			"rate": float64(100),
+		},
+		"http_request_duration_seconds_count": common.MapStr{
+			"counter": float64(146),
+			"rate": float64(100),
+		},
+		"http_request_bytes": common.MapStr{
+			"histogram": common.MapStr{
+				"values": []float64{float64(0.125), float64(0.375), float64(0.75)},
+				"counts": []uint64{uint64(200), uint64(0), uint64(0)},
+			},
+		},
+		"http_request_bytes_sum": common.MapStr{
+			"counter": float64(255),
+			"rate": float64(200),
+		},
+		"http_request_bytes_count": common.MapStr{
+			"counter": float64(256),
+			"rate": float64(200),
+		},
+		"labels": labels,
+	}
+	expected2 = common.MapStr{
+		"http_request_bytes": common.MapStr{
+			"histogram": common.MapStr{
+				"values": []float64{float64(0.125), float64(0.375), float64(0.75)},
+				"counts": []uint64{uint64(300), uint64(0), uint64(0)},
+			},
+		},
+		"http_request_bytes_sum": common.MapStr{
+			"counter": float64(365),
+			"rate": float64(300),
+		},
+		"http_request_bytes_count": common.MapStr{
+			"counter": float64(366),
+			"rate": float64(300),
+		},
+		"labels": labels2,
+	}
+
+	assert.Equal(t, 2, len(events))
+	e = events[labels.String()]
+	assert.EqualValues(t, e.ModuleFields, expected)
+	e = events[labels2.String()]
+	assert.EqualValues(t, e.ModuleFields, expected2)
+}
+
+// TestGenerateEventsCounterWithDefinedPattern tests counter with defined pattern
+func TestGenerateEventsCounterWithDefinedPattern(t *testing.T) {
+
+	counters := xcollector.NewCounterCache(1 * time.Second)
+
+	counterPatterns, err := p.CompilePatternList(&[]string{"_mycounter"})
+	if err != nil {
+		panic(err)
+	}
+	g := remoteWriteTypedGenerator{
+		counterCache: counters,
+		rateCounters: true,
+		counterPatterns: counterPatterns,
+	}
+
+	g.counterCache.Start()
+	labels := common.MapStr{
+		"listener_name": model.LabelValue("http"),
+	}
+
+	// first fetch
+	metrics := model.Samples{
+		&model.Sample{
+			Metric: map[model.LabelName]model.LabelValue{
+				"__name__": "net_conntrack_listener_conn_closed_mycounter",
+				"listener_name": "http",
+			},
+			Value: model.SampleValue(42),
+			Timestamp: model.Time(424242),
+		},
+	}
+	events := g.GenerateEvents(metrics)
+
+	expected := common.MapStr{
+		"net_conntrack_listener_conn_closed_mycounter": common.MapStr{
+			"counter": float64(42),
+			"rate": float64(0),
+		},
+		"labels": labels,
+	}
+
+	assert.Equal(t, len(events), 1)
+	e := events[labels.String()]
+	assert.EqualValues(t, e.ModuleFields, expected)
+
+	// repeat in order to test the rate
+	metrics = model.Samples{
+		&model.Sample{
+			Metric: map[model.LabelName]model.LabelValue{
+				"__name__": "net_conntrack_listener_conn_closed_mycounter",
+				"listener_name": "http",
+			},
+			Value: model.SampleValue(45),
+			Timestamp: model.Time(424242),
+		},
+	}
+	events = g.GenerateEvents(metrics)
+
+	expected = common.MapStr{
+		"net_conntrack_listener_conn_closed_mycounter": common.MapStr{
+			"counter": float64(45),
+			"rate": float64(3),
+		},
+		"labels": labels,
+	}
+
+	assert.Equal(t, len(events), 1)
+	e = events[labels.String()]
+	assert.EqualValues(t, e.ModuleFields, expected)
+
+}
+
+// TestGenerateEventsHistogramWithDefinedPattern tests histogram with defined pattern
+func TestGenerateEventsHistogramWithDefinedPattern(t *testing.T) {
+
+	counters := xcollector.NewCounterCache(1 * time.Second)
+
+	histogramPatterns, err := p.CompilePatternList(&[]string{"_myhistogram"})
+	if err != nil {
+		panic(err)
+	}
+	g := remoteWriteTypedGenerator{
+		counterCache: counters,
+		rateCounters: true,
+		histogramPatterns: histogramPatterns,
+	}
+
+	g.counterCache.Start()
+	labels := common.MapStr{
+		"listener_name": model.LabelValue("http"),
+	}
+
+	// first fetch
+	metrics := model.Samples{
+		&model.Sample{
+			Metric: map[model.LabelName]model.LabelValue{
+				"__name__": "net_conntrack_listener_conn_closed_myhistogram",
+				"listener_name": "http",
+				"le": "20",
+			},
+			Value: model.SampleValue(42),
+			Timestamp: model.Time(424242),
+		},
+	}
+	events := g.GenerateEvents(metrics)
+
+	expected := common.MapStr{
+		"net_conntrack_listener_conn_closed_myhistogram": common.MapStr{
+			"histogram": common.MapStr{
+				"values": []float64{float64(10)},
+				"counts": []uint64{uint64(0)},
+			},
+		},
+		"labels": labels,
+	}
+
+	assert.Equal(t, len(events), 1)
+	e := events[labels.String()]
+	assert.EqualValues(t, e.ModuleFields, expected)
+
+	// repeat in order to test the rate
+	metrics = model.Samples{
+		&model.Sample{
+			Metric: map[model.LabelName]model.LabelValue{
+				"__name__": "net_conntrack_listener_conn_closed_myhistogram",
+				"listener_name": "http",
+				"le": "20",
+			},
+			Value: model.SampleValue(45),
+			Timestamp: model.Time(424242),
+		},
+	}
+	events = g.GenerateEvents(metrics)
+
+	expected = common.MapStr{
+		"net_conntrack_listener_conn_closed_myhistogram": common.MapStr{
+			"histogram": common.MapStr{
+				"values": []float64{float64(10)},
+				"counts": []uint64{uint64(3)},
+			},
+		},
+		"labels": labels,
+	}
+
+	assert.Equal(t, len(events), 1)
+	e = events[labels.String()]
+	assert.EqualValues(t, e.ModuleFields, expected)
+
+}
diff --git a/x-pack/metricbeat/modules.d/prometheus.yml.disabled b/x-pack/metricbeat/modules.d/prometheus.yml.disabled
index f5882aff4fc..5dbe163c62a 100644
--- a/x-pack/metricbeat/modules.d/prometheus.yml.disabled
+++ b/x-pack/metricbeat/modules.d/prometheus.yml.disabled
@@ -23,6 +23,19 @@
   # Store counter rates instead of original cumulative counters (experimental, default: false)
   #rate_counters: true
+  # Use Elasticsearch histogram type to store histograms (beta, default: false)
+  # This will change the default layout and put metric type in the field name
+  #use_types: true
+
+  # Store counter rates instead of original cumulative counters (experimental, default: false)
+  #rate_counters: true
+
+  # Define patterns for counter and histogram types so as to identify metrics' types according to these patterns
+  #types_patterns:
+  #  counter_patterns: []
+  #  histogram_patterns: []
+
+# Metrics sent by a Prometheus server using remote_write option
 #- module: prometheus
 #  metricsets: ["remote_write"]