diff --git a/metricbeat/module/ceph/cluster_health/cluster_health.go b/metricbeat/module/ceph/cluster_health/cluster_health.go index 3242aab4ac8..e1579b21c50 100644 --- a/metricbeat/module/ceph/cluster_health/cluster_health.go +++ b/metricbeat/module/ceph/cluster_health/cluster_health.go @@ -18,7 +18,6 @@ package cluster_health import ( - "github.com/elastic/beats/libbeat/common" "github.com/elastic/beats/metricbeat/helper" "github.com/elastic/beats/metricbeat/mb" "github.com/elastic/beats/metricbeat/mb/parse" @@ -61,11 +60,25 @@ func New(base mb.BaseMetricSet) (mb.MetricSet, error) { }, nil } -func (m *MetricSet) Fetch() (common.MapStr, error) { +// Fetch methods implements the data gathering and data conversion to the right +// format. It publishes the event which is then forwarded to the output. In case +// of an error set the Error field of mb.Event or simply call report.Error(). +func (m *MetricSet) Fetch(reporter mb.ReporterV2) { content, err := m.HTTP.FetchContent() if err != nil { - return nil, err + m.Logger().Error(err) + reporter.Error(err) + return } - return eventMapping(content), nil + events, err := eventMapping(content) + if err != nil { + m.Logger().Error(err) + reporter.Error(err) + return + } + + reporter.Event(mb.Event{MetricSetFields: events}) + + return } diff --git a/metricbeat/module/ceph/cluster_health/cluster_health_integration_test.go b/metricbeat/module/ceph/cluster_health/cluster_health_integration_test.go index 799ec05af8b..fe876b21219 100644 --- a/metricbeat/module/ceph/cluster_health/cluster_health_integration_test.go +++ b/metricbeat/module/ceph/cluster_health/cluster_health_integration_test.go @@ -26,9 +26,9 @@ import ( ) func TestData(t *testing.T) { - f := mbtest.NewEventFetcher(t, getConfig()) - err := mbtest.WriteEvent(f, t) - if err != nil { + f := mbtest.NewReportingMetricSetV2(t, getConfig()) + + if err := mbtest.WriteEventsReporterV2(f, t, ""); err != nil { t.Fatal("write", err) } } diff --git a/metricbeat/module/ceph/cluster_health/cluster_health_test.go b/metricbeat/module/ceph/cluster_health/cluster_health_test.go index 465c7d4d2ef..5cc1cdb6366 100644 --- a/metricbeat/module/ceph/cluster_health/cluster_health_test.go +++ b/metricbeat/module/ceph/cluster_health/cluster_health_test.go @@ -50,8 +50,13 @@ func TestFetchEventContents(t *testing.T) { "hosts": []string{server.URL}, } - f := mbtest.NewEventFetcher(t, config) - event, err := f.Fetch() + f := mbtest.NewReportingMetricSetV2(t, config) + events, errs := mbtest.ReportingFetchV2(f) + if len(errs) > 0 { + t.Fatalf("Expected 0 error, had %d. %v\n", len(errs), errs) + } + assert.NotEmpty(t, events) + event := events[0].MetricSetFields t.Logf("%s/%s event: %+v", f.Module().Name(), f.Name(), event.StringToPrint()) diff --git a/metricbeat/module/ceph/cluster_health/data.go b/metricbeat/module/ceph/cluster_health/data.go index 10ecfbcd389..f624d5bdacb 100644 --- a/metricbeat/module/ceph/cluster_health/data.go +++ b/metricbeat/module/ceph/cluster_health/data.go @@ -20,8 +20,9 @@ package cluster_health import ( "encoding/json" + "github.com/pkg/errors" + "github.com/elastic/beats/libbeat/common" - "github.com/elastic/beats/libbeat/logp" ) type Timecheck struct { @@ -40,11 +41,11 @@ type HealthRequest struct { Output Output `json:"output"` } -func eventMapping(content []byte) common.MapStr { +func eventMapping(content []byte) (common.MapStr, error) { var d HealthRequest err := json.Unmarshal(content, &d) if err != nil { - logp.Err("Error: %+v", err) + return nil, errors.Wrap(err, "error getting HealthRequest data") } return common.MapStr{ @@ -56,5 +57,5 @@ func eventMapping(content []byte) common.MapStr { "status": d.Output.Timechecks.RoundStatus, }, }, - } + }, nil }