diff --git a/metricbeat/module/system/process/config.go b/metricbeat/module/system/process/config.go index 12266700538b..a28db878565c 100644 --- a/metricbeat/module/system/process/config.go +++ b/metricbeat/module/system/process/config.go @@ -32,6 +32,8 @@ type Config struct { IncludeCPUTicks bool `config:"process.include_cpu_ticks"` IncludePerCPU bool `config:"process.include_per_cpu"` CPUTicks *bool `config:"cpu_ticks"` // Deprecated + // Pid, if set, will override the `processes` config, and only monitor a single process. + Pid int `config:"process.pid"` } // Validate checks for depricated config options diff --git a/metricbeat/module/system/process/process.go b/metricbeat/module/system/process/process.go index b67ce137bd9b..ad9fa8d5ac06 100644 --- a/metricbeat/module/system/process/process.go +++ b/metricbeat/module/system/process/process.go @@ -45,8 +45,8 @@ func init() { type MetricSet struct { mb.BaseMetricSet stats *process.Stats - cgroup *cgroup.Reader perCPU bool + setpid int } // New creates and returns a new MetricSet. @@ -65,6 +65,10 @@ func New(base mb.BaseMetricSet) (mb.MetricSet, error) { } } + if config.Pid != 0 && config.Procs[0] != ".*" { + logp.L().Warnf("`process.pid` set to %d, but `processes` is set to a non-default value. Metricset will only report metrics for pid %d", config.Pid, config.Pid) + } + m := &MetricSet{ BaseMetricSet: base, stats: &process.Stats{ @@ -83,6 +87,8 @@ func New(base mb.BaseMetricSet) (mb.MetricSet, error) { perCPU: config.IncludePerCPU, } + m.setpid = config.Pid + // If hostfs is set, we may not want to force the hierarchy override, as the user could be expecting a custom path. if !sys.IsSet() { override, isset := os.LookupEnv("LIBBEAT_MONITORING_CGROUPS_HIERARCHY_OVERRIDE") @@ -101,19 +107,32 @@ func New(base mb.BaseMetricSet) (mb.MetricSet, error) { // Fetch fetches metrics for all processes. It iterates over each PID and // collects process metadata, CPU metrics, and memory metrics. func (m *MetricSet) Fetch(r mb.ReporterV2) error { - procs, roots, err := m.stats.Get() - if err != nil { - return fmt.Errorf("process stats: %w", err) - } - for evtI := range procs { - isOpen := r.Event(mb.Event{ - MetricSetFields: procs[evtI], - RootFields: roots[evtI], - }) - if !isOpen { - return nil + // monitor either a single PID, or the configured set of processes. + if m.setpid == 0 { + procs, roots, err := m.stats.Get() + if err != nil { + return fmt.Errorf("process stats: %w", err) + } + + for evtI := range procs { + isOpen := r.Event(mb.Event{ + MetricSetFields: procs[evtI], + RootFields: roots[evtI], + }) + if !isOpen { + return nil + } } + } else { + proc, root, err := m.stats.GetOneRootEvent(m.setpid) + if err != nil { + return fmt.Errorf("error fetching pid %d: %w", m.setpid, err) + } + r.Event(mb.Event{ + MetricSetFields: proc, + RootFields: root, + }) } return nil diff --git a/metricbeat/module/system/process/process_test.go b/metricbeat/module/system/process/process_test.go index 5c1f59f7a3cb..98b48b75d6ea 100644 --- a/metricbeat/module/system/process/process_test.go +++ b/metricbeat/module/system/process/process_test.go @@ -20,6 +20,7 @@ package process import ( + "os" "testing" "time" @@ -50,6 +51,20 @@ func TestFetch(t *testing.T) { events[0].BeatEvent("system", "process").Fields.StringToPrint()) } +func TestFetchSinglePid(t *testing.T) { + logp.DevelopmentSetup() + + cfg := getConfig() + cfg["process.pid"] = os.Getpid() + + f := mbtest.NewReportingMetricSetV2Error(t, cfg) + events, errs := mbtest.ReportingFetchV2Error(f) + assert.Empty(t, errs) + assert.NotEmpty(t, events) + assert.Equal(t, os.Getpid(), events[0].RootFields["process"].(map[string]interface{})["pid"]) + assert.NotEmpty(t, events[0].MetricSetFields["cpu"]) +} + func TestData(t *testing.T) { f := mbtest.NewReportingMetricSetV2Error(t, getConfig())