Skip to content

Commit

Permalink
[metricbeat] migrate rabbitmq to reporterv2 with error return (#12228)
Browse files Browse the repository at this point in the history
* migrate rabbitmq to reporterv2 with error return
  • Loading branch information
fearful-symmetry authored May 30, 2019
1 parent 9ebe5c6 commit b8d10dc
Show file tree
Hide file tree
Showing 13 changed files with 119 additions and 110 deletions.
9 changes: 5 additions & 4 deletions metricbeat/module/rabbitmq/connection/connection.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@
package connection

import (
"github.com/pkg/errors"

"github.com/elastic/beats/metricbeat/mb"
"github.com/elastic/beats/metricbeat/module/rabbitmq"
)
Expand All @@ -44,13 +46,12 @@ func New(base mb.BaseMetricSet) (mb.MetricSet, error) {
}

// Fetch makes an HTTP request to fetch connections metrics from the connections endpoint.
func (m *MetricSet) Fetch(r mb.ReporterV2) {
func (m *MetricSet) Fetch(r mb.ReporterV2) error {
content, err := m.HTTP.FetchContent()

if err != nil {
r.Error(err)
return
return errors.Wrap(err, "error in fetch")
}

eventsMapping(content, r)
return eventsMapping(content, r, m)
}
2 changes: 1 addition & 1 deletion metricbeat/module/rabbitmq/connection/connection_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ func TestFetchEventContents(t *testing.T) {

reporter := &mbtest.CapturingReporterV2{}

metricSet := mbtest.NewReportingMetricSetV2(t, getConfig(server.URL))
metricSet := mbtest.NewReportingMetricSetV2Error(t, getConfig(server.URL))
metricSet.Fetch(reporter)

e := mbtest.StandardizeEvent(metricSet, reporter.GetEvents()[0])
Expand Down
39 changes: 20 additions & 19 deletions metricbeat/module/rabbitmq/connection/data.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,21 +20,20 @@ package connection
import (
"encoding/json"

"github.com/joeshaw/multierror"
"github.com/pkg/errors"

"github.com/elastic/beats/libbeat/common"
s "github.com/elastic/beats/libbeat/common/schema"
c "github.com/elastic/beats/libbeat/common/schema/mapstriface"
"github.com/elastic/beats/libbeat/logp"
"github.com/elastic/beats/metricbeat/mb"
)

var (
schema = s.Schema{
"name": c.Str("name"),
"vhost": c.Str("vhost"),
"user": c.Str("user"),
"node": c.Str("node"),
"vhost": c.Str("vhost", s.Required),
"user": c.Str("user", s.Required),
"node": c.Str("node", s.Required),
"channels": c.Int("channels"),
"channel_max": c.Int("channel_max"),
"frame_max": c.Int("frame_max"),
Expand All @@ -57,30 +56,33 @@ var (
}
)

func eventsMapping(content []byte, r mb.ReporterV2) {
func eventsMapping(content []byte, r mb.ReporterV2, m *MetricSet) error {
var connections []map[string]interface{}
err := json.Unmarshal(content, &connections)
if err != nil {
logp.Err("Error: %+v", err)
r.Error(err)
return
return errors.Wrap(err, "error in unmarshal")
}

var errors multierror.Errors
for _, node := range connections {
err := eventMapping(node, r)
evt, err := eventMapping(node)
if err != nil {
errors = append(errors, err)
m.Logger().Errorf("error in mapping: %s", err)
r.Error(err)
continue
}
}

if len(errors) > 0 {
r.Error(errors.Err())
if !r.Event(evt) {
return nil
}
}
return nil
}

func eventMapping(connection map[string]interface{}, r mb.ReporterV2) error {
fields, err := schema.Apply(connection)
func eventMapping(connection map[string]interface{}) (mb.Event, error) {
fields, err := schema.Apply(connection, s.FailOnRequired)
if err != nil {
return mb.Event{}, errors.Wrap(err, "error applying schema")
}

rootFields := common.MapStr{}
if v, err := fields.GetValue("user"); err == nil {
Expand All @@ -104,6 +106,5 @@ func eventMapping(connection map[string]interface{}, r mb.ReporterV2) error {
RootFields: rootFields,
ModuleFields: moduleFields,
}
r.Event(event)
return err
return event, nil
}
33 changes: 15 additions & 18 deletions metricbeat/module/rabbitmq/exchange/data.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,11 @@ package exchange
import (
"encoding/json"

"github.com/joeshaw/multierror"
"github.com/pkg/errors"

"github.com/elastic/beats/libbeat/common"
s "github.com/elastic/beats/libbeat/common/schema"
c "github.com/elastic/beats/libbeat/common/schema/mapstriface"
"github.com/elastic/beats/libbeat/logp"
"github.com/elastic/beats/metricbeat/mb"
)

Expand Down Expand Up @@ -56,33 +55,31 @@ var (
}
)

func eventsMapping(content []byte, r mb.ReporterV2) {
func eventsMapping(content []byte, r mb.ReporterV2, m *MetricSet) error {
var exchanges []map[string]interface{}
err := json.Unmarshal(content, &exchanges)
if err != nil {
logp.Err("Error: %+v", err)
r.Error(err)
return
return errors.Wrap(err, "error in unmarshal")
}

var errors multierror.Errors

for _, exchange := range exchanges {
err := eventMapping(exchange, r)
evt, err := eventMapping(exchange)
if err != nil {
errors = append(errors, err)
m.Logger().Errorf("error in mapping: %s", err)
r.Error(err)
continue
}
if !r.Event(evt) {
return nil
}
}

if len(errors) > 0 {
r.Error(errors.Err())
}
return nil
}

func eventMapping(exchange map[string]interface{}, r mb.ReporterV2) error {
func eventMapping(exchange map[string]interface{}) (mb.Event, error) {
fields, err := schema.Apply(exchange)
if err != nil {
return err
return mb.Event{}, err
}

rootFields := common.MapStr{}
Expand All @@ -102,6 +99,6 @@ func eventMapping(exchange map[string]interface{}, r mb.ReporterV2) error {
RootFields: rootFields,
ModuleFields: moduleFields,
}
r.Event(event)
return nil
return event, nil

}
9 changes: 5 additions & 4 deletions metricbeat/module/rabbitmq/exchange/exchange.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@
package exchange

import (
"github.com/pkg/errors"

"github.com/elastic/beats/metricbeat/mb"
"github.com/elastic/beats/metricbeat/module/rabbitmq"
)
Expand Down Expand Up @@ -46,13 +48,12 @@ func New(base mb.BaseMetricSet) (mb.MetricSet, error) {
// Fetch methods implements the data gathering and data conversion to the right
// format. It publishes the event which is then forwarded to the output. In case
// of an error set the Error field of mb.Event or simply call report.Error().
func (m *MetricSet) Fetch(r mb.ReporterV2) {
func (m *MetricSet) Fetch(r mb.ReporterV2) error {
content, err := m.HTTP.FetchContent()

if err != nil {
r.Error(err)
return
return errors.Wrap(err, "error in fetch")
}

eventsMapping(content, r)
return eventsMapping(content, r, m)
}
6 changes: 3 additions & 3 deletions metricbeat/module/rabbitmq/exchange/exchange_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ func TestFetchEventContents(t *testing.T) {

reporter := &mbtest.CapturingReporterV2{}

metricSet := mbtest.NewReportingMetricSetV2(t, getConfig(server.URL))
metricSet := mbtest.NewReportingMetricSetV2Error(t, getConfig(server.URL))
metricSet.Fetch(reporter)

e := mbtest.StandardizeEvent(metricSet, reporter.GetEvents()[0])
Expand Down Expand Up @@ -64,8 +64,8 @@ func TestData(t *testing.T) {
server := mtest.Server(t, mtest.DefaultServerConfig)
defer server.Close()

ms := mbtest.NewReportingMetricSetV2(t, getConfig(server.URL))
err := mbtest.WriteEventsReporterV2Cond(ms, t, "", func(e common.MapStr) bool {
ms := mbtest.NewReportingMetricSetV2Error(t, getConfig(server.URL))
err := mbtest.WriteEventsReporterV2ErrorCond(ms, t, "", func(e common.MapStr) bool {
hasIn, _ := e.HasKey("rabbitmq.exchange.messages.publish_in")
hasOut, _ := e.HasKey("rabbitmq.exchange.messages.publish_out")
return hasIn && hasOut
Expand Down
28 changes: 19 additions & 9 deletions metricbeat/module/rabbitmq/node/data.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ package node
import (
"encoding/json"

"github.com/pkg/errors"

s "github.com/elastic/beats/libbeat/common/schema"
c "github.com/elastic/beats/libbeat/common/schema/mapstriface"
"github.com/elastic/beats/metricbeat/mb"
Expand Down Expand Up @@ -145,26 +147,34 @@ var (
}
)

func eventsMapping(r mb.ReporterV2, content []byte) {
func eventsMapping(r mb.ReporterV2, content []byte, m *ClusterMetricSet) error {
var nodes []map[string]interface{}
err := json.Unmarshal(content, &nodes)
if err != nil {
r.Error(err)
return
return errors.Wrap(err, "error in Unmarshal")
}

for _, node := range nodes {
eventMapping(r, node)
evt, err := eventMapping(node)
if err != nil {
m.Logger().Errorf("error in mapping: %s", err)
r.Error(err)
continue
}
if !r.Event(evt) {
return nil
}
}
return nil
}

func eventMapping(r mb.ReporterV2, node map[string]interface{}) {
func eventMapping(node map[string]interface{}) (mb.Event, error) {
event, err := schema.Apply(node)
if err != nil {
r.Error(err)
return
return mb.Event{}, errors.Wrap(err, "error applying schema")
}
r.Event(mb.Event{
return mb.Event{
MetricSetFields: event,
})
}, nil

}
25 changes: 13 additions & 12 deletions metricbeat/module/rabbitmq/node/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,35 +89,36 @@ func (m *MetricSet) fetchOverview() (*apiOverview, error) {
}

// Fetch metrics from rabbitmq node
func (m *MetricSet) Fetch(r mb.ReporterV2) {
func (m *MetricSet) Fetch(r mb.ReporterV2) error {
o, err := m.fetchOverview()
if err != nil {
r.Error(err)
return
return errors.Wrap(err, "error in fetch")
}

node, err := rabbitmq.NewMetricSet(m.BaseMetricSet, rabbitmq.NodesPath+"/"+o.Node)
if err != nil {
r.Error(err)
return
return errors.Wrap(err, "error creating new metricset")
}

content, err := node.HTTP.FetchJSON()
if err != nil {
r.Error(err)
return
return errors.Wrap(err, "error in fetch")
}

eventMapping(r, content)
evt, err := eventMapping(content)
if err != nil {
return errors.Wrap(err, "error in mapping")
}
r.Event(evt)
return nil
}

// Fetch metrics from all rabbitmq nodes in the cluster
func (m *ClusterMetricSet) Fetch(r mb.ReporterV2) {
func (m *ClusterMetricSet) Fetch(r mb.ReporterV2) error {
content, err := m.HTTP.FetchContent()
if err != nil {
r.Error(err)
return
return errors.Wrap(err, "error in fetch")
}

eventsMapping(r, content)
return eventsMapping(r, content, m)
}
6 changes: 3 additions & 3 deletions metricbeat/module/rabbitmq/node/node_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,8 @@ import (
)

func TestData(t *testing.T) {
ms := mbtest.NewReportingMetricSetV2(t, getConfig())
err := mbtest.WriteEventsReporterV2(ms, t, "")
ms := mbtest.NewReportingMetricSetV2Error(t, getConfig())
err := mbtest.WriteEventsReporterV2Error(ms, t, "")
if err != nil {
t.Fatal("write", err)
}
Expand All @@ -41,7 +41,7 @@ func TestFetch(t *testing.T) {

reporter := &mbtest.CapturingReporterV2{}

metricSet := mbtest.NewReportingMetricSetV2(t, getConfig())
metricSet := mbtest.NewReportingMetricSetV2Error(t, getConfig())
metricSet.Fetch(reporter)

e := mbtest.StandardizeEvent(metricSet, reporter.GetEvents()[0])
Expand Down
Loading

0 comments on commit b8d10dc

Please sign in to comment.