diff --git a/metricbeat/module/logstash/logstash_integration_test.go b/metricbeat/module/logstash/logstash_integration_test.go index 6987b44535c..9f3ac7ab123 100644 --- a/metricbeat/module/logstash/logstash_integration_test.go +++ b/metricbeat/module/logstash/logstash_integration_test.go @@ -40,8 +40,8 @@ func TestFetch(t *testing.T) { compose.EnsureUp(t, "logstash") for _, metricSet := range metricSets { - f := mbtest.NewReportingMetricSetV2(t, logstash.GetConfig(metricSet)) - events, errs := mbtest.ReportingFetchV2(f) + f := mbtest.NewReportingMetricSetV2Error(t, logstash.GetConfig(metricSet)) + events, errs := mbtest.ReportingFetchV2Error(f) assert.Empty(t, errs) if !assert.NotEmpty(t, events) { @@ -58,8 +58,8 @@ func TestData(t *testing.T) { for _, metricSet := range metricSets { config := logstash.GetConfig(metricSet) - f := mbtest.NewReportingMetricSetV2(t, config) - err := mbtest.WriteEventsReporterV2(f, t, metricSet) + f := mbtest.NewReportingMetricSetV2Error(t, config) + err := mbtest.WriteEventsReporterV2Error(f, t, metricSet) if err != nil { t.Fatal("write", err) } diff --git a/metricbeat/module/logstash/node/data.go b/metricbeat/module/logstash/node/data.go index 25cac855012..525ea1b014e 100644 --- a/metricbeat/module/logstash/node/data.go +++ b/metricbeat/module/logstash/node/data.go @@ -50,24 +50,18 @@ func eventMapping(r mb.ReporterV2, content []byte) error { var data map[string]interface{} err := json.Unmarshal(content, &data) if err != nil { - event.Error = errors.Wrap(err, "failure parsing Logstash Node API response") - r.Event(event) - return event.Error + return errors.Wrap(err, "failure parsing Logstash Node API response") } fields, err := schema.Apply(data) if err != nil { - event.Error = errors.Wrap(err, "failure applying node schema") - r.Event(event) - return event.Error + return errors.Wrap(err, "failure applying node schema") } // Set service ID serviceID, err := fields.GetValue("id") if err != nil { - event.Error = elastic.MakeErrorForMissingField("id", elastic.Logstash) - r.Event(event) - return event.Error + return elastic.MakeErrorForMissingField("id", elastic.Logstash) } event.RootFields.Put("service.id", serviceID) fields.Delete("id") @@ -75,9 +69,7 @@ func eventMapping(r mb.ReporterV2, content []byte) error { // Set service hostname host, err := fields.GetValue("host") if err != nil { - event.Error = elastic.MakeErrorForMissingField("host", elastic.Logstash) - r.Event(event) - return event.Error + return elastic.MakeErrorForMissingField("host", elastic.Logstash) } event.RootFields.Put("service.hostname", host) fields.Delete("host") @@ -85,9 +77,7 @@ func eventMapping(r mb.ReporterV2, content []byte) error { // Set service version version, err := fields.GetValue("version") if err != nil { - event.Error = elastic.MakeErrorForMissingField("version", elastic.Logstash) - r.Event(event) - return event.Error + return elastic.MakeErrorForMissingField("version", elastic.Logstash) } event.RootFields.Put("service.version", version) fields.Delete("version") @@ -95,9 +85,7 @@ func eventMapping(r mb.ReporterV2, content []byte) error { // Set PID pid, err := fields.GetValue("jvm.pid") if err != nil { - event.Error = elastic.MakeErrorForMissingField("jvm.pid", elastic.Logstash) - r.Event(event) - return event.Error + return elastic.MakeErrorForMissingField("jvm.pid", elastic.Logstash) } event.RootFields.Put("process.pid", pid) fields.Delete("jvm.pid") diff --git a/metricbeat/module/logstash/node/node.go b/metricbeat/module/logstash/node/node.go index db0cef279d5..1e8048a0f41 100644 --- a/metricbeat/module/logstash/node/node.go +++ b/metricbeat/module/logstash/node/node.go @@ -19,7 +19,6 @@ package node import ( "github.com/elastic/beats/metricbeat/helper" - "github.com/elastic/beats/metricbeat/helper/elastic" "github.com/elastic/beats/metricbeat/mb" "github.com/elastic/beats/metricbeat/mb/parse" "github.com/elastic/beats/metricbeat/module/logstash" @@ -69,16 +68,11 @@ func New(base mb.BaseMetricSet) (mb.MetricSet, error) { // Fetch methods implements the data gathering and data conversion to the right format // It returns the event which is then forward to the output. In case of an error, a // descriptive error must be returned. -func (m *MetricSet) Fetch(r mb.ReporterV2) { +func (m *MetricSet) Fetch(r mb.ReporterV2) error { content, err := m.http.FetchContent() if err != nil { - elastic.ReportAndLogError(err, r, m.Logger()) - return + return err } - err = eventMapping(r, content) - if err != nil { - m.Logger().Error(err) - return - } + return eventMapping(r, content) } diff --git a/metricbeat/module/logstash/node_stats/data.go b/metricbeat/module/logstash/node_stats/data.go index c8896df8482..052a5b4ad31 100644 --- a/metricbeat/module/logstash/node_stats/data.go +++ b/metricbeat/module/logstash/node_stats/data.go @@ -51,24 +51,18 @@ func eventMapping(r mb.ReporterV2, content []byte) error { var data map[string]interface{} err := json.Unmarshal(content, &data) if err != nil { - event.Error = errors.Wrap(err, "failure parsing Logstash Node Stats API response") - r.Event(event) - return event.Error + return errors.Wrap(err, "failure parsing Logstash Node Stats API response") } fields, err := schema.Apply(data) if err != nil { - event.Error = errors.Wrap(err, "failure applying node stats schema") - r.Event(event) - return event.Error + return errors.Wrap(err, "failure applying node stats schema") } // Set service ID serviceID, err := fields.GetValue("id") if err != nil { - event.Error = elastic.MakeErrorForMissingField("id", elastic.Logstash) - r.Event(event) - return event.Error + return elastic.MakeErrorForMissingField("id", elastic.Logstash) } event.RootFields.Put("service.id", serviceID) fields.Delete("id") @@ -76,9 +70,7 @@ func eventMapping(r mb.ReporterV2, content []byte) error { // Set service hostname host, err := fields.GetValue("host") if err != nil { - event.Error = elastic.MakeErrorForMissingField("host", elastic.Logstash) - r.Event(event) - return event.Error + return elastic.MakeErrorForMissingField("host", elastic.Logstash) } event.RootFields.Put("service.hostname", host) fields.Delete("host") @@ -86,9 +78,7 @@ func eventMapping(r mb.ReporterV2, content []byte) error { // Set service version version, err := fields.GetValue("version") if err != nil { - event.Error = elastic.MakeErrorForMissingField("version", elastic.Logstash) - r.Event(event) - return event.Error + return elastic.MakeErrorForMissingField("version", elastic.Logstash) } event.RootFields.Put("service.version", version) fields.Delete("version") diff --git a/metricbeat/module/logstash/node_stats/node_stats.go b/metricbeat/module/logstash/node_stats/node_stats.go index 373ca22add2..5ac114f9a9e 100644 --- a/metricbeat/module/logstash/node_stats/node_stats.go +++ b/metricbeat/module/logstash/node_stats/node_stats.go @@ -19,7 +19,6 @@ package node_stats import ( "github.com/elastic/beats/metricbeat/helper" - "github.com/elastic/beats/metricbeat/helper/elastic" "github.com/elastic/beats/metricbeat/mb" "github.com/elastic/beats/metricbeat/mb/parse" @@ -70,16 +69,11 @@ func New(base mb.BaseMetricSet) (mb.MetricSet, error) { // Fetch methods implements the data gathering and data conversion to the right format // It returns the event which is then forward to the output. In case of an error, a // descriptive error must be returned. -func (m *MetricSet) Fetch(r mb.ReporterV2) { +func (m *MetricSet) Fetch(r mb.ReporterV2) error { content, err := m.http.FetchContent() if err != nil { - elastic.ReportAndLogError(err, r, m.Logger()) - return + return err } - err = eventMapping(r, content) - if err != nil { - m.Logger().Error(err) - return - } + return eventMapping(r, content) }