Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Metricbeat] Migrate Aerospike namespace Metricset to use ReporterV2 interface #10984

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 11 additions & 9 deletions metricbeat/module/aerospike/namespace/_meta/data.json
Original file line number Diff line number Diff line change
Expand Up @@ -57,8 +57,8 @@
},
"name": "test",
"node": {
"host": "172.18.0.4:3000",
"name": "BB9040012AC4202"
"host": "172.26.0.2:3000",
"name": "BB902001AAC4202"
},
"objects": {
"master": 0,
Expand All @@ -67,14 +67,16 @@
"stop_writes": false
}
},
"beat": {
"hostname": "host.example.com",
"name": "host.example.com"
"event": {
"dataset": "aerospike.namespace",
"duration": 115000,
"module": "aerospike"
},
"metricset": {
"host": "aerospike:3000",
"module": "aerospike",
"name": "namespace",
"rtt": 115
"name": "namespace"
},
"service": {
"address": "172.26.0.2:3000",
"type": "aerospike"
}
}
21 changes: 9 additions & 12 deletions metricbeat/module/aerospike/namespace/namespace.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ import (
"github.com/pkg/errors"

"github.com/elastic/beats/libbeat/common"
"github.com/elastic/beats/libbeat/logp"
"github.com/elastic/beats/metricbeat/mb"
"github.com/elastic/beats/metricbeat/module/aerospike"
)
Expand Down Expand Up @@ -67,27 +66,25 @@ func New(base mb.BaseMetricSet) (mb.MetricSet, error) {
}, nil
}

// Fetch methods implements the data gathering and data conversion to the right format
// It returns the event which is then forward to the output. In case of an error, a
// descriptive error must be returned.
func (m *MetricSet) Fetch() ([]common.MapStr, error) {
var events []common.MapStr

// Fetch methods implements the data gathering and data conversion to the right
// format. It publishes the event which is then forwarded to the output. In case
// of an error set the Error field of mb.Event or simply call report.Error().
func (m *MetricSet) Fetch(reporter mb.ReporterV2) error {
if err := m.connect(); err != nil {
return nil, err
return errors.Wrap(err, "error connecting to Aerospike")
}

for _, node := range m.client.GetNodes() {
info, err := as.RequestNodeInfo(node, "namespaces")
if err != nil {
logp.Err("Failed to retrieve namespaces from node %s", node.GetName())
m.Logger().Error("Failed to retrieve namespaces from node %s", node.GetName())
continue
}

for _, namespace := range strings.Split(info["namespaces"], ";") {
info, err := as.RequestNodeInfo(node, "namespace/"+namespace)
if err != nil {
logp.Err("Failed to retrieve metrics for namespace %s from node %s", namespace, node.GetName())
m.Logger().Error("Failed to retrieve metrics for namespace %s from node %s", namespace, node.GetName())
continue
}

Expand All @@ -98,11 +95,11 @@ func (m *MetricSet) Fetch() ([]common.MapStr, error) {
"name": node.GetName(),
}

events = append(events, data)
reporter.Event(mb.Event{MetricSetFields: data})
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We could interrupt this loop to avoid further queries if reporter.Event() is returning false.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is that a good idea? I mean, let's face some examples:

  • Sending 100 events and 1 of them failed because of a network issue (for the purpose of the example). 99 events would arrive.
  • 1 event failed so I don't send the next 99.
  • 100 event fails and we have 100 error messages. While they are not harmful, it is a bit annoying to receive 100 times the same error message. 100 events didn't arrive, of course.
  • A mix, where we only send some of the events and some not mixed with some error messages.

So, my assumption after you comment is that we prefer to get those 99 events and produce an error message in each case.

WDYT? 🙂

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The thing is that it doesn't return false because of a network issue, it returns false in a totally deterministic way if the metricset has been stopped, and in that case all the following calls will also fail. Following with your example, if you have 100 namespaces, and the second one fails to be reported, the other 97 will also fail, so it does 97 requests for namespace metrics that are not going to be reported.

Remember this thread.

In any case I think this is not such a big deal, so as you prefer, but I find the possibility of handling this as an advantage of using reporter V2.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, I completely forgot about that thread. I just wanted to be sure that we were taking into account all possible scenarios. I'll modify it. Thanks Jaime 🙂

}
}

return events, nil
return nil
}

// create an aerospike client if it doesn't exist yet
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,19 +22,32 @@ package namespace
import (
"testing"

"github.com/stretchr/testify/assert"

"github.com/elastic/beats/libbeat/tests/compose"
mbtest "github.com/elastic/beats/metricbeat/mb/testing"
"github.com/elastic/beats/metricbeat/module/aerospike"
)

func TestData(t *testing.T) {
f := mbtest.NewReportingMetricSetV2Error(t, getConfig())
if err := mbtest.WriteEventsReporterV2Error(f, t, ""); err != nil {
t.Fatal("write", err)
}
}

func TestFetch(t *testing.T) {
compose.EnsureUp(t, "aerospike")

f := mbtest.NewEventsFetcher(t, getConfig())
err := mbtest.WriteEvents(f, t)
if err != nil {
t.Fatal("write", err)
f := mbtest.NewReportingMetricSetV2Error(t, getConfig())
events, errs := mbtest.ReportingFetchV2Error(f)
if len(errs) > 0 {
t.Fatalf("Expected 0 error, had %d. %v\n", len(errs), errs)
}
assert.NotEmpty(t, events)
event := events[0].MetricSetFields

t.Logf("%s/%s event: %+v", f.Module().Name(), f.Name(), event)
}

func getConfig() map[string]interface{} {
Expand Down