From 49c8d87cf408992d9b18a30035c71657615d70ea Mon Sep 17 00:00:00 2001 From: Michal Pristas Date: Mon, 14 Dec 2020 16:22:11 +0100 Subject: [PATCH] [Ingest Management] Agent expose metrics (#22793) * [Ingest Manager] Log level reloadable from fleet (#22690) [Ingest Manager] Log level reloadable from fleet (#22690) * aa * create drop * updated drop * process contains everything * drop start time * undo exposed endpoint * sanitize dataset name * ups * agent expose http * collect all metrics from beats * colelct all from beats * golint * cleaner docs * updated structure * cgroup * long live file saving issues --- libbeat/cmd/instance/beat.go | 5 +- libbeat/cmd/instance/{ => metrics}/metrics.go | 5 +- .../instance/{ => metrics}/metrics_common.go | 7 +- .../{ => metrics}/metrics_file_descriptors.go | 2 +- .../metrics_file_descriptors_stub.go | 2 +- .../instance/{ => metrics}/metrics_handles.go | 2 +- .../{ => metrics}/metrics_handles_stub.go | 2 +- .../instance/{ => metrics}/metrics_other.go | 4 +- x-pack/elastic-agent/pkg/agent/cmd/run.go | 99 ++++++++++- .../pkg/agent/operation/monitoring.go | 167 ++++++++++++++++++ .../pkg/core/monitoring/beats/monitoring.go | 18 ++ 11 files changed, 301 insertions(+), 12 deletions(-) rename libbeat/cmd/instance/{ => metrics}/metrics.go (98%) rename libbeat/cmd/instance/{ => metrics}/metrics_common.go (93%) rename libbeat/cmd/instance/{ => metrics}/metrics_file_descriptors.go (99%) rename libbeat/cmd/instance/{ => metrics}/metrics_file_descriptors_stub.go (98%) rename libbeat/cmd/instance/{ => metrics}/metrics_handles.go (99%) rename libbeat/cmd/instance/{ => metrics}/metrics_handles_stub.go (98%) rename libbeat/cmd/instance/{ => metrics}/metrics_other.go (94%) diff --git a/libbeat/cmd/instance/beat.go b/libbeat/cmd/instance/beat.go index 5fc32b638f53..09631337c004 100644 --- a/libbeat/cmd/instance/beat.go +++ b/libbeat/cmd/instance/beat.go @@ -42,6 +42,7 @@ import ( "github.com/elastic/beats/v7/libbeat/beat" "github.com/elastic/beats/v7/libbeat/cfgfile" "github.com/elastic/beats/v7/libbeat/cloudid" + "github.com/elastic/beats/v7/libbeat/cmd/instance/metrics" "github.com/elastic/beats/v7/libbeat/common" "github.com/elastic/beats/v7/libbeat/common/file" "github.com/elastic/beats/v7/libbeat/common/reload" @@ -237,7 +238,7 @@ func NewBeat(name, indexPrefix, v string, elasticLicensed bool) (*Beat, error) { Name: hostname, Hostname: hostname, ID: id, - EphemeralID: ephemeralID, + EphemeralID: metrics.EphemeralID(), }, Fields: fields, } @@ -316,7 +317,7 @@ func (b *Beat) createBeater(bt beat.Creator) (beat.Beater, error) { reg = monitoring.Default.NewRegistry("libbeat") } - err = setupMetrics(b.Info.Beat) + err = metrics.SetupMetrics(b.Info.Beat) if err != nil { return nil, err } diff --git a/libbeat/cmd/instance/metrics.go b/libbeat/cmd/instance/metrics/metrics.go similarity index 98% rename from libbeat/cmd/instance/metrics.go rename to libbeat/cmd/instance/metrics/metrics.go index 37495edf2f24..a1fd1f567834 100644 --- a/libbeat/cmd/instance/metrics.go +++ b/libbeat/cmd/instance/metrics/metrics.go @@ -17,7 +17,7 @@ // +build darwin,cgo freebsd,cgo linux windows -package instance +package metrics import ( "fmt" @@ -46,7 +46,7 @@ func init() { systemMetrics = monitoring.Default.NewRegistry("system") } -func setupMetrics(name string) error { +func SetupMetrics(name string) error { monitoring.NewFunc(systemMetrics, "cpu", reportSystemCPUUsage, monitoring.Report) //if the beat name is longer than 15 characters, truncate it so we don't fail process checks later on @@ -102,6 +102,7 @@ func reportMemStats(m monitoring.Mode, V monitoring.Visitor) { monitoring.ReportInt(V, "memory_total", int64(stats.TotalAlloc)) if m == monitoring.Full { monitoring.ReportInt(V, "memory_alloc", int64(stats.Alloc)) + monitoring.ReportInt(V, "memory_sys", int64(stats.Sys)) monitoring.ReportInt(V, "gc_next", int64(stats.NextGC)) } diff --git a/libbeat/cmd/instance/metrics_common.go b/libbeat/cmd/instance/metrics/metrics_common.go similarity index 93% rename from libbeat/cmd/instance/metrics_common.go rename to libbeat/cmd/instance/metrics/metrics_common.go index feff54b48411..f9a4a2866c1d 100644 --- a/libbeat/cmd/instance/metrics_common.go +++ b/libbeat/cmd/instance/metrics/metrics_common.go @@ -15,7 +15,7 @@ // specific language governing permissions and limitations // under the License. -package instance +package metrics import ( "time" @@ -43,6 +43,11 @@ func init() { } } +// EphemeralID returns generated EphemeralID +func EphemeralID() uuid.UUID { + return ephemeralID +} + func reportInfo(_ monitoring.Mode, V monitoring.Visitor) { V.OnRegistryStart() defer V.OnRegistryFinished() diff --git a/libbeat/cmd/instance/metrics_file_descriptors.go b/libbeat/cmd/instance/metrics/metrics_file_descriptors.go similarity index 99% rename from libbeat/cmd/instance/metrics_file_descriptors.go rename to libbeat/cmd/instance/metrics/metrics_file_descriptors.go index 5d255fd483d1..14ce5e1e65de 100644 --- a/libbeat/cmd/instance/metrics_file_descriptors.go +++ b/libbeat/cmd/instance/metrics/metrics_file_descriptors.go @@ -17,7 +17,7 @@ // +build linux freebsd,cgo -package instance +package metrics import ( "fmt" diff --git a/libbeat/cmd/instance/metrics_file_descriptors_stub.go b/libbeat/cmd/instance/metrics/metrics_file_descriptors_stub.go similarity index 98% rename from libbeat/cmd/instance/metrics_file_descriptors_stub.go rename to libbeat/cmd/instance/metrics/metrics_file_descriptors_stub.go index f6665ce29445..b0cc8b2b8d66 100644 --- a/libbeat/cmd/instance/metrics_file_descriptors_stub.go +++ b/libbeat/cmd/instance/metrics/metrics_file_descriptors_stub.go @@ -18,7 +18,7 @@ // +build !linux // +build !freebsd !cgo -package instance +package metrics // FDUsage is only supported on Linux and FreeBSD. func setupLinuxBSDFDMetrics() {} diff --git a/libbeat/cmd/instance/metrics_handles.go b/libbeat/cmd/instance/metrics/metrics_handles.go similarity index 99% rename from libbeat/cmd/instance/metrics_handles.go rename to libbeat/cmd/instance/metrics/metrics_handles.go index 497fe5edcd6e..5de41f7d70b5 100644 --- a/libbeat/cmd/instance/metrics_handles.go +++ b/libbeat/cmd/instance/metrics/metrics_handles.go @@ -17,7 +17,7 @@ // +build windows -package instance +package metrics import ( "github.com/elastic/beats/v7/libbeat/logp" diff --git a/libbeat/cmd/instance/metrics_handles_stub.go b/libbeat/cmd/instance/metrics/metrics_handles_stub.go similarity index 98% rename from libbeat/cmd/instance/metrics_handles_stub.go rename to libbeat/cmd/instance/metrics/metrics_handles_stub.go index 677cc5aaf28b..fa54b75fecda 100644 --- a/libbeat/cmd/instance/metrics_handles_stub.go +++ b/libbeat/cmd/instance/metrics/metrics_handles_stub.go @@ -17,7 +17,7 @@ // +build !windows -package instance +package metrics // Counting number of open handles is only supported on Windows. func setupWindowsHandlesMetrics() {} diff --git a/libbeat/cmd/instance/metrics_other.go b/libbeat/cmd/instance/metrics/metrics_other.go similarity index 94% rename from libbeat/cmd/instance/metrics_other.go rename to libbeat/cmd/instance/metrics/metrics_other.go index 2ea96cd6c8e8..459d2fcbc682 100644 --- a/libbeat/cmd/instance/metrics_other.go +++ b/libbeat/cmd/instance/metrics/metrics_other.go @@ -19,13 +19,13 @@ // +build !freebsd !cgo // +build !linux,!windows -package instance +package metrics import ( "github.com/elastic/beats/v7/libbeat/logp" ) -func setupMetrics(name string) error { +func SetupMetrics(name string) error { logp.Warn("Metrics not implemented for this OS.") return nil } diff --git a/x-pack/elastic-agent/pkg/agent/cmd/run.go b/x-pack/elastic-agent/pkg/agent/cmd/run.go index 541607cab8f8..eb1285855390 100644 --- a/x-pack/elastic-agent/pkg/agent/cmd/run.go +++ b/x-pack/elastic-agent/pkg/agent/cmd/run.go @@ -6,16 +6,23 @@ package cmd import ( "context" + "encoding/json" "fmt" + "net/http" "os" "os/signal" "path/filepath" + "runtime" + "strings" "syscall" "github.com/spf13/cobra" + "github.com/elastic/beats/v7/libbeat/api" + "github.com/elastic/beats/v7/libbeat/cmd/instance/metrics" + "github.com/elastic/beats/v7/libbeat/common" + "github.com/elastic/beats/v7/libbeat/monitoring" "github.com/elastic/beats/v7/libbeat/service" - "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/application" "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/application/info" "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/application/paths" @@ -28,6 +35,7 @@ import ( "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/cli" "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/config" "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/core/logger" + "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/core/monitoring/beats" "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/release" ) @@ -133,6 +141,12 @@ func run(flags *globalFlags, streams *cli.IOStreams) error { // Windows: Mark se return err } + serverStopFn, err := setupMetrics(agentInfo, logger, cfg.Settings.DownloadConfig.OS()) + if err != nil { + return err + } + defer serverStopFn() + if err := app.Start(); err != nil { return err } @@ -234,3 +248,86 @@ func defaultLogLevel(cfg *configuration.Configuration) string { return defaultLogLevel } + +func setupMetrics(agentInfo *info.AgentInfo, logger *logger.Logger, operatingSystem string) (func() error, error) { + // use libbeat to setup metrics + if err := metrics.SetupMetrics(agentName); err != nil { + return nil, err + } + + // start server for stats + endpointConfig := api.Config{ + Enabled: true, + Host: beats.AgentMonitoringEndpoint(operatingSystem), + } + + // create agent config path + createAgentMonitoringDrop(endpointConfig.Host) + + cfg, err := common.NewConfigFrom(endpointConfig) + if err != nil { + return nil, err + } + + s, err := exposeMetricsEndpoint(logger, cfg, monitoring.GetNamespace) + if err != nil { + return nil, errors.New(err, "could not start the HTTP server for the API") + } + s.Start() + + // return server stopper + return s.Stop, nil +} + +func createAgentMonitoringDrop(drop string) error { + if drop == "" || runtime.GOOS == "windows" { + return nil + } + + path := strings.TrimPrefix(drop, "unix://") + if strings.HasSuffix(path, ".sock") { + path = filepath.Dir(path) + } + + _, err := os.Stat(path) + if err != nil { + if !os.IsNotExist(err) { + return err + } + + // create + if err := os.MkdirAll(path, 0775); err != nil { + return err + } + } + + return os.Chown(path, os.Geteuid(), os.Getegid()) +} + +func exposeMetricsEndpoint(log *logger.Logger, config *common.Config, ns func(string) *monitoring.Namespace) (*api.Server, error) { + mux := http.NewServeMux() + + makeAPIHandler := func(ns *monitoring.Namespace) func(http.ResponseWriter, *http.Request) { + return func(w http.ResponseWriter, r *http.Request) { + w.Header().Set("Content-Type", "application/json; charset=utf-8") + + data := monitoring.CollectStructSnapshot( + ns.GetRegistry(), + monitoring.Full, + false, + ) + + bytes, err := json.Marshal(data) + var content string + if err != nil { + content = fmt.Sprintf("Not valid json: %v", err) + } else { + content = string(bytes) + } + fmt.Fprintf(w, content) + } + } + + mux.HandleFunc("/stats", makeAPIHandler(ns("stats"))) + return api.New(log, mux, config) +} diff --git a/x-pack/elastic-agent/pkg/agent/operation/monitoring.go b/x-pack/elastic-agent/pkg/agent/operation/monitoring.go index 448ace6bf4f6..a7b835503be9 100644 --- a/x-pack/elastic-agent/pkg/agent/operation/monitoring.go +++ b/x-pack/elastic-agent/pkg/agent/operation/monitoring.go @@ -16,6 +16,7 @@ import ( "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/errors" "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/program" "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/core/app" + "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/core/monitoring/beats" ) const ( @@ -24,6 +25,7 @@ const ( logsProcessName = "filebeat" metricsProcessName = "metricbeat" artifactPrefix = "beats" + agentName = "elastic-agent" ) func (o *Operator) handleStartSidecar(s configrequest.Step) (result error) { @@ -302,6 +304,8 @@ func (o *Operator) getMonitoringMetricbeatConfig(output interface{}) (map[string return nil, false } var modules []interface{} + fixedAgentName := strings.ReplaceAll(agentName, "-", "_") + for name, endpoints := range hosts { modules = append(modules, map[string]interface{}{ "module": "beat", @@ -339,8 +343,171 @@ func (o *Operator) getMonitoringMetricbeatConfig(output interface{}) (map[string }, }, }, + }, map[string]interface{}{ + "module": "http", + "metricsets": []string{"json"}, + "namespace": "agent", + "period": "10s", + "path": "/stats", + "hosts": endpoints, + "index": fmt.Sprintf("metrics-elastic_agent.%s-default", fixedAgentName), + "processors": []map[string]interface{}{ + { + "add_fields": map[string]interface{}{ + "target": "data_stream", + "fields": map[string]interface{}{ + "type": "metrics", + "dataset": fmt.Sprintf("elastic_agent.%s", fixedAgentName), + "namespace": "default", + }, + }, + }, + { + "add_fields": map[string]interface{}{ + "target": "event", + "fields": map[string]interface{}{ + "dataset": fmt.Sprintf("elastic_agent.%s", fixedAgentName), + }, + }, + }, + { + "add_fields": map[string]interface{}{ + "target": "elastic_agent", + "fields": map[string]interface{}{ + "id": o.agentInfo.AgentID(), + "version": o.agentInfo.Version(), + "snapshot": o.agentInfo.Snapshot(), + "process": name, + }, + }, + }, + { + "copy_fields": map[string]interface{}{ + "fields": []map[string]interface{}{ + // I should be able to see the CPU Usage on the running machine. Am using too much CPU? + { + "from": "http.agent.beat.cpu", + "to": "system.process.cpu", + }, + // I should be able to see the Memory usage of Elastic Agent. Is the Elastic Agent using too much memory? + { + "from": "http.agent.beat.memstats.memory_sys", + "to": "system.process.memory.size", + }, + // I should be able to see the system memory. Am I running out of memory? + // TODO: with APM agent: total and free + + // I should be able to see Disk usage on the running machine. Am I running out of disk space? + // TODO: with APM agent + + // I should be able to see fd usage. Am I keep too many files open? + { + "from": "http.agent.beat.handles", + "to": "system.process.fd", + }, + // Cgroup reporting + { + "from": "http.agent.beat.cgrgit loup", + "to": "system.process.cgroup", + }, + }, + "ignore_missing": true, + }, + }, + { + "drop_fields": map[string]interface{}{ + "fields": []string{ + "http", + }, + "ignore_missing": true, + }, + }, + }, }) } + + modules = append(modules, map[string]interface{}{ + "module": "http", + "metricsets": []string{"json"}, + "namespace": "agent", + "period": "10s", + "path": "/stats", + "hosts": []string{beats.AgentPrefixedMonitoringEndpoint(o.config.DownloadConfig.OS())}, + "index": fmt.Sprintf("metrics-elastic_agent.%s-default", fixedAgentName), + "processors": []map[string]interface{}{ + { + "add_fields": map[string]interface{}{ + "target": "data_stream", + "fields": map[string]interface{}{ + "type": "metrics", + "dataset": fmt.Sprintf("elastic_agent.%s", fixedAgentName), + "namespace": "default", + }, + }, + }, + { + "add_fields": map[string]interface{}{ + "target": "event", + "fields": map[string]interface{}{ + "dataset": fmt.Sprintf("elastic_agent.%s", fixedAgentName), + }, + }, + }, + { + "add_fields": map[string]interface{}{ + "target": "elastic_agent", + "fields": map[string]interface{}{ + "id": o.agentInfo.AgentID(), + "version": o.agentInfo.Version(), + "snapshot": o.agentInfo.Snapshot(), + "process": "elastic-agent", + }, + }, + }, + { + "copy_fields": map[string]interface{}{ + "fields": []map[string]interface{}{ + // I should be able to see the CPU Usage on the running machine. Am using too much CPU? + { + "from": "http.agent.beat.cpu", + "to": "system.process.cpu", + }, + // I should be able to see the Memory usage of Elastic Agent. Is the Elastic Agent using too much memory? + { + "from": "http.agent.beat.memstats.memory_sys", + "to": "system.process.memory.size", + }, + // I should be able to see the system memory. Am I running out of memory? + // TODO: with APM agent: total and free + + // I should be able to see Disk usage on the running machine. Am I running out of disk space? + // TODO: with APM agent + + // I should be able to see fd usage. Am I keep too many files open? + { + "from": "http.agent.beat.handles", + "to": "system.process.fd", + }, + // Cgroup reporting + { + "from": "http.agent.beat.cgroup", + "to": "system.process.cgroup", + }, + }, + "ignore_missing": true, + }, + }, + { + "drop_fields": map[string]interface{}{ + "fields": []string{ + "http", + }, + "ignore_missing": true, + }, + }, + }, + }) + result := map[string]interface{}{ "metricbeat": map[string]interface{}{ "modules": modules, diff --git a/x-pack/elastic-agent/pkg/core/monitoring/beats/monitoring.go b/x-pack/elastic-agent/pkg/core/monitoring/beats/monitoring.go index 203a101fd830..3ece027a7aae 100644 --- a/x-pack/elastic-agent/pkg/core/monitoring/beats/monitoring.go +++ b/x-pack/elastic-agent/pkg/core/monitoring/beats/monitoring.go @@ -21,6 +21,11 @@ const ( mbEndpointFileFormat = "unix:///tmp/elastic-agent/%s/%s/%s.sock" // args: pipeline name, application name mbEndpointFileFormatWin = `npipe:///%s-%s` + + // args: pipeline name, application name + agentMbEndpointFileFormat = "unix:///tmp/elastic-agent/elastic-agent.sock" + // args: pipeline name, application name + agentMbEndpointFileFormatWin = `npipe:///elastic-agent` ) func getMonitoringEndpoint(spec program.Spec, operatingSystem, pipelineID string) string { @@ -42,3 +47,16 @@ func getLoggingFile(spec program.Spec, operatingSystem, installPath, pipelineID } return fmt.Sprintf(logFileFormat, paths.Home(), pipelineID, spec.Cmd) } + +// AgentMonitoringEndpoint returns endpoint with exposed metrics for agent. +func AgentMonitoringEndpoint(operatingSystem string) string { + if operatingSystem == "windows" { + return agentMbEndpointFileFormatWin + } + return agentMbEndpointFileFormat +} + +// AgentPrefixedMonitoringEndpoint returns endpoint with exposed metrics for agent. +func AgentPrefixedMonitoringEndpoint(operatingSystem string) string { + return httpPlusPrefix + AgentMonitoringEndpoint(operatingSystem) +}