Skip to content

Commit

Permalink
[Metricbeat] Migrate Ceph cluster_disk to use ReporterV2 interface (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
sayden authored Mar 7, 2019
1 parent 7ed8570 commit a78c552
Show file tree
Hide file tree
Showing 4 changed files with 31 additions and 13 deletions.
18 changes: 14 additions & 4 deletions metricbeat/module/ceph/cluster_disk/cluster_disk.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
package cluster_disk

import (
"github.com/elastic/beats/libbeat/common"
"github.com/elastic/beats/metricbeat/helper"
"github.com/elastic/beats/metricbeat/mb"
"github.com/elastic/beats/metricbeat/mb/parse"
Expand Down Expand Up @@ -61,12 +60,23 @@ func New(base mb.BaseMetricSet) (mb.MetricSet, error) {
}, nil
}

func (m *MetricSet) Fetch() (common.MapStr, error) {
// Fetch methods implements the data gathering and data conversion to the right
// format. It publishes the event which is then forwarded to the output. In case
// of an error set the Error field of mb.Event or simply call report.Error().
func (m *MetricSet) Fetch(reporter mb.ReporterV2) error {
content, err := m.HTTP.FetchContent()
if err != nil {
return err
}

event, err := eventMapping(content)
if err != nil {
return nil, err
return err
}

if reported := reporter.Event(mb.Event{MetricSetFields: event}); !reported {
m.Logger().Debug("error reporting event")
}

return eventMapping(content), nil
return nil
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@
// specific language governing permissions and limitations
// under the License.

// +build integration

package cluster_disk

import (
Expand All @@ -26,9 +28,9 @@ import (
)

func TestData(t *testing.T) {
f := mbtest.NewEventFetcher(t, getConfig())
err := mbtest.WriteEvent(f, t)
if err != nil {
f := mbtest.NewReportingMetricSetV2Error(t, getConfig())

if err := mbtest.WriteEventsReporterV2Error(f, t, ""); err != nil {
t.Fatal("write", err)
}
}
Expand Down
9 changes: 7 additions & 2 deletions metricbeat/module/ceph/cluster_disk/cluster_disk_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,8 +50,13 @@ func TestFetchEventContents(t *testing.T) {
"hosts": []string{server.URL},
}

f := mbtest.NewEventFetcher(t, config)
event, err := f.Fetch()
f := mbtest.NewReportingMetricSetV2Error(t, config)
events, errs := mbtest.ReportingFetchV2Error(f)
if len(errs) > 0 {
t.Fatalf("Expected 0 error, had %d. %v\n", len(errs), errs)
}
assert.NotEmpty(t, events)
event := events[0].MetricSetFields

t.Logf("%s/%s event: %+v", f.Module().Name(), f.Name(), event.StringToPrint())

Expand Down
9 changes: 5 additions & 4 deletions metricbeat/module/ceph/cluster_disk/data.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,9 @@ package cluster_disk
import (
"encoding/json"

"github.com/pkg/errors"

"github.com/elastic/beats/libbeat/common"
"github.com/elastic/beats/libbeat/logp"
)

type StatsCluster struct {
Expand All @@ -39,11 +40,11 @@ type DfRequest struct {
Output Output `json:"output"`
}

func eventMapping(content []byte) common.MapStr {
func eventMapping(content []byte) (common.MapStr, error) {
var d DfRequest
err := json.Unmarshal(content, &d)
if err != nil {
logp.Err("Error: %+v", err)
return nil, errors.Wrap(err, "could not get DFRequest data")
}

return common.MapStr{
Expand All @@ -56,5 +57,5 @@ func eventMapping(content []byte) common.MapStr {
"available": common.MapStr{
"bytes": d.Output.StatsCluster.TotalAvailBytes,
},
}
}, nil
}

0 comments on commit a78c552

Please sign in to comment.