Skip to content

Commit

Permalink
Refactoring: use new Fetch interface that automatically reports and l…
Browse files Browse the repository at this point in the history
…ogs errors (elastic#11767)
  • Loading branch information
ycombinator authored Apr 12, 2019
1 parent 462f798 commit dbf39c1
Show file tree
Hide file tree
Showing 5 changed files with 21 additions and 55 deletions.
8 changes: 4 additions & 4 deletions metricbeat/module/logstash/logstash_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,8 +40,8 @@ func TestFetch(t *testing.T) {
compose.EnsureUp(t, "logstash")

for _, metricSet := range metricSets {
f := mbtest.NewReportingMetricSetV2(t, logstash.GetConfig(metricSet))
events, errs := mbtest.ReportingFetchV2(f)
f := mbtest.NewReportingMetricSetV2Error(t, logstash.GetConfig(metricSet))
events, errs := mbtest.ReportingFetchV2Error(f)

assert.Empty(t, errs)
if !assert.NotEmpty(t, events) {
Expand All @@ -58,8 +58,8 @@ func TestData(t *testing.T) {

for _, metricSet := range metricSets {
config := logstash.GetConfig(metricSet)
f := mbtest.NewReportingMetricSetV2(t, config)
err := mbtest.WriteEventsReporterV2(f, t, metricSet)
f := mbtest.NewReportingMetricSetV2Error(t, config)
err := mbtest.WriteEventsReporterV2Error(f, t, metricSet)
if err != nil {
t.Fatal("write", err)
}
Expand Down
24 changes: 6 additions & 18 deletions metricbeat/module/logstash/node/data.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,54 +50,42 @@ func eventMapping(r mb.ReporterV2, content []byte) error {
var data map[string]interface{}
err := json.Unmarshal(content, &data)
if err != nil {
event.Error = errors.Wrap(err, "failure parsing Logstash Node API response")
r.Event(event)
return event.Error
return errors.Wrap(err, "failure parsing Logstash Node API response")
}

fields, err := schema.Apply(data)
if err != nil {
event.Error = errors.Wrap(err, "failure applying node schema")
r.Event(event)
return event.Error
return errors.Wrap(err, "failure applying node schema")
}

// Set service ID
serviceID, err := fields.GetValue("id")
if err != nil {
event.Error = elastic.MakeErrorForMissingField("id", elastic.Logstash)
r.Event(event)
return event.Error
return elastic.MakeErrorForMissingField("id", elastic.Logstash)
}
event.RootFields.Put("service.id", serviceID)
fields.Delete("id")

// Set service hostname
host, err := fields.GetValue("host")
if err != nil {
event.Error = elastic.MakeErrorForMissingField("host", elastic.Logstash)
r.Event(event)
return event.Error
return elastic.MakeErrorForMissingField("host", elastic.Logstash)
}
event.RootFields.Put("service.hostname", host)
fields.Delete("host")

// Set service version
version, err := fields.GetValue("version")
if err != nil {
event.Error = elastic.MakeErrorForMissingField("version", elastic.Logstash)
r.Event(event)
return event.Error
return elastic.MakeErrorForMissingField("version", elastic.Logstash)
}
event.RootFields.Put("service.version", version)
fields.Delete("version")

// Set PID
pid, err := fields.GetValue("jvm.pid")
if err != nil {
event.Error = elastic.MakeErrorForMissingField("jvm.pid", elastic.Logstash)
r.Event(event)
return event.Error
return elastic.MakeErrorForMissingField("jvm.pid", elastic.Logstash)
}
event.RootFields.Put("process.pid", pid)
fields.Delete("jvm.pid")
Expand Down
12 changes: 3 additions & 9 deletions metricbeat/module/logstash/node/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ package node

import (
"github.com/elastic/beats/metricbeat/helper"
"github.com/elastic/beats/metricbeat/helper/elastic"
"github.com/elastic/beats/metricbeat/mb"
"github.com/elastic/beats/metricbeat/mb/parse"
"github.com/elastic/beats/metricbeat/module/logstash"
Expand Down Expand Up @@ -69,16 +68,11 @@ func New(base mb.BaseMetricSet) (mb.MetricSet, error) {
// Fetch methods implements the data gathering and data conversion to the right format
// It returns the event which is then forward to the output. In case of an error, a
// descriptive error must be returned.
func (m *MetricSet) Fetch(r mb.ReporterV2) {
func (m *MetricSet) Fetch(r mb.ReporterV2) error {
content, err := m.http.FetchContent()
if err != nil {
elastic.ReportAndLogError(err, r, m.Logger())
return
return err
}

err = eventMapping(r, content)
if err != nil {
m.Logger().Error(err)
return
}
return eventMapping(r, content)
}
20 changes: 5 additions & 15 deletions metricbeat/module/logstash/node_stats/data.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,44 +51,34 @@ func eventMapping(r mb.ReporterV2, content []byte) error {
var data map[string]interface{}
err := json.Unmarshal(content, &data)
if err != nil {
event.Error = errors.Wrap(err, "failure parsing Logstash Node Stats API response")
r.Event(event)
return event.Error
return errors.Wrap(err, "failure parsing Logstash Node Stats API response")
}

fields, err := schema.Apply(data)
if err != nil {
event.Error = errors.Wrap(err, "failure applying node stats schema")
r.Event(event)
return event.Error
return errors.Wrap(err, "failure applying node stats schema")
}

// Set service ID
serviceID, err := fields.GetValue("id")
if err != nil {
event.Error = elastic.MakeErrorForMissingField("id", elastic.Logstash)
r.Event(event)
return event.Error
return elastic.MakeErrorForMissingField("id", elastic.Logstash)
}
event.RootFields.Put("service.id", serviceID)
fields.Delete("id")

// Set service hostname
host, err := fields.GetValue("host")
if err != nil {
event.Error = elastic.MakeErrorForMissingField("host", elastic.Logstash)
r.Event(event)
return event.Error
return elastic.MakeErrorForMissingField("host", elastic.Logstash)
}
event.RootFields.Put("service.hostname", host)
fields.Delete("host")

// Set service version
version, err := fields.GetValue("version")
if err != nil {
event.Error = elastic.MakeErrorForMissingField("version", elastic.Logstash)
r.Event(event)
return event.Error
return elastic.MakeErrorForMissingField("version", elastic.Logstash)
}
event.RootFields.Put("service.version", version)
fields.Delete("version")
Expand Down
12 changes: 3 additions & 9 deletions metricbeat/module/logstash/node_stats/node_stats.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ package node_stats

import (
"github.com/elastic/beats/metricbeat/helper"
"github.com/elastic/beats/metricbeat/helper/elastic"

"github.com/elastic/beats/metricbeat/mb"
"github.com/elastic/beats/metricbeat/mb/parse"
Expand Down Expand Up @@ -70,16 +69,11 @@ func New(base mb.BaseMetricSet) (mb.MetricSet, error) {
// Fetch methods implements the data gathering and data conversion to the right format
// It returns the event which is then forward to the output. In case of an error, a
// descriptive error must be returned.
func (m *MetricSet) Fetch(r mb.ReporterV2) {
func (m *MetricSet) Fetch(r mb.ReporterV2) error {
content, err := m.http.FetchContent()
if err != nil {
elastic.ReportAndLogError(err, r, m.Logger())
return
return err
}

err = eventMapping(r, content)
if err != nil {
m.Logger().Error(err)
return
}
return eventMapping(r, content)
}

0 comments on commit dbf39c1

Please sign in to comment.