Skip to content

Commit

Permalink
Make it possible to have metricset specific metrics (elastic#6836)
Browse files Browse the repository at this point in the history
So far for metricsets and modules only combined metrics were collected. There was no visibility into a single metricset. This PR provides the base functionality to use have each metricset provide it's own metrics.

The BaseMetricSet was extended with an `ID()` and `Metrics()` method. The id is required to have a unique identifier for each metricset during reporting and the `Metrics()` method can be used inside each Mmetricsetetricset to access the metricset specific registry.

Currently the data collected from all metricsets are exposed under `/dataset`. This is probably going to change and is mainly to show case the first implementation. The data exposed looks as following:

```
{
  "05d4bd84-2ca2-4d9d-862d-c7822b4389f5": {
    "id": "05d4bd84-2ca2-4d9d-862d-c7822b4389f5",
    "metricset": "process_summary",
    "module": "system",
    "starttime": "2018-04-17 15:41:34.154435426 +0200 CEST m=+10.037457766"
  },
  "2f85ed18-343b-4c33-865c-44e624cdaf6d": {
    "id": "2f85ed18-343b-4c33-865c-44e624cdaf6d",
    "metricset": "load",
    "module": "system",
    "starttime": "2018-04-17 15:41:34.15164335 +0200 CEST m=+10.034665690"
  }
}
```

The initial idea was to expose an array instead of map with unique keys. But having a map with unique keys makes it easier to add and remove registries when a metricset runner is started / stopped and the functionality already existed. In case the above metrics were reported to Elasticsearch, each block would be one event. To automate this conversion, we could use a flag on a registry to potential have a different reporting for Elasticsearch.

The concept of namespaces was added to manage the existing registries instead of just having on global (`Default`). This can be used in the future for additional API endpoints.
  • Loading branch information
ruflin authored and andrewkroh committed May 4, 2018
1 parent fb84ad3 commit bc4b07c
Show file tree
Hide file tree
Showing 7 changed files with 106 additions and 16 deletions.
26 changes: 14 additions & 12 deletions libbeat/api/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,15 +6,14 @@ import (
"net/url"
"strconv"

"github.com/elastic/beats/libbeat/beat"
"github.com/elastic/beats/libbeat/common"
"github.com/elastic/beats/libbeat/common/cfgwarn"
"github.com/elastic/beats/libbeat/logp"
"github.com/elastic/beats/libbeat/monitoring"
)

// Start starts the metrics api endpoint on the configured host and port
func Start(cfg *common.Config, info beat.Info) {
func Start(cfg *common.Config) {
cfgwarn.Experimental("Metrics endpoint is enabled.")
config := DefaultConfig
cfg.Unpack(&config)
Expand All @@ -24,8 +23,9 @@ func Start(cfg *common.Config, info beat.Info) {
mux := http.NewServeMux()

// register handlers
mux.HandleFunc("/", rootHandler(info))
mux.HandleFunc("/", rootHandler())
mux.HandleFunc("/stats", statsHandler)
mux.HandleFunc("/dataset", datasetHandler)

url := config.Host + ":" + strconv.Itoa(config.Port)
logp.Info("Metrics endpoint listening on: %s", url)
Expand All @@ -34,7 +34,7 @@ func Start(cfg *common.Config, info beat.Info) {
}()
}

func rootHandler(info beat.Info) func(http.ResponseWriter, *http.Request) {
func rootHandler() func(http.ResponseWriter, *http.Request) {
return func(w http.ResponseWriter, r *http.Request) {
// Return error page
if r.URL.Path != "/" {
Expand All @@ -44,13 +44,7 @@ func rootHandler(info beat.Info) func(http.ResponseWriter, *http.Request) {

w.Header().Set("Content-Type", "application/json; charset=utf-8")

data := common.MapStr{
"version": info.Version,
"beat": info.Beat,
"name": info.Name,
"uuid": info.UUID,
"hostname": info.Hostname,
}
data := monitoring.CollectStructSnapshot(monitoring.GetNamespace("state").GetRegistry(), monitoring.Full, false)

print(w, data, r.URL)
}
Expand All @@ -60,7 +54,15 @@ func rootHandler(info beat.Info) func(http.ResponseWriter, *http.Request) {
func statsHandler(w http.ResponseWriter, r *http.Request) {
w.Header().Set("Content-Type", "application/json; charset=utf-8")

data := monitoring.CollectStructSnapshot(nil, monitoring.Full, false)
data := monitoring.CollectStructSnapshot(monitoring.GetNamespace("stats").GetRegistry(), monitoring.Full, false)

print(w, data, r.URL)
}

func datasetHandler(w http.ResponseWriter, r *http.Request) {
w.Header().Set("Content-Type", "application/json; charset=utf-8")

data := monitoring.CollectStructSnapshot(monitoring.GetNamespace("dataset").GetRegistry(), monitoring.Full, false)

print(w, data, r.URL)
}
Expand Down
11 changes: 10 additions & 1 deletion libbeat/cmd/instance/beat.go
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,15 @@ func Run(name, idxPrefix, version string, bt beat.Creator) error {
if err != nil {
return err
}

registry := monitoring.NewRegistry()
monitoring.GetNamespace("state").SetRegistry(registry)
monitoring.NewString(registry, "version").Set(b.Info.Version)
monitoring.NewString(registry, "beat").Set(b.Info.Beat)
monitoring.NewString(registry, "name").Set(b.Info.Name)
monitoring.NewString(registry, "uuid").Set(b.Info.UUID.String())
monitoring.NewString(registry, "hostname").Set(b.Info.Hostname)

return b.launch(bt)
}())
}
Expand Down Expand Up @@ -315,7 +324,7 @@ func (b *Beat) launch(bt beat.Creator) error {
logp.Info("%s start running.", b.Info.Beat)

if b.Config.HTTP.Enabled() {
api.Start(b.Config.HTTP, b.Info)
api.Start(b.Config.HTTP)
}

return beater.Run(&b.Beat)
Expand Down
4 changes: 4 additions & 0 deletions libbeat/monitoring/monitoring.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,10 @@ const (
// Default is the global default metrics registry provided by the monitoring package.
var Default = NewRegistry()

func init() {
GetNamespace("stats").SetRegistry(Default)
}

var errNotFound = errors.New("Name unknown")
var errInvalidName = errors.New("Name does not point to a valid variable")

Expand Down
39 changes: 39 additions & 0 deletions libbeat/monitoring/namespace.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
package monitoring

var namespaces = map[string]*Namespace{}

// Namespace contains the name of the namespace and it's registry
type Namespace struct {
name string
registry *Registry
}

func newNamespace(name string) *Namespace {
n := &Namespace{
name: name,
}
namespaces[name] = n
return n
}

// GetNamespace gets the namespace with the given name.
// If the namespace does not exist yet, a new one is created.
func GetNamespace(name string) *Namespace {
if n, ok := namespaces[name]; ok {
return n
}
return newNamespace(name)
}

// SetRegistry sets the registry of the namespace
func (n *Namespace) SetRegistry(r *Registry) {
n.registry = r
}

// GetRegistry gets the registry of the namespace
func (n *Namespace) GetRegistry() *Registry {
if n.registry == nil {
n.registry = NewRegistry()
}
return n.registry
}
19 changes: 16 additions & 3 deletions metricbeat/mb/builders.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,10 @@ import (

"github.com/joeshaw/multierror"
"github.com/pkg/errors"
"github.com/satori/go.uuid"

"github.com/elastic/beats/libbeat/common"
"github.com/elastic/beats/libbeat/monitoring"
)

var (
Expand Down Expand Up @@ -156,10 +158,21 @@ func newBaseMetricSets(r *Register, m Module) ([]BaseMetricSet, error) {
for _, name := range metricSetNames {
name = strings.ToLower(name)
for _, host := range hosts {
id := uuid.NewV4().String()
metrics := monitoring.NewRegistry()
monitoring.NewString(metrics, "module").Set(m.Name())
monitoring.NewString(metrics, "metricset").Set(name)
if host != "" {
monitoring.NewString(metrics, "host").Set(host)
}
monitoring.NewString(metrics, "id").Set(id)

metricsets = append(metricsets, BaseMetricSet{
name: name,
module: m,
host: host,
id: id,
name: name,
module: m,
host: host,
metrics: metrics,
})
}
}
Expand Down
15 changes: 15 additions & 0 deletions metricbeat/mb/mb.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"time"

"github.com/elastic/beats/libbeat/common"
"github.com/elastic/beats/libbeat/monitoring"
)

const (
Expand Down Expand Up @@ -78,13 +79,15 @@ func (m *BaseModule) UnpackConfig(to interface{}) error {
// addition to this interface, all MetricSets must implement either
// EventFetcher or EventsFetcher (but not both).
type MetricSet interface {
ID() string // Unique ID identifying a running MetricSet.
Name() string // Name returns the name of the MetricSet.
Module() Module // Module returns the parent Module for the MetricSet.
Host() string // Host returns a hostname or other module specific value
// that identifies a specific host or service instance from which to collect
// metrics.
HostData() HostData // HostData returns the parsed host data.
Registration() MetricSetRegistration // Params used in registration.
Metrics() *monitoring.Registry // MetricSet specific metrics
}

// Closer is an optional interface that a MetricSet can implement in order to
Expand Down Expand Up @@ -213,11 +216,13 @@ func (h HostData) GoString() string { return h.String() }
// MetricSet interface requirements, leaving only the Fetch() method to be
// implemented to have a complete MetricSet implementation.
type BaseMetricSet struct {
id string
name string
module Module
host string
hostData HostData
registration MetricSetRegistration
metrics *monitoring.Registry
}

func (b *BaseMetricSet) String() string {
Expand All @@ -231,6 +236,16 @@ func (b *BaseMetricSet) String() string {

func (b *BaseMetricSet) GoString() string { return b.String() }

// ID returns the unique ID of the MetricSet.
func (b *BaseMetricSet) ID() string {
return b.id
}

// Metrics returns the metrics registry.
func (b *BaseMetricSet) Metrics() *monitoring.Registry {
return b.metrics
}

// Name returns the name of the MetricSet. It should not include the name of
// the module.
func (b *BaseMetricSet) Name() string {
Expand Down
8 changes: 8 additions & 0 deletions metricbeat/mb/module/wrapper.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,9 +106,17 @@ func (mw *Wrapper) Start(done <-chan struct{}) <-chan beat.Event {
wg.Add(len(mw.metricSets))
for _, msw := range mw.metricSets {
go func(msw *metricSetWrapper) {
metricsPath := msw.ID()
registry := monitoring.GetNamespace("dataset").GetRegistry()

defer registry.Remove(metricsPath)
defer releaseStats(msw.stats)
defer wg.Done()
defer msw.close()

registry.Add(metricsPath, msw.Metrics(), monitoring.Full)
monitoring.NewString(msw.Metrics(), "starttime").Set(common.Time{}.String())

msw.run(done, out)
}(msw)
}
Expand Down

0 comments on commit bc4b07c

Please sign in to comment.