From b8d10dca8bfe49144fb13b1d71077012464c6e02 Mon Sep 17 00:00:00 2001 From: Alex K <8418476+fearful-symmetry@users.noreply.github.com> Date: Thu, 30 May 2019 08:26:56 -0500 Subject: [PATCH] [metricbeat] migrate rabbitmq to reporterv2 with error return (#12228) * migrate rabbitmq to reporterv2 with error return --- .../module/rabbitmq/connection/connection.go | 9 +++-- .../rabbitmq/connection/connection_test.go | 2 +- metricbeat/module/rabbitmq/connection/data.go | 39 ++++++++++--------- metricbeat/module/rabbitmq/exchange/data.go | 33 +++++++--------- .../module/rabbitmq/exchange/exchange.go | 9 +++-- .../module/rabbitmq/exchange/exchange_test.go | 6 +-- metricbeat/module/rabbitmq/node/data.go | 28 ++++++++----- metricbeat/module/rabbitmq/node/node.go | 25 ++++++------ .../rabbitmq/node/node_integration_test.go | 6 +-- metricbeat/module/rabbitmq/node/node_test.go | 24 ++++++------ metricbeat/module/rabbitmq/queue/data.go | 33 +++++++--------- metricbeat/module/rabbitmq/queue/queue.go | 9 +++-- .../module/rabbitmq/queue/queue_test.go | 6 +-- 13 files changed, 119 insertions(+), 110 deletions(-) diff --git a/metricbeat/module/rabbitmq/connection/connection.go b/metricbeat/module/rabbitmq/connection/connection.go index 548ebaf6af3..76773460a7f 100644 --- a/metricbeat/module/rabbitmq/connection/connection.go +++ b/metricbeat/module/rabbitmq/connection/connection.go @@ -18,6 +18,8 @@ package connection import ( + "github.com/pkg/errors" + "github.com/elastic/beats/metricbeat/mb" "github.com/elastic/beats/metricbeat/module/rabbitmq" ) @@ -44,13 +46,12 @@ func New(base mb.BaseMetricSet) (mb.MetricSet, error) { } // Fetch makes an HTTP request to fetch connections metrics from the connections endpoint. -func (m *MetricSet) Fetch(r mb.ReporterV2) { +func (m *MetricSet) Fetch(r mb.ReporterV2) error { content, err := m.HTTP.FetchContent() if err != nil { - r.Error(err) - return + return errors.Wrap(err, "error in fetch") } - eventsMapping(content, r) + return eventsMapping(content, r, m) } diff --git a/metricbeat/module/rabbitmq/connection/connection_test.go b/metricbeat/module/rabbitmq/connection/connection_test.go index 37942e237ef..198b6a577cc 100644 --- a/metricbeat/module/rabbitmq/connection/connection_test.go +++ b/metricbeat/module/rabbitmq/connection/connection_test.go @@ -33,7 +33,7 @@ func TestFetchEventContents(t *testing.T) { reporter := &mbtest.CapturingReporterV2{} - metricSet := mbtest.NewReportingMetricSetV2(t, getConfig(server.URL)) + metricSet := mbtest.NewReportingMetricSetV2Error(t, getConfig(server.URL)) metricSet.Fetch(reporter) e := mbtest.StandardizeEvent(metricSet, reporter.GetEvents()[0]) diff --git a/metricbeat/module/rabbitmq/connection/data.go b/metricbeat/module/rabbitmq/connection/data.go index be1e56f4486..f7183e07c46 100644 --- a/metricbeat/module/rabbitmq/connection/data.go +++ b/metricbeat/module/rabbitmq/connection/data.go @@ -20,21 +20,20 @@ package connection import ( "encoding/json" - "github.com/joeshaw/multierror" + "github.com/pkg/errors" "github.com/elastic/beats/libbeat/common" s "github.com/elastic/beats/libbeat/common/schema" c "github.com/elastic/beats/libbeat/common/schema/mapstriface" - "github.com/elastic/beats/libbeat/logp" "github.com/elastic/beats/metricbeat/mb" ) var ( schema = s.Schema{ "name": c.Str("name"), - "vhost": c.Str("vhost"), - "user": c.Str("user"), - "node": c.Str("node"), + "vhost": c.Str("vhost", s.Required), + "user": c.Str("user", s.Required), + "node": c.Str("node", s.Required), "channels": c.Int("channels"), "channel_max": c.Int("channel_max"), "frame_max": c.Int("frame_max"), @@ -57,30 +56,33 @@ var ( } ) -func eventsMapping(content []byte, r mb.ReporterV2) { +func eventsMapping(content []byte, r mb.ReporterV2, m *MetricSet) error { var connections []map[string]interface{} err := json.Unmarshal(content, &connections) if err != nil { - logp.Err("Error: %+v", err) - r.Error(err) - return + return errors.Wrap(err, "error in unmarshal") } - var errors multierror.Errors for _, node := range connections { - err := eventMapping(node, r) + evt, err := eventMapping(node) if err != nil { - errors = append(errors, err) + m.Logger().Errorf("error in mapping: %s", err) + r.Error(err) + continue } - } - if len(errors) > 0 { - r.Error(errors.Err()) + if !r.Event(evt) { + return nil + } } + return nil } -func eventMapping(connection map[string]interface{}, r mb.ReporterV2) error { - fields, err := schema.Apply(connection) +func eventMapping(connection map[string]interface{}) (mb.Event, error) { + fields, err := schema.Apply(connection, s.FailOnRequired) + if err != nil { + return mb.Event{}, errors.Wrap(err, "error applying schema") + } rootFields := common.MapStr{} if v, err := fields.GetValue("user"); err == nil { @@ -104,6 +106,5 @@ func eventMapping(connection map[string]interface{}, r mb.ReporterV2) error { RootFields: rootFields, ModuleFields: moduleFields, } - r.Event(event) - return err + return event, nil } diff --git a/metricbeat/module/rabbitmq/exchange/data.go b/metricbeat/module/rabbitmq/exchange/data.go index 7afa0c6e7c3..73c398ee2fe 100644 --- a/metricbeat/module/rabbitmq/exchange/data.go +++ b/metricbeat/module/rabbitmq/exchange/data.go @@ -20,12 +20,11 @@ package exchange import ( "encoding/json" - "github.com/joeshaw/multierror" + "github.com/pkg/errors" "github.com/elastic/beats/libbeat/common" s "github.com/elastic/beats/libbeat/common/schema" c "github.com/elastic/beats/libbeat/common/schema/mapstriface" - "github.com/elastic/beats/libbeat/logp" "github.com/elastic/beats/metricbeat/mb" ) @@ -56,33 +55,31 @@ var ( } ) -func eventsMapping(content []byte, r mb.ReporterV2) { +func eventsMapping(content []byte, r mb.ReporterV2, m *MetricSet) error { var exchanges []map[string]interface{} err := json.Unmarshal(content, &exchanges) if err != nil { - logp.Err("Error: %+v", err) - r.Error(err) - return + return errors.Wrap(err, "error in unmarshal") } - var errors multierror.Errors - for _, exchange := range exchanges { - err := eventMapping(exchange, r) + evt, err := eventMapping(exchange) if err != nil { - errors = append(errors, err) + m.Logger().Errorf("error in mapping: %s", err) + r.Error(err) + continue + } + if !r.Event(evt) { + return nil } } - - if len(errors) > 0 { - r.Error(errors.Err()) - } + return nil } -func eventMapping(exchange map[string]interface{}, r mb.ReporterV2) error { +func eventMapping(exchange map[string]interface{}) (mb.Event, error) { fields, err := schema.Apply(exchange) if err != nil { - return err + return mb.Event{}, err } rootFields := common.MapStr{} @@ -102,6 +99,6 @@ func eventMapping(exchange map[string]interface{}, r mb.ReporterV2) error { RootFields: rootFields, ModuleFields: moduleFields, } - r.Event(event) - return nil + return event, nil + } diff --git a/metricbeat/module/rabbitmq/exchange/exchange.go b/metricbeat/module/rabbitmq/exchange/exchange.go index 6260432d67b..38353a89d82 100644 --- a/metricbeat/module/rabbitmq/exchange/exchange.go +++ b/metricbeat/module/rabbitmq/exchange/exchange.go @@ -18,6 +18,8 @@ package exchange import ( + "github.com/pkg/errors" + "github.com/elastic/beats/metricbeat/mb" "github.com/elastic/beats/metricbeat/module/rabbitmq" ) @@ -46,13 +48,12 @@ func New(base mb.BaseMetricSet) (mb.MetricSet, error) { // Fetch methods implements the data gathering and data conversion to the right // format. It publishes the event which is then forwarded to the output. In case // of an error set the Error field of mb.Event or simply call report.Error(). -func (m *MetricSet) Fetch(r mb.ReporterV2) { +func (m *MetricSet) Fetch(r mb.ReporterV2) error { content, err := m.HTTP.FetchContent() if err != nil { - r.Error(err) - return + return errors.Wrap(err, "error in fetch") } - eventsMapping(content, r) + return eventsMapping(content, r, m) } diff --git a/metricbeat/module/rabbitmq/exchange/exchange_test.go b/metricbeat/module/rabbitmq/exchange/exchange_test.go index 5789f1701be..b051b12e79c 100644 --- a/metricbeat/module/rabbitmq/exchange/exchange_test.go +++ b/metricbeat/module/rabbitmq/exchange/exchange_test.go @@ -33,7 +33,7 @@ func TestFetchEventContents(t *testing.T) { reporter := &mbtest.CapturingReporterV2{} - metricSet := mbtest.NewReportingMetricSetV2(t, getConfig(server.URL)) + metricSet := mbtest.NewReportingMetricSetV2Error(t, getConfig(server.URL)) metricSet.Fetch(reporter) e := mbtest.StandardizeEvent(metricSet, reporter.GetEvents()[0]) @@ -64,8 +64,8 @@ func TestData(t *testing.T) { server := mtest.Server(t, mtest.DefaultServerConfig) defer server.Close() - ms := mbtest.NewReportingMetricSetV2(t, getConfig(server.URL)) - err := mbtest.WriteEventsReporterV2Cond(ms, t, "", func(e common.MapStr) bool { + ms := mbtest.NewReportingMetricSetV2Error(t, getConfig(server.URL)) + err := mbtest.WriteEventsReporterV2ErrorCond(ms, t, "", func(e common.MapStr) bool { hasIn, _ := e.HasKey("rabbitmq.exchange.messages.publish_in") hasOut, _ := e.HasKey("rabbitmq.exchange.messages.publish_out") return hasIn && hasOut diff --git a/metricbeat/module/rabbitmq/node/data.go b/metricbeat/module/rabbitmq/node/data.go index 4318887d70f..4c5dc41d870 100644 --- a/metricbeat/module/rabbitmq/node/data.go +++ b/metricbeat/module/rabbitmq/node/data.go @@ -20,6 +20,8 @@ package node import ( "encoding/json" + "github.com/pkg/errors" + s "github.com/elastic/beats/libbeat/common/schema" c "github.com/elastic/beats/libbeat/common/schema/mapstriface" "github.com/elastic/beats/metricbeat/mb" @@ -145,26 +147,34 @@ var ( } ) -func eventsMapping(r mb.ReporterV2, content []byte) { +func eventsMapping(r mb.ReporterV2, content []byte, m *ClusterMetricSet) error { var nodes []map[string]interface{} err := json.Unmarshal(content, &nodes) if err != nil { - r.Error(err) - return + return errors.Wrap(err, "error in Unmarshal") } for _, node := range nodes { - eventMapping(r, node) + evt, err := eventMapping(node) + if err != nil { + m.Logger().Errorf("error in mapping: %s", err) + r.Error(err) + continue + } + if !r.Event(evt) { + return nil + } } + return nil } -func eventMapping(r mb.ReporterV2, node map[string]interface{}) { +func eventMapping(node map[string]interface{}) (mb.Event, error) { event, err := schema.Apply(node) if err != nil { - r.Error(err) - return + return mb.Event{}, errors.Wrap(err, "error applying schema") } - r.Event(mb.Event{ + return mb.Event{ MetricSetFields: event, - }) + }, nil + } diff --git a/metricbeat/module/rabbitmq/node/node.go b/metricbeat/module/rabbitmq/node/node.go index 6c90a986b5e..ac93f4d080b 100644 --- a/metricbeat/module/rabbitmq/node/node.go +++ b/metricbeat/module/rabbitmq/node/node.go @@ -89,35 +89,36 @@ func (m *MetricSet) fetchOverview() (*apiOverview, error) { } // Fetch metrics from rabbitmq node -func (m *MetricSet) Fetch(r mb.ReporterV2) { +func (m *MetricSet) Fetch(r mb.ReporterV2) error { o, err := m.fetchOverview() if err != nil { - r.Error(err) - return + return errors.Wrap(err, "error in fetch") } node, err := rabbitmq.NewMetricSet(m.BaseMetricSet, rabbitmq.NodesPath+"/"+o.Node) if err != nil { - r.Error(err) - return + return errors.Wrap(err, "error creating new metricset") } content, err := node.HTTP.FetchJSON() if err != nil { - r.Error(err) - return + return errors.Wrap(err, "error in fetch") } - eventMapping(r, content) + evt, err := eventMapping(content) + if err != nil { + return errors.Wrap(err, "error in mapping") + } + r.Event(evt) + return nil } // Fetch metrics from all rabbitmq nodes in the cluster -func (m *ClusterMetricSet) Fetch(r mb.ReporterV2) { +func (m *ClusterMetricSet) Fetch(r mb.ReporterV2) error { content, err := m.HTTP.FetchContent() if err != nil { - r.Error(err) - return + return errors.Wrap(err, "error in fetch") } - eventsMapping(r, content) + return eventsMapping(r, content, m) } diff --git a/metricbeat/module/rabbitmq/node/node_integration_test.go b/metricbeat/module/rabbitmq/node/node_integration_test.go index f1855ba8fce..420a1e25e47 100644 --- a/metricbeat/module/rabbitmq/node/node_integration_test.go +++ b/metricbeat/module/rabbitmq/node/node_integration_test.go @@ -28,8 +28,8 @@ import ( ) func TestData(t *testing.T) { - ms := mbtest.NewReportingMetricSetV2(t, getConfig()) - err := mbtest.WriteEventsReporterV2(ms, t, "") + ms := mbtest.NewReportingMetricSetV2Error(t, getConfig()) + err := mbtest.WriteEventsReporterV2Error(ms, t, "") if err != nil { t.Fatal("write", err) } @@ -41,7 +41,7 @@ func TestFetch(t *testing.T) { reporter := &mbtest.CapturingReporterV2{} - metricSet := mbtest.NewReportingMetricSetV2(t, getConfig()) + metricSet := mbtest.NewReportingMetricSetV2Error(t, getConfig()) metricSet.Fetch(reporter) e := mbtest.StandardizeEvent(metricSet, reporter.GetEvents()[0]) diff --git a/metricbeat/module/rabbitmq/node/node_test.go b/metricbeat/module/rabbitmq/node/node_test.go index b385dd8409f..e8786aaa678 100644 --- a/metricbeat/module/rabbitmq/node/node_test.go +++ b/metricbeat/module/rabbitmq/node/node_test.go @@ -46,8 +46,8 @@ func testFetch(t *testing.T, collect string) { "node.collect": collect, } - ms := mbtest.NewReportingMetricSetV2(t, config) - events, errors := mbtest.ReportingFetchV2(ms) + ms := mbtest.NewReportingMetricSetV2Error(t, config) + events, errors := mbtest.ReportingFetchV2Error(ms) if !assert.True(t, len(errors) == 0, "There shouldn't be errors") { t.Log(errors) } @@ -76,11 +76,11 @@ func testFetch(t *testing.T, collect string) { assert.EqualValues(t, 27352751800, reclaimed["bytes"]) io := event["io"].(common.MapStr) - file_handle := io["file_handle"].(common.MapStr) - open_attempt := file_handle["open_attempt"].(common.MapStr) - avg := open_attempt["avg"].(common.MapStr) + fileHandle := io["file_handle"].(common.MapStr) + openAttempt := fileHandle["open_attempt"].(common.MapStr) + avg := openAttempt["avg"].(common.MapStr) assert.EqualValues(t, 0, avg["ms"]) - assert.EqualValues(t, 597670, open_attempt["count"]) + assert.EqualValues(t, 597670, openAttempt["count"]) read := io["read"].(common.MapStr) avg = read["avg"].(common.MapStr) @@ -122,10 +122,10 @@ func testFetch(t *testing.T, collect string) { assert.EqualValues(t, 92, tx["count"]) msg := event["msg"].(common.MapStr) - store_read := msg["store_read"].(common.MapStr) - assert.EqualValues(t, 0, store_read["count"]) - store_write := msg["store_write"].(common.MapStr) - assert.EqualValues(t, 0, store_write["count"]) + storeRead := msg["store_read"].(common.MapStr) + assert.EqualValues(t, 0, storeRead["count"]) + storeWrite := msg["store_write"].(common.MapStr) + assert.EqualValues(t, 0, storeWrite["count"]) assert.EqualValues(t, "rabbit@e2b1ae6390fd", event["name"]) @@ -137,8 +137,8 @@ func testFetch(t *testing.T, collect string) { queue := event["queue"].(common.MapStr) index := queue["index"].(common.MapStr) - journal_write := index["journal_write"].(common.MapStr) - assert.EqualValues(t, 448230, journal_write["count"]) + journalWrite := index["journal_write"].(common.MapStr) + assert.EqualValues(t, 448230, journalWrite["count"]) read = index["read"].(common.MapStr) assert.EqualValues(t, 0, read["count"]) write = index["write"].(common.MapStr) diff --git a/metricbeat/module/rabbitmq/queue/data.go b/metricbeat/module/rabbitmq/queue/data.go index 2ebae73a138..2399f48e013 100644 --- a/metricbeat/module/rabbitmq/queue/data.go +++ b/metricbeat/module/rabbitmq/queue/data.go @@ -20,14 +20,13 @@ package queue import ( "encoding/json" - "github.com/elastic/beats/metricbeat/mb" + "github.com/pkg/errors" - "github.com/joeshaw/multierror" + "github.com/elastic/beats/metricbeat/mb" "github.com/elastic/beats/libbeat/common" s "github.com/elastic/beats/libbeat/common/schema" c "github.com/elastic/beats/libbeat/common/schema/mapstriface" - "github.com/elastic/beats/libbeat/logp" ) var ( @@ -85,32 +84,32 @@ var ( } ) -func eventsMapping(content []byte, r mb.ReporterV2) { +func eventsMapping(content []byte, r mb.ReporterV2, m *MetricSet) error { var queues []map[string]interface{} err := json.Unmarshal(content, &queues) if err != nil { - logp.Err("Error: %+v", err) - r.Error(err) - return + return errors.Wrap(err, "error in mapping") } - var errors multierror.Errors for _, queue := range queues { - err := eventMapping(queue, r) + evt, err := eventMapping(queue) if err != nil { - errors = append(errors, err) + m.Logger().Errorf("error in mapping: %s", err) + r.Error(err) + continue + } + if !r.Event(evt) { + return nil } } - if len(errors) > 0 { - r.Error(errors.Err()) - } + return nil } -func eventMapping(queue map[string]interface{}, r mb.ReporterV2) error { +func eventMapping(queue map[string]interface{}) (mb.Event, error) { fields, err := schema.Apply(queue) if err != nil { - return err + return mb.Event{}, errors.Wrap(err, "error applying schema") } moduleFields := common.MapStr{} @@ -128,7 +127,5 @@ func eventMapping(queue map[string]interface{}, r mb.ReporterV2) error { MetricSetFields: fields, ModuleFields: moduleFields, } - - r.Event(event) - return nil + return event, nil } diff --git a/metricbeat/module/rabbitmq/queue/queue.go b/metricbeat/module/rabbitmq/queue/queue.go index 995eb58fc18..36346ceab1e 100644 --- a/metricbeat/module/rabbitmq/queue/queue.go +++ b/metricbeat/module/rabbitmq/queue/queue.go @@ -18,6 +18,8 @@ package queue import ( + "github.com/pkg/errors" + "github.com/elastic/beats/metricbeat/mb" "github.com/elastic/beats/metricbeat/module/rabbitmq" ) @@ -44,13 +46,12 @@ func New(base mb.BaseMetricSet) (mb.MetricSet, error) { } // Fetch fetches queue data -func (m *MetricSet) Fetch(r mb.ReporterV2) { +func (m *MetricSet) Fetch(r mb.ReporterV2) error { content, err := m.HTTP.FetchContent() if err != nil { - r.Error(err) - return + return errors.Wrap(err, "error in fetch") } - eventsMapping(content, r) + return eventsMapping(content, r, m) } diff --git a/metricbeat/module/rabbitmq/queue/queue_test.go b/metricbeat/module/rabbitmq/queue/queue_test.go index 2f3320abbf1..1153a916aa8 100644 --- a/metricbeat/module/rabbitmq/queue/queue_test.go +++ b/metricbeat/module/rabbitmq/queue/queue_test.go @@ -33,7 +33,7 @@ func TestFetchEventContents(t *testing.T) { reporter := &mbtest.CapturingReporterV2{} - metricSet := mbtest.NewReportingMetricSetV2(t, getConfig(server.URL)) + metricSet := mbtest.NewReportingMetricSetV2Error(t, getConfig(server.URL)) metricSet.Fetch(reporter) e := mbtest.StandardizeEvent(metricSet, reporter.GetEvents()[0]) @@ -89,8 +89,8 @@ func TestData(t *testing.T) { server := mtest.Server(t, mtest.DefaultServerConfig) defer server.Close() - ms := mbtest.NewReportingMetricSetV2(t, getConfig(server.URL)) - err := mbtest.WriteEventsReporterV2Cond(ms, t, "", func(e common.MapStr) bool { + ms := mbtest.NewReportingMetricSetV2Error(t, getConfig(server.URL)) + err := mbtest.WriteEventsReporterV2ErrorCond(ms, t, "", func(e common.MapStr) bool { hasTotal, _ := e.HasKey("rabbitmq.queue.messages.total") return hasTotal })