From 8ce4b7c269cb688770637d74388a80cbc452d415 Mon Sep 17 00:00:00 2001 From: Alex K <8418476+fearful-symmetry@users.noreply.github.com> Date: Wed, 18 Sep 2019 14:14:31 -0700 Subject: [PATCH] [metricbeat] Migrate k8s modules to reporterv2 with error (#13559) * migrate k8s modules, prom helper and ptest framework to reporterv2 with error --- metricbeat/helper/prometheus/module.go | 4 ++-- metricbeat/helper/prometheus/prometheus.go | 9 +++++---- metricbeat/helper/prometheus/ptest/ptest.go | 10 ++++------ .../module/kubernetes/apiserver/metricset.go | 11 +++++++---- .../module/kubernetes/container/container.go | 19 +++++++++---------- metricbeat/module/kubernetes/node/node.go | 15 +++++++-------- metricbeat/module/kubernetes/pod/pod.go | 19 ++++++++++--------- .../state_container/state_container.go | 13 +++++++------ .../kubernetes/state_cronjob/state_cronjob.go | 13 ++++++------- .../kubernetes/state_node/state_node.go | 13 +++++++------ metricbeat/module/kubernetes/system/system.go | 19 ++++++++++--------- metricbeat/module/kubernetes/volume/volume.go | 15 +++++++++------ .../coredns/stats/stats_integration_test.go | 4 ++-- 13 files changed, 85 insertions(+), 79 deletions(-) diff --git a/metricbeat/helper/prometheus/module.go b/metricbeat/helper/prometheus/module.go index 9ac6f48bcad..da1ea895f25 100644 --- a/metricbeat/helper/prometheus/module.go +++ b/metricbeat/helper/prometheus/module.go @@ -56,6 +56,6 @@ type prometheusMetricSet struct { mapping *MetricsMapping } -func (m *prometheusMetricSet) Fetch(r mb.ReporterV2) { - m.prometheus.ReportProcessedMetrics(m.mapping, r) +func (m *prometheusMetricSet) Fetch(r mb.ReporterV2) error { + return m.prometheus.ReportProcessedMetrics(m.mapping, r) } diff --git a/metricbeat/helper/prometheus/prometheus.go b/metricbeat/helper/prometheus/prometheus.go index d7e3676d5e5..1eb5392b805 100644 --- a/metricbeat/helper/prometheus/prometheus.go +++ b/metricbeat/helper/prometheus/prometheus.go @@ -38,7 +38,7 @@ type Prometheus interface { GetProcessedMetrics(mapping *MetricsMapping) ([]common.MapStr, error) - ReportProcessedMetrics(mapping *MetricsMapping, r mb.ReporterV2) + ReportProcessedMetrics(mapping *MetricsMapping, r mb.ReporterV2) error } type prometheus struct { @@ -214,11 +214,10 @@ type infoMetricData struct { Meta common.MapStr } -func (p *prometheus) ReportProcessedMetrics(mapping *MetricsMapping, r mb.ReporterV2) { +func (p *prometheus) ReportProcessedMetrics(mapping *MetricsMapping, r mb.ReporterV2) error { events, err := p.GetProcessedMetrics(mapping) if err != nil { - r.Error(err) - return + return errors.Wrap(err, "error getting processed metrics") } for _, event := range events { r.Event(mb.Event{ @@ -226,6 +225,8 @@ func (p *prometheus) ReportProcessedMetrics(mapping *MetricsMapping, r mb.Report Namespace: mapping.Namespace, }) } + + return nil } func getEvent(m map[string]common.MapStr, labels common.MapStr) common.MapStr { diff --git a/metricbeat/helper/prometheus/ptest/ptest.go b/metricbeat/helper/prometheus/ptest/ptest.go index c91a2690eff..4bcadbc5f67 100644 --- a/metricbeat/helper/prometheus/ptest/ptest.go +++ b/metricbeat/helper/prometheus/ptest/ptest.go @@ -156,13 +156,11 @@ func TestMetricSet(t *testing.T, module, metricset string, cases TestCases) { "hosts": []string{server.URL}, } - f := mbtest.NewReportingMetricSetV2(t, config) - reporter := &mbtest.CapturingReporterV2{} - f.Fetch(reporter) - assert.Nil(t, reporter.GetErrors(), "Errors while fetching metrics") + f := mbtest.NewFetcher(t, config) + events, errs := f.FetchEvents() + assert.Nil(t, errs, "Errors while fetching metrics") if *expectedFlag { - events := reporter.GetEvents() sort.SliceStable(events, func(i, j int) bool { h1, _ := hashstructure.Hash(events[i], nil) h2, _ := hashstructure.Hash(events[j], nil) @@ -185,7 +183,7 @@ func TestMetricSet(t *testing.T, module, metricset string, cases TestCases) { t.Fatal(err) } - for _, event := range reporter.GetEvents() { + for _, event := range events { // ensure the event is in expected list found := -1 for i, expectedEvent := range expectedEvents { diff --git a/metricbeat/module/kubernetes/apiserver/metricset.go b/metricbeat/module/kubernetes/apiserver/metricset.go index ca55ab9369a..81a4c5e43c0 100644 --- a/metricbeat/module/kubernetes/apiserver/metricset.go +++ b/metricbeat/module/kubernetes/apiserver/metricset.go @@ -18,6 +18,8 @@ package apiserver import ( + "github.com/pkg/errors" + "github.com/elastic/beats/metricbeat/helper/prometheus" "github.com/elastic/beats/metricbeat/mb" ) @@ -29,7 +31,7 @@ type metricset struct { prometheusMappings *prometheus.MetricsMapping } -var _ mb.ReportingMetricSetV2 = (*metricset)(nil) +var _ mb.ReportingMetricSetV2Error = (*metricset)(nil) // getMetricsetFactory as required by` mb.Registry.MustAddMetricSet` func getMetricsetFactory(prometheusMappings *prometheus.MetricsMapping) mb.MetricSetFactory { @@ -47,11 +49,10 @@ func getMetricsetFactory(prometheusMappings *prometheus.MetricsMapping) mb.Metri } // Fetch as expected by `mb.EventFetcher` -func (m *metricset) Fetch(reporter mb.ReporterV2) { +func (m *metricset) Fetch(reporter mb.ReporterV2) error { events, err := m.prometheusClient.GetProcessedMetrics(m.prometheusMappings) if err != nil { - reporter.Error(err) - return + return errors.Wrap(err, "error getting metrics") } rcPost14 := false @@ -88,4 +89,6 @@ func (m *metricset) Fetch(reporter mb.ReporterV2) { Namespace: m.prometheusMappings.Namespace, }) } + + return nil } diff --git a/metricbeat/module/kubernetes/container/container.go b/metricbeat/module/kubernetes/container/container.go index ef9bb35d267..acbf437163d 100644 --- a/metricbeat/module/kubernetes/container/container.go +++ b/metricbeat/module/kubernetes/container/container.go @@ -78,31 +78,30 @@ func New(base mb.BaseMetricSet) (mb.MetricSet, error) { // Fetch methods implements the data gathering and data conversion to the right // format. It publishes the event which is then forwarded to the output. In case // of an error set the Error field of mb.Event or simply call report.Error(). -func (m *MetricSet) Fetch(reporter mb.ReporterV2) { +func (m *MetricSet) Fetch(reporter mb.ReporterV2) error { m.enricher.Start() body, err := m.http.FetchContent() if err != nil { - err = errors.Wrap(err, "error doing HTTP request to fetch 'container' Metricset data") - logger.Error(err) - reporter.Error(err) - return + return errors.Wrap(err, "error doing HTTP request to fetch 'container' Metricset data") + } events, err := eventMapping(body, util.PerfMetrics) if err != nil { - logger.Error(err) - reporter.Error(err) - return + return errors.Wrap(err, "error in mapping") } m.enricher.Enrich(events) for _, e := range events { - reporter.Event(mb.TransformMapStrToEvent("kubernetes", e, nil)) + isOpen := reporter.Event(mb.TransformMapStrToEvent("kubernetes", e, nil)) + if !isOpen { + return nil + } } - return + return nil } // Close stops this metricset diff --git a/metricbeat/module/kubernetes/node/node.go b/metricbeat/module/kubernetes/node/node.go index cc545356dbd..804600ecb69 100644 --- a/metricbeat/module/kubernetes/node/node.go +++ b/metricbeat/module/kubernetes/node/node.go @@ -18,6 +18,8 @@ package node import ( + "github.com/pkg/errors" + "github.com/elastic/beats/libbeat/common" "github.com/elastic/beats/libbeat/common/kubernetes" "github.com/elastic/beats/libbeat/logp" @@ -79,28 +81,25 @@ func New(base mb.BaseMetricSet) (mb.MetricSet, error) { // Fetch methods implements the data gathering and data conversion to the right // format. It publishes the event which is then forwarded to the output. In case // of an error set the Error field of mb.Event or simply call report.Error(). -func (m *MetricSet) Fetch(reporter mb.ReporterV2) { +func (m *MetricSet) Fetch(reporter mb.ReporterV2) error { m.enricher.Start() body, err := m.http.FetchContent() if err != nil { - logger.Error(err) - reporter.Error(err) - return + return errors.Wrap(err, "error doing HTTP request to fetch 'node' Metricset data") + } event, err := eventMapping(body) if err != nil { - logger.Error(err) - reporter.Error(err) - return + return errors.Wrap(err, "error in mapping") } m.enricher.Enrich([]common.MapStr{event}) reporter.Event(mb.TransformMapStrToEvent("kubernetes", event, nil)) - return + return nil } // Close stops this metricset diff --git a/metricbeat/module/kubernetes/pod/pod.go b/metricbeat/module/kubernetes/pod/pod.go index b65a6a20b6d..8762f6ad1dd 100644 --- a/metricbeat/module/kubernetes/pod/pod.go +++ b/metricbeat/module/kubernetes/pod/pod.go @@ -18,6 +18,8 @@ package pod import ( + "github.com/pkg/errors" + "github.com/elastic/beats/libbeat/common/kubernetes" "github.com/elastic/beats/libbeat/logp" "github.com/elastic/beats/metricbeat/helper" @@ -78,29 +80,28 @@ func New(base mb.BaseMetricSet) (mb.MetricSet, error) { // Fetch methods implements the data gathering and data conversion to the right // format. It publishes the event which is then forwarded to the output. In case // of an error set the Error field of mb.Event or simply call report.Error(). -func (m *MetricSet) Fetch(reporter mb.ReporterV2) { +func (m *MetricSet) Fetch(reporter mb.ReporterV2) error { m.enricher.Start() body, err := m.http.FetchContent() if err != nil { - logger.Error(err) - reporter.Error(err) - return + return errors.Wrap(err, "error doing HTTP request to fetch 'pod' Metricset data") } events, err := eventMapping(body, util.PerfMetrics) if err != nil { - logger.Error(err) - reporter.Error(err) - return + return errors.Wrap(err, "error in mapping") } m.enricher.Enrich(events) for _, e := range events { - reporter.Event(mb.TransformMapStrToEvent("kubernetes", e, nil)) + isOpen := reporter.Event(mb.TransformMapStrToEvent("kubernetes", e, nil)) + if !isOpen { + return nil + } } - return + return nil } // Close stops this metricset diff --git a/metricbeat/module/kubernetes/state_container/state_container.go b/metricbeat/module/kubernetes/state_container/state_container.go index 258c9bc887e..101fbdcfbc9 100644 --- a/metricbeat/module/kubernetes/state_container/state_container.go +++ b/metricbeat/module/kubernetes/state_container/state_container.go @@ -18,6 +18,8 @@ package state_container import ( + "github.com/pkg/errors" + "github.com/elastic/beats/libbeat/common" p "github.com/elastic/beats/metricbeat/helper/prometheus" "github.com/elastic/beats/metricbeat/mb" @@ -105,14 +107,12 @@ func New(base mb.BaseMetricSet) (mb.MetricSet, error) { // Fetch methods implements the data gathering and data conversion to the right // format. It publishes the event which is then forwarded to the output. In case // of an error set the Error field of mb.Event or simply call report.Error(). -func (m *MetricSet) Fetch(reporter mb.ReporterV2) { +func (m *MetricSet) Fetch(reporter mb.ReporterV2) error { m.enricher.Start() events, err := m.prometheus.GetProcessedMetrics(mapping) if err != nil { - m.Logger().Error(err) - reporter.Error(err) - return + return errors.Wrap(err, "error getting event") } m.enricher.Enrich(events) @@ -146,10 +146,11 @@ func (m *MetricSet) Fetch(reporter mb.ReporterV2) { ModuleFields: moduleFieldsMapStr, Namespace: "kubernetes.container", }); !reported { - m.Logger().Debug("error trying to emit event") - return + return nil } } + + return nil } // Close stops this metricset diff --git a/metricbeat/module/kubernetes/state_cronjob/state_cronjob.go b/metricbeat/module/kubernetes/state_cronjob/state_cronjob.go index 39f3d4ed9d6..58d61d5cccc 100644 --- a/metricbeat/module/kubernetes/state_cronjob/state_cronjob.go +++ b/metricbeat/module/kubernetes/state_cronjob/state_cronjob.go @@ -18,6 +18,8 @@ package state_cronjob import ( + "github.com/pkg/errors" + "github.com/elastic/beats/libbeat/common" p "github.com/elastic/beats/metricbeat/helper/prometheus" "github.com/elastic/beats/metricbeat/mb" @@ -74,12 +76,10 @@ func NewCronJobMetricSet(base mb.BaseMetricSet) (mb.MetricSet, error) { // module rooted fields at the event that gets reported // // Copied from other kube state metrics. -func (m *CronJobMetricSet) Fetch(reporter mb.ReporterV2) { +func (m *CronJobMetricSet) Fetch(reporter mb.ReporterV2) error { events, err := m.prometheus.GetProcessedMetrics(m.mapping) if err != nil { - m.Logger().Error(err) - reporter.Error(err) - return + return errors.Wrap(err, "error getting metrics") } for _, event := range events { @@ -98,10 +98,9 @@ func (m *CronJobMetricSet) Fetch(reporter mb.ReporterV2) { ModuleFields: moduleFieldsMapStr, Namespace: "kubernetes.cronjob", }); !reported { - m.Logger().Debug("error trying to emit event") - return + return nil } } - return + return nil } diff --git a/metricbeat/module/kubernetes/state_node/state_node.go b/metricbeat/module/kubernetes/state_node/state_node.go index 8185ef1b365..8835f3f09cd 100644 --- a/metricbeat/module/kubernetes/state_node/state_node.go +++ b/metricbeat/module/kubernetes/state_node/state_node.go @@ -18,6 +18,8 @@ package state_node import ( + "github.com/pkg/errors" + "github.com/elastic/beats/libbeat/common/kubernetes" p "github.com/elastic/beats/metricbeat/helper/prometheus" "github.com/elastic/beats/metricbeat/mb" @@ -96,14 +98,12 @@ func New(base mb.BaseMetricSet) (mb.MetricSet, error) { // Fetch methods implements the data gathering and data conversion to the right // format. It publishes the event which is then forwarded to the output. In case // of an error set the Error field of mb.Event or simply call report.Error(). -func (m *MetricSet) Fetch(reporter mb.ReporterV2) { +func (m *MetricSet) Fetch(reporter mb.ReporterV2) error { m.enricher.Start() events, err := m.prometheus.GetProcessedMetrics(mapping) if err != nil { - m.Logger().Error(err) - reporter.Error(err) - return + return errors.Wrap(err, "error doing HTTP request to fetch 'state_node' Metricset data") } m.enricher.Enrich(events) @@ -111,10 +111,11 @@ func (m *MetricSet) Fetch(reporter mb.ReporterV2) { event[mb.NamespaceKey] = "node" reported := reporter.Event(mb.TransformMapStrToEvent("kubernetes", event, nil)) if !reported { - m.Logger().Debug("error trying to emit event") - return + return nil } } + + return nil } // Close stops this metricset diff --git a/metricbeat/module/kubernetes/system/system.go b/metricbeat/module/kubernetes/system/system.go index 28d17a2bb3a..fd9495b392e 100644 --- a/metricbeat/module/kubernetes/system/system.go +++ b/metricbeat/module/kubernetes/system/system.go @@ -18,6 +18,8 @@ package system import ( + "github.com/pkg/errors" + "github.com/elastic/beats/libbeat/logp" "github.com/elastic/beats/metricbeat/helper" "github.com/elastic/beats/metricbeat/mb" @@ -73,23 +75,22 @@ func New(base mb.BaseMetricSet) (mb.MetricSet, error) { // Fetch methods implements the data gathering and data conversion to the right // format. It publishes the event which is then forwarded to the output. In case // of an error set the Error field of mb.Event or simply call report.Error(). -func (m *MetricSet) Fetch(reporter mb.ReporterV2) { +func (m *MetricSet) Fetch(reporter mb.ReporterV2) error { body, err := m.http.FetchContent() if err != nil { - logger.Error(err) - reporter.Error(err) - return + return errors.Wrap(err, "error doing HTTP request to fetch 'system' Metricset data") } events, err := eventMapping(body) if err != nil { - logger.Error(err) - reporter.Error(err) - return + return errors.Wrap(err, "error in mapping") } for _, e := range events { - reporter.Event(mb.TransformMapStrToEvent("kubernetes", e, nil)) + isOpen := reporter.Event(mb.TransformMapStrToEvent("kubernetes", e, nil)) + if !isOpen { + return nil + } } - return + return nil } diff --git a/metricbeat/module/kubernetes/volume/volume.go b/metricbeat/module/kubernetes/volume/volume.go index bc9ebb59140..b4a9589ca9a 100644 --- a/metricbeat/module/kubernetes/volume/volume.go +++ b/metricbeat/module/kubernetes/volume/volume.go @@ -18,6 +18,8 @@ package volume import ( + "github.com/pkg/errors" + "github.com/elastic/beats/libbeat/logp" "github.com/elastic/beats/metricbeat/helper" "github.com/elastic/beats/metricbeat/mb" @@ -73,18 +75,19 @@ func New(base mb.BaseMetricSet) (mb.MetricSet, error) { // Fetch methods implements the data gathering and data conversion to the right // format. It publishes the event which is then forwarded to the output. In case // of an error set the Error field of mb.Event or simply call report.Error(). -func (m *MetricSet) Fetch(reporter mb.ReporterV2) { +func (m *MetricSet) Fetch(reporter mb.ReporterV2) error { body, err := m.http.FetchContent() if err != nil { - logger.Error(err) - reporter.Error(err) - return + return errors.Wrap(err, "error doing HTTP request to fetch 'volume' Metricset data") } events, err := eventMapping(body) for _, e := range events { - reporter.Event(mb.TransformMapStrToEvent("kubernetes", e, nil)) + isOpen := reporter.Event(mb.TransformMapStrToEvent("kubernetes", e, nil)) + if !isOpen { + return nil + } } - return + return nil } diff --git a/x-pack/metricbeat/module/coredns/stats/stats_integration_test.go b/x-pack/metricbeat/module/coredns/stats/stats_integration_test.go index 74e2e74d9f7..3e113ff3528 100644 --- a/x-pack/metricbeat/module/coredns/stats/stats_integration_test.go +++ b/x-pack/metricbeat/module/coredns/stats/stats_integration_test.go @@ -18,8 +18,8 @@ import ( func TestFetch(t *testing.T) { service := compose.EnsureUp(t, "coredns") - f := mbtest.NewReportingMetricSetV2(t, getConfig(service.Host())) - events, errs := mbtest.ReportingFetchV2(f) + f := mbtest.NewFetcher(t, getConfig(service.Host())) + events, errs := f.FetchEvents() if len(errs) > 0 { t.Fatalf("Expected 0 error, had %d. %v\n", len(errs), errs) }