Skip to content

Commit

Permalink
[metricbeat] Migrate k8s modules to reporterv2 with error (#13559)
Browse files Browse the repository at this point in the history
* migrate k8s modules, prom helper and ptest framework to reporterv2 with error
  • Loading branch information
fearful-symmetry authored Sep 18, 2019
1 parent b876c18 commit 8ce4b7c
Show file tree
Hide file tree
Showing 13 changed files with 85 additions and 79 deletions.
4 changes: 2 additions & 2 deletions metricbeat/helper/prometheus/module.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,6 @@ type prometheusMetricSet struct {
mapping *MetricsMapping
}

func (m *prometheusMetricSet) Fetch(r mb.ReporterV2) {
m.prometheus.ReportProcessedMetrics(m.mapping, r)
func (m *prometheusMetricSet) Fetch(r mb.ReporterV2) error {
return m.prometheus.ReportProcessedMetrics(m.mapping, r)
}
9 changes: 5 additions & 4 deletions metricbeat/helper/prometheus/prometheus.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ type Prometheus interface {

GetProcessedMetrics(mapping *MetricsMapping) ([]common.MapStr, error)

ReportProcessedMetrics(mapping *MetricsMapping, r mb.ReporterV2)
ReportProcessedMetrics(mapping *MetricsMapping, r mb.ReporterV2) error
}

type prometheus struct {
Expand Down Expand Up @@ -214,18 +214,19 @@ type infoMetricData struct {
Meta common.MapStr
}

func (p *prometheus) ReportProcessedMetrics(mapping *MetricsMapping, r mb.ReporterV2) {
func (p *prometheus) ReportProcessedMetrics(mapping *MetricsMapping, r mb.ReporterV2) error {
events, err := p.GetProcessedMetrics(mapping)
if err != nil {
r.Error(err)
return
return errors.Wrap(err, "error getting processed metrics")
}
for _, event := range events {
r.Event(mb.Event{
MetricSetFields: event,
Namespace: mapping.Namespace,
})
}

return nil
}

func getEvent(m map[string]common.MapStr, labels common.MapStr) common.MapStr {
Expand Down
10 changes: 4 additions & 6 deletions metricbeat/helper/prometheus/ptest/ptest.go
Original file line number Diff line number Diff line change
Expand Up @@ -156,13 +156,11 @@ func TestMetricSet(t *testing.T, module, metricset string, cases TestCases) {
"hosts": []string{server.URL},
}

f := mbtest.NewReportingMetricSetV2(t, config)
reporter := &mbtest.CapturingReporterV2{}
f.Fetch(reporter)
assert.Nil(t, reporter.GetErrors(), "Errors while fetching metrics")
f := mbtest.NewFetcher(t, config)
events, errs := f.FetchEvents()
assert.Nil(t, errs, "Errors while fetching metrics")

if *expectedFlag {
events := reporter.GetEvents()
sort.SliceStable(events, func(i, j int) bool {
h1, _ := hashstructure.Hash(events[i], nil)
h2, _ := hashstructure.Hash(events[j], nil)
Expand All @@ -185,7 +183,7 @@ func TestMetricSet(t *testing.T, module, metricset string, cases TestCases) {
t.Fatal(err)
}

for _, event := range reporter.GetEvents() {
for _, event := range events {
// ensure the event is in expected list
found := -1
for i, expectedEvent := range expectedEvents {
Expand Down
11 changes: 7 additions & 4 deletions metricbeat/module/kubernetes/apiserver/metricset.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@
package apiserver

import (
"github.com/pkg/errors"

"github.com/elastic/beats/metricbeat/helper/prometheus"
"github.com/elastic/beats/metricbeat/mb"
)
Expand All @@ -29,7 +31,7 @@ type metricset struct {
prometheusMappings *prometheus.MetricsMapping
}

var _ mb.ReportingMetricSetV2 = (*metricset)(nil)
var _ mb.ReportingMetricSetV2Error = (*metricset)(nil)

// getMetricsetFactory as required by` mb.Registry.MustAddMetricSet`
func getMetricsetFactory(prometheusMappings *prometheus.MetricsMapping) mb.MetricSetFactory {
Expand All @@ -47,11 +49,10 @@ func getMetricsetFactory(prometheusMappings *prometheus.MetricsMapping) mb.Metri
}

// Fetch as expected by `mb.EventFetcher`
func (m *metricset) Fetch(reporter mb.ReporterV2) {
func (m *metricset) Fetch(reporter mb.ReporterV2) error {
events, err := m.prometheusClient.GetProcessedMetrics(m.prometheusMappings)
if err != nil {
reporter.Error(err)
return
return errors.Wrap(err, "error getting metrics")
}

rcPost14 := false
Expand Down Expand Up @@ -88,4 +89,6 @@ func (m *metricset) Fetch(reporter mb.ReporterV2) {
Namespace: m.prometheusMappings.Namespace,
})
}

return nil
}
19 changes: 9 additions & 10 deletions metricbeat/module/kubernetes/container/container.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,31 +78,30 @@ func New(base mb.BaseMetricSet) (mb.MetricSet, error) {
// Fetch methods implements the data gathering and data conversion to the right
// format. It publishes the event which is then forwarded to the output. In case
// of an error set the Error field of mb.Event or simply call report.Error().
func (m *MetricSet) Fetch(reporter mb.ReporterV2) {
func (m *MetricSet) Fetch(reporter mb.ReporterV2) error {
m.enricher.Start()

body, err := m.http.FetchContent()
if err != nil {
err = errors.Wrap(err, "error doing HTTP request to fetch 'container' Metricset data")
logger.Error(err)
reporter.Error(err)
return
return errors.Wrap(err, "error doing HTTP request to fetch 'container' Metricset data")

}

events, err := eventMapping(body, util.PerfMetrics)
if err != nil {
logger.Error(err)
reporter.Error(err)
return
return errors.Wrap(err, "error in mapping")
}

m.enricher.Enrich(events)

for _, e := range events {
reporter.Event(mb.TransformMapStrToEvent("kubernetes", e, nil))
isOpen := reporter.Event(mb.TransformMapStrToEvent("kubernetes", e, nil))
if !isOpen {
return nil
}
}

return
return nil
}

// Close stops this metricset
Expand Down
15 changes: 7 additions & 8 deletions metricbeat/module/kubernetes/node/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@
package node

import (
"github.com/pkg/errors"

"github.com/elastic/beats/libbeat/common"
"github.com/elastic/beats/libbeat/common/kubernetes"
"github.com/elastic/beats/libbeat/logp"
Expand Down Expand Up @@ -79,28 +81,25 @@ func New(base mb.BaseMetricSet) (mb.MetricSet, error) {
// Fetch methods implements the data gathering and data conversion to the right
// format. It publishes the event which is then forwarded to the output. In case
// of an error set the Error field of mb.Event or simply call report.Error().
func (m *MetricSet) Fetch(reporter mb.ReporterV2) {
func (m *MetricSet) Fetch(reporter mb.ReporterV2) error {
m.enricher.Start()

body, err := m.http.FetchContent()
if err != nil {
logger.Error(err)
reporter.Error(err)
return
return errors.Wrap(err, "error doing HTTP request to fetch 'node' Metricset data")

}

event, err := eventMapping(body)
if err != nil {
logger.Error(err)
reporter.Error(err)
return
return errors.Wrap(err, "error in mapping")
}

m.enricher.Enrich([]common.MapStr{event})

reporter.Event(mb.TransformMapStrToEvent("kubernetes", event, nil))

return
return nil
}

// Close stops this metricset
Expand Down
19 changes: 10 additions & 9 deletions metricbeat/module/kubernetes/pod/pod.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@
package pod

import (
"github.com/pkg/errors"

"github.com/elastic/beats/libbeat/common/kubernetes"
"github.com/elastic/beats/libbeat/logp"
"github.com/elastic/beats/metricbeat/helper"
Expand Down Expand Up @@ -78,29 +80,28 @@ func New(base mb.BaseMetricSet) (mb.MetricSet, error) {
// Fetch methods implements the data gathering and data conversion to the right
// format. It publishes the event which is then forwarded to the output. In case
// of an error set the Error field of mb.Event or simply call report.Error().
func (m *MetricSet) Fetch(reporter mb.ReporterV2) {
func (m *MetricSet) Fetch(reporter mb.ReporterV2) error {
m.enricher.Start()

body, err := m.http.FetchContent()
if err != nil {
logger.Error(err)
reporter.Error(err)
return
return errors.Wrap(err, "error doing HTTP request to fetch 'pod' Metricset data")
}

events, err := eventMapping(body, util.PerfMetrics)
if err != nil {
logger.Error(err)
reporter.Error(err)
return
return errors.Wrap(err, "error in mapping")
}

m.enricher.Enrich(events)

for _, e := range events {
reporter.Event(mb.TransformMapStrToEvent("kubernetes", e, nil))
isOpen := reporter.Event(mb.TransformMapStrToEvent("kubernetes", e, nil))
if !isOpen {
return nil
}
}
return
return nil
}

// Close stops this metricset
Expand Down
13 changes: 7 additions & 6 deletions metricbeat/module/kubernetes/state_container/state_container.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@
package state_container

import (
"github.com/pkg/errors"

"github.com/elastic/beats/libbeat/common"
p "github.com/elastic/beats/metricbeat/helper/prometheus"
"github.com/elastic/beats/metricbeat/mb"
Expand Down Expand Up @@ -105,14 +107,12 @@ func New(base mb.BaseMetricSet) (mb.MetricSet, error) {
// Fetch methods implements the data gathering and data conversion to the right
// format. It publishes the event which is then forwarded to the output. In case
// of an error set the Error field of mb.Event or simply call report.Error().
func (m *MetricSet) Fetch(reporter mb.ReporterV2) {
func (m *MetricSet) Fetch(reporter mb.ReporterV2) error {
m.enricher.Start()

events, err := m.prometheus.GetProcessedMetrics(mapping)
if err != nil {
m.Logger().Error(err)
reporter.Error(err)
return
return errors.Wrap(err, "error getting event")
}

m.enricher.Enrich(events)
Expand Down Expand Up @@ -146,10 +146,11 @@ func (m *MetricSet) Fetch(reporter mb.ReporterV2) {
ModuleFields: moduleFieldsMapStr,
Namespace: "kubernetes.container",
}); !reported {
m.Logger().Debug("error trying to emit event")
return
return nil
}
}

return nil
}

// Close stops this metricset
Expand Down
13 changes: 6 additions & 7 deletions metricbeat/module/kubernetes/state_cronjob/state_cronjob.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@
package state_cronjob

import (
"github.com/pkg/errors"

"github.com/elastic/beats/libbeat/common"
p "github.com/elastic/beats/metricbeat/helper/prometheus"
"github.com/elastic/beats/metricbeat/mb"
Expand Down Expand Up @@ -74,12 +76,10 @@ func NewCronJobMetricSet(base mb.BaseMetricSet) (mb.MetricSet, error) {
// module rooted fields at the event that gets reported
//
// Copied from other kube state metrics.
func (m *CronJobMetricSet) Fetch(reporter mb.ReporterV2) {
func (m *CronJobMetricSet) Fetch(reporter mb.ReporterV2) error {
events, err := m.prometheus.GetProcessedMetrics(m.mapping)
if err != nil {
m.Logger().Error(err)
reporter.Error(err)
return
return errors.Wrap(err, "error getting metrics")
}

for _, event := range events {
Expand All @@ -98,10 +98,9 @@ func (m *CronJobMetricSet) Fetch(reporter mb.ReporterV2) {
ModuleFields: moduleFieldsMapStr,
Namespace: "kubernetes.cronjob",
}); !reported {
m.Logger().Debug("error trying to emit event")
return
return nil
}
}

return
return nil
}
13 changes: 7 additions & 6 deletions metricbeat/module/kubernetes/state_node/state_node.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@
package state_node

import (
"github.com/pkg/errors"

"github.com/elastic/beats/libbeat/common/kubernetes"
p "github.com/elastic/beats/metricbeat/helper/prometheus"
"github.com/elastic/beats/metricbeat/mb"
Expand Down Expand Up @@ -96,25 +98,24 @@ func New(base mb.BaseMetricSet) (mb.MetricSet, error) {
// Fetch methods implements the data gathering and data conversion to the right
// format. It publishes the event which is then forwarded to the output. In case
// of an error set the Error field of mb.Event or simply call report.Error().
func (m *MetricSet) Fetch(reporter mb.ReporterV2) {
func (m *MetricSet) Fetch(reporter mb.ReporterV2) error {
m.enricher.Start()

events, err := m.prometheus.GetProcessedMetrics(mapping)
if err != nil {
m.Logger().Error(err)
reporter.Error(err)
return
return errors.Wrap(err, "error doing HTTP request to fetch 'state_node' Metricset data")
}

m.enricher.Enrich(events)
for _, event := range events {
event[mb.NamespaceKey] = "node"
reported := reporter.Event(mb.TransformMapStrToEvent("kubernetes", event, nil))
if !reported {
m.Logger().Debug("error trying to emit event")
return
return nil
}
}

return nil
}

// Close stops this metricset
Expand Down
19 changes: 10 additions & 9 deletions metricbeat/module/kubernetes/system/system.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@
package system

import (
"github.com/pkg/errors"

"github.com/elastic/beats/libbeat/logp"
"github.com/elastic/beats/metricbeat/helper"
"github.com/elastic/beats/metricbeat/mb"
Expand Down Expand Up @@ -73,23 +75,22 @@ func New(base mb.BaseMetricSet) (mb.MetricSet, error) {
// Fetch methods implements the data gathering and data conversion to the right
// format. It publishes the event which is then forwarded to the output. In case
// of an error set the Error field of mb.Event or simply call report.Error().
func (m *MetricSet) Fetch(reporter mb.ReporterV2) {
func (m *MetricSet) Fetch(reporter mb.ReporterV2) error {
body, err := m.http.FetchContent()
if err != nil {
logger.Error(err)
reporter.Error(err)
return
return errors.Wrap(err, "error doing HTTP request to fetch 'system' Metricset data")
}

events, err := eventMapping(body)
if err != nil {
logger.Error(err)
reporter.Error(err)
return
return errors.Wrap(err, "error in mapping")
}

for _, e := range events {
reporter.Event(mb.TransformMapStrToEvent("kubernetes", e, nil))
isOpen := reporter.Event(mb.TransformMapStrToEvent("kubernetes", e, nil))
if !isOpen {
return nil
}
}
return
return nil
}
Loading

0 comments on commit 8ce4b7c

Please sign in to comment.