Skip to content

Commit

Permalink
[Metricbeat] Migrate Ceph cluster_health to use ReporterV2 interface (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
sayden authored Mar 7, 2019
1 parent af4c0f9 commit 7ed8570
Show file tree
Hide file tree
Showing 4 changed files with 32 additions and 13 deletions.
21 changes: 17 additions & 4 deletions metricbeat/module/ceph/cluster_health/cluster_health.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
package cluster_health

import (
"github.com/elastic/beats/libbeat/common"
"github.com/elastic/beats/metricbeat/helper"
"github.com/elastic/beats/metricbeat/mb"
"github.com/elastic/beats/metricbeat/mb/parse"
Expand Down Expand Up @@ -61,11 +60,25 @@ func New(base mb.BaseMetricSet) (mb.MetricSet, error) {
}, nil
}

func (m *MetricSet) Fetch() (common.MapStr, error) {
// Fetch methods implements the data gathering and data conversion to the right
// format. It publishes the event which is then forwarded to the output. In case
// of an error set the Error field of mb.Event or simply call report.Error().
func (m *MetricSet) Fetch(reporter mb.ReporterV2) {
content, err := m.HTTP.FetchContent()
if err != nil {
return nil, err
m.Logger().Error(err)
reporter.Error(err)
return
}

return eventMapping(content), nil
events, err := eventMapping(content)
if err != nil {
m.Logger().Error(err)
reporter.Error(err)
return
}

reporter.Event(mb.Event{MetricSetFields: events})

return
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,9 @@ import (
)

func TestData(t *testing.T) {
f := mbtest.NewEventFetcher(t, getConfig())
err := mbtest.WriteEvent(f, t)
if err != nil {
f := mbtest.NewReportingMetricSetV2(t, getConfig())

if err := mbtest.WriteEventsReporterV2(f, t, ""); err != nil {
t.Fatal("write", err)
}
}
Expand Down
9 changes: 7 additions & 2 deletions metricbeat/module/ceph/cluster_health/cluster_health_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,8 +50,13 @@ func TestFetchEventContents(t *testing.T) {
"hosts": []string{server.URL},
}

f := mbtest.NewEventFetcher(t, config)
event, err := f.Fetch()
f := mbtest.NewReportingMetricSetV2(t, config)
events, errs := mbtest.ReportingFetchV2(f)
if len(errs) > 0 {
t.Fatalf("Expected 0 error, had %d. %v\n", len(errs), errs)
}
assert.NotEmpty(t, events)
event := events[0].MetricSetFields

t.Logf("%s/%s event: %+v", f.Module().Name(), f.Name(), event.StringToPrint())

Expand Down
9 changes: 5 additions & 4 deletions metricbeat/module/ceph/cluster_health/data.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,9 @@ package cluster_health
import (
"encoding/json"

"github.com/pkg/errors"

"github.com/elastic/beats/libbeat/common"
"github.com/elastic/beats/libbeat/logp"
)

type Timecheck struct {
Expand All @@ -40,11 +41,11 @@ type HealthRequest struct {
Output Output `json:"output"`
}

func eventMapping(content []byte) common.MapStr {
func eventMapping(content []byte) (common.MapStr, error) {
var d HealthRequest
err := json.Unmarshal(content, &d)
if err != nil {
logp.Err("Error: %+v", err)
return nil, errors.Wrap(err, "error getting HealthRequest data")
}

return common.MapStr{
Expand All @@ -56,5 +57,5 @@ func eventMapping(content []byte) common.MapStr {
"status": d.Output.Timechecks.RoundStatus,
},
},
}
}, nil
}

0 comments on commit 7ed8570

Please sign in to comment.