Skip to content

Commit

Permalink
Add new MetricSet interfaces for Module Developers (elastic#3908)
Browse files Browse the repository at this point in the history
This PR adds new interfaces for MetricSet developers. These interfaces provided additional flexibility for MetricSet implementations. The new interfaces are:

- `Closer` - With Metricbeat gaining the ability to dynamically load and unload modules there became a need to properly release resources opened by a MetricSet. If a MetricSet implements the `Closer` interface then the `Close() error` method will be invoked when the MetricSet is unloaded.

- `ReportingFetcher` - Some MetricSets needed to report multiple errors, but the existing `Fetch` methods only allowed a single error to be returned. `ReportingFetcher` passes a callback interface to the MetricSet that allows it to report any combination of events, errors, or errors with metadata.

- `PushMetricSet` - This is for push event sources. All the current MetricSet implementations are driven by periodic `Fetch` callbacks triggered by timer. A `PushMetricSet` does not receive periodic callbacks, but instead has a `Run` method. Implementations can forward events as they are received from the underlying source.
  • Loading branch information
andrewkroh authored and ruflin committed Apr 7, 2017
1 parent 2ce3add commit f3079d9
Show file tree
Hide file tree
Showing 11 changed files with 504 additions and 120 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,7 @@ https://github.com/elastic/beats/compare/v5.1.1...master[Check the HEAD diff]
- Add experimental metricset `perfmon` to Windows module. {pull}3758[3758]
- Add memcached module with stats metricset. {pull}3693[3693]
- Add the `process.cmdline.cache.enabled` config option to the System Process Metricset. {pull}3891[3891]
- Add new MetricSet interfaces for developers (`Closer`, `ReportingFetcher`, and `PushMetricSet`). {pull}3908[3908]

*Packetbeat*
- Add `fields` and `fields_under_root` to packetbeat protocols configurations. {pull}3518[3518]
Expand Down
16 changes: 13 additions & 3 deletions metricbeat/mb/builders.go
Original file line number Diff line number Diff line change
Expand Up @@ -227,15 +227,25 @@ func mustImplementFetcher(ms MetricSet) error {
ifcs = append(ifcs, "EventsFetcher")
}

if _, ok := ms.(ReportingMetricSet); ok {
ifcs = append(ifcs, "ReportingMetricSet")
}

if _, ok := ms.(PushMetricSet); ok {
ifcs = append(ifcs, "PushMetricSet")
}

switch len(ifcs) {
case 0:
return fmt.Errorf("MetricSet '%s/%s' does not implement a Fetcher "+
"interface", ms.Module().Name(), ms.Name())
return fmt.Errorf("MetricSet '%s/%s' does not implement an event "+
"producing interface (EventFetcher, EventsFetcher, "+
"ReportingMetricSet, or PushMetricSet)",
ms.Module().Name(), ms.Name())
case 1:
return nil
default:
return fmt.Errorf("MetricSet '%s/%s' can only implement a single "+
"Fetcher interface, but implements %v", ms.Module().Name(),
"event producing interface, but implements %v", ms.Module().Name(),
ms.Name(), ifcs)
}
}
Expand Down
31 changes: 21 additions & 10 deletions metricbeat/mb/example_metricset_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,9 @@ import (
"github.com/elastic/beats/metricbeat/mb/parse"
)

var hostParser = parse.URLHostParserBuilder{DefaultScheme: "http"}.Build()
var hostParser = parse.URLHostParserBuilder{
DefaultScheme: "http",
}.Build()

func init() {
// Register the MetricSetFactory function for the "status" MetricSet.
Expand All @@ -26,14 +28,23 @@ func NewMetricSet(base mb.BaseMetricSet) (mb.MetricSet, error) {
return &MetricSet{BaseMetricSet: base}, nil
}

func (ms *MetricSet) Fetch() (common.MapStr, error) {
// Fetch data from the host (using ms.HostData().URI) and return the data.
return common.MapStr{
"someParam": "value",
"otherParam": 42,
}, nil
// Fetch will be called periodically by the framework.
func (ms *MetricSet) Fetch(report mb.Reporter) {
// Fetch data from the host at ms.HostData().URI and return the data.
data, err := common.MapStr{
"some_metric": 18.0,
"answer_to_everything": 42,
}, error(nil)
if err != nil {
// Report an error if it occurs.
report.Error(err)
return
}

// Otherwise report the collected data.
report.Event(data)
}

// ExampleMetricSetFactory demonstrates how to register a MetricSetFactory
// and unpack additional configuration data.
func ExampleMetricSetFactory() {}
// ExampleReportingMetricSet demonstrates how to register a MetricSetFactory
// and implement a ReportingMetricSet.
func ExampleReportingMetricSet() {}
45 changes: 44 additions & 1 deletion metricbeat/mb/mb.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,19 +71,62 @@ type MetricSet interface {
HostData() HostData // HostData returns the parsed host data.
}

// Closer is an optional interface that a MetricSet can implement in order to
// cleanup any resources it has open at shutdown.
type Closer interface {
Close() error
}

// EventFetcher is a MetricSet that returns a single event when collecting data.
// Use ReportingMetricSet for new MetricSet implementations.
type EventFetcher interface {
MetricSet
Fetch() (common.MapStr, error)
}

// EventsFetcher is a MetricSet that returns a multiple events when collecting
// data.
// data. Use ReportingMetricSet for new MetricSet implementations.
type EventsFetcher interface {
MetricSet
Fetch() ([]common.MapStr, error)
}

// Reporter is used by a MetricSet to report events, errors, or errors with
// metadata. The methods return false if and only if publishing failed because
// the MetricSet is being closed.
type Reporter interface {
Event(event common.MapStr) bool // Event reports a single successful event.
ErrorWith(err error, meta common.MapStr) bool // ErrorWith reports a single error event with the additional metadata.
Error(err error) bool // Error reports a single error event.
}

// ReportingMetricSet is a MetricSet that reports events or errors through the
// Reporter interface. Fetch is called periodically to collect events.
type ReportingMetricSet interface {
MetricSet
Fetch(r Reporter)
}

// PushReporter is used by a MetricSet to report events, errors, or errors with
// metadata. It provides a done channel used to signal that reporter should
// stop.
type PushReporter interface {
Reporter

// Done returns a channel that's closed when work done on behalf of this
// reporter should be canceled.
Done() <-chan struct{}
}

// PushMetricSet is a MetricSet that pushes events (rather than pulling them
// periodically via a Fetch callback). Run is invoked to start the event
// subscription and it should block until the MetricSet is ready to stop or
// the PushReporter's done channel is closed.
type PushMetricSet interface {
MetricSet
Run(r PushReporter)
}

// HostData contains values parsed from the 'host' configuration. Other
// configuration data like protocols, usernames, and passwords may also be
// used to construct this HostData data.
Expand Down
155 changes: 154 additions & 1 deletion metricbeat/mb/mb_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ func (m testModule) ParseHost(host string) (HostData, error) {
return m.hostParser(host)
}

// EventFetcher

type testMetricSet struct {
BaseMetricSet
}
Expand All @@ -28,6 +30,32 @@ func (m *testMetricSet) Fetch() (common.MapStr, error) {
return nil, nil
}

// EventsFetcher

type testMetricSetEventsFetcher struct {
BaseMetricSet
}

func (m *testMetricSetEventsFetcher) Fetch() ([]common.MapStr, error) {
return nil, nil
}

// ReportingFetcher

type testMetricSetReportingFetcher struct {
BaseMetricSet
}

func (m *testMetricSetReportingFetcher) Fetch(r Reporter) {}

// PushMetricSet

type testPushMetricSet struct {
BaseMetricSet
}

func (m *testPushMetricSet) Run(r PushReporter) {}

func TestModuleConfig(t *testing.T) {
tests := []struct {
in interface{}
Expand Down Expand Up @@ -169,7 +197,7 @@ func TestNewModulesDuplicateHosts(t *testing.T) {
assert.Error(t, err)
}

func TestNewModules(t *testing.T) {
func TestNewModulesHostParser(t *testing.T) {
const (
name = "HostParser"
host = "example.com"
Expand Down Expand Up @@ -235,6 +263,131 @@ func TestNewModules(t *testing.T) {
}
assert.FailNow(t, "no modules found")
})

}

func TestNewModulesMetricSetTypes(t *testing.T) {
r := newTestRegistry(t)

factory := func(base BaseMetricSet) (MetricSet, error) {
return &testMetricSet{base}, nil
}

name := "EventFetcher"
if err := r.AddMetricSet(moduleName, name, factory); err != nil {
t.Fatal(err)
}

t.Run(name+" MetricSet", func(t *testing.T) {
c := newConfig(t, map[string]interface{}{
"module": moduleName,
"metricsets": []string{name},
})

modules, err := NewModules(c, r)
if err != nil {
t.Fatal(err)
}
assert.Len(t, modules, 1)

for _, metricSets := range modules {
if assert.Len(t, metricSets, 1) {
metricSet := metricSets[0]
_, ok := metricSet.(EventFetcher)
assert.True(t, ok, name+" not implemented")
}
}
})

factory = func(base BaseMetricSet) (MetricSet, error) {
return &testMetricSetEventsFetcher{base}, nil
}

name = "EventsFetcher"
if err := r.AddMetricSet(moduleName, name, factory); err != nil {
t.Fatal(err)
}

t.Run(name+" MetricSet", func(t *testing.T) {
c := newConfig(t, map[string]interface{}{
"module": moduleName,
"metricsets": []string{name},
})

modules, err := NewModules(c, r)
if err != nil {
t.Fatal(err)
}
assert.Len(t, modules, 1)

for _, metricSets := range modules {
if assert.Len(t, metricSets, 1) {
metricSet := metricSets[0]
_, ok := metricSet.(EventsFetcher)
assert.True(t, ok, name+" not implemented")
}
}
})

factory = func(base BaseMetricSet) (MetricSet, error) {
return &testMetricSetReportingFetcher{base}, nil
}

name = "ReportingFetcher"
if err := r.AddMetricSet(moduleName, name, factory); err != nil {
t.Fatal(err)
}

t.Run(name+" MetricSet", func(t *testing.T) {
c := newConfig(t, map[string]interface{}{
"module": moduleName,
"metricsets": []string{name},
})

modules, err := NewModules(c, r)
if err != nil {
t.Fatal(err)
}
assert.Len(t, modules, 1)

for _, metricSets := range modules {
if assert.Len(t, metricSets, 1) {
metricSet := metricSets[0]
_, ok := metricSet.(ReportingMetricSet)
assert.True(t, ok, name+" not implemented")
}
}
})

factory = func(base BaseMetricSet) (MetricSet, error) {
return &testPushMetricSet{base}, nil
}

name = "Push"
if err := r.AddMetricSet(moduleName, name, factory); err != nil {
t.Fatal(err)
}

t.Run(name+" MetricSet", func(t *testing.T) {
c := newConfig(t, map[string]interface{}{
"module": moduleName,
"metricsets": []string{name},
})

modules, err := NewModules(c, r)
if err != nil {
t.Fatal(err)
}
assert.Len(t, modules, 1)

for _, metricSets := range modules {
if assert.Len(t, metricSets, 1) {
metricSet := metricSets[0]
_, ok := metricSet.(PushMetricSet)
assert.True(t, ok, name+" not implemented")
}
}
})
}

// TestNewBaseModuleFromModuleConfigStruct tests the creation a new BaseModule.
Expand Down
4 changes: 3 additions & 1 deletion metricbeat/mb/module/event.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,9 @@ func (b EventBuilder) Build() (common.MapStr, error) {
metricsetData := common.MapStr{
"module": b.ModuleName,
"name": b.MetricSetName,
"rtt": b.FetchDuration.Nanoseconds() / int64(time.Microsecond),
}
if b.FetchDuration != 0 {
metricsetData["rtt"] = b.FetchDuration.Nanoseconds() / int64(time.Microsecond)
}

namespace := b.MetricSetName
Expand Down
14 changes: 14 additions & 0 deletions metricbeat/mb/module/event_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,3 +78,17 @@ func TestEventBuilderNoHost(t *testing.T) {
_, found := event["metricset-host"]
assert.False(t, found)
}

func TestEventBuildNoRTT(t *testing.T) {
b := builder
b.FetchDuration = 0

event, err := b.Build()
if err != nil {
t.Fatal(err)
}

metricset := event["metricset"].(common.MapStr)
_, found := metricset["rtt"]
assert.False(t, found, "found rtt")
}
8 changes: 4 additions & 4 deletions metricbeat/mb/module/example_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ func ExampleWrapper() {
// Build a configuration object.
config, err := common.NewConfigFrom(map[string]interface{}{
"module": moduleName,
"metricsets": []string{metricSetName},
"metricsets": []string{eventFetcherName},
})
if err != nil {
fmt.Println("Error:", err)
Expand Down Expand Up @@ -68,13 +68,13 @@ func ExampleWrapper() {
// "Tags": null
// },
// "fake": {
// "status": {
// "eventfetcher": {
// "metric": 1
// }
// },
// "metricset": {
// "module": "fake",
// "name": "status",
// "name": "eventfetcher",
// "rtt": 111
// },
// "type": "metricsets"
Expand All @@ -91,7 +91,7 @@ func ExampleRunner() {

config, err := common.NewConfigFrom(map[string]interface{}{
"module": moduleName,
"metricsets": []string{metricSetName},
"metricsets": []string{eventFetcherName},
})
if err != nil {
return
Expand Down
2 changes: 1 addition & 1 deletion metricbeat/mb/module/runner_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ func TestRunner(t *testing.T) {

config, err := common.NewConfigFrom(map[string]interface{}{
"module": moduleName,
"metricsets": []string{metricSetName},
"metricsets": []string{eventFetcherName},
})
if err != nil {
t.Fatal(err)
Expand Down
Loading

0 comments on commit f3079d9

Please sign in to comment.