diff --git a/internal/collector/telemetry/process_telemetry.go b/internal/collector/telemetry/process_telemetry.go index fe0240640b1..89d763f3ee9 100644 --- a/internal/collector/telemetry/process_telemetry.go +++ b/internal/collector/telemetry/process_telemetry.go @@ -20,17 +20,30 @@ import ( "runtime" "time" - "github.com/prometheus/procfs" + "github.com/shirou/gopsutil/process" "go.opencensus.io/stats" "go.opencensus.io/stats/view" ) // ProcessMetricsViews is a struct that contains views related to process metrics (cpu, mem, etc) type ProcessMetricsViews struct { - ballastSizeBytes uint64 - views []*view.View - done chan struct{} - proc *procfs.Proc + startTimeUnixNano int64 + ballastSizeBytes uint64 + views []*view.View + done chan struct{} + proc *process.Process +} + +var mUptime = stats.Int64( + "process/uptime", + "Uptime of the process", + stats.UnitSeconds) +var viewProcessUptime = &view.View{ + Name: mUptime.Name(), + Description: mUptime.Description(), + Measure: mUptime, + Aggregation: view.LastValue(), + TagKeys: nil, } var mRuntimeAllocMem = stats.Int64( @@ -81,23 +94,37 @@ var viewCPUSeconds = &view.View{ TagKeys: nil, } +var mRSSMemory = stats.Int64( + "process/memory/rss", + "Total physical memory (resident set size)", + stats.UnitDimensionless) +var viewRSSMemory = &view.View{ + Name: mRSSMemory.Name(), + Description: mRSSMemory.Description(), + Measure: mRSSMemory, + Aggregation: view.LastValue(), + TagKeys: nil, +} + // NewProcessMetricsViews creates a new set of ProcessMetrics (mem, cpu) that can be used to measure // basic information about this process. -func NewProcessMetricsViews(ballastSizeBytes uint64) *ProcessMetricsViews { +func NewProcessMetricsViews(ballastSizeBytes uint64) (*ProcessMetricsViews, error) { pmv := &ProcessMetricsViews{ - ballastSizeBytes: ballastSizeBytes, - views: []*view.View{viewAllocMem, viewTotalAllocMem, viewSysMem, viewCPUSeconds}, - done: make(chan struct{}), + startTimeUnixNano: time.Now().UnixNano(), + ballastSizeBytes: ballastSizeBytes, + views: []*view.View{viewProcessUptime, viewAllocMem, viewTotalAllocMem, viewSysMem, viewCPUSeconds, viewRSSMemory}, + done: make(chan struct{}), } - // procfs.Proc is not available on windows and expected to fail. pid := os.Getpid() - proc, err := procfs.NewProc(pid) - if err == nil { - pmv.proc = &proc + + var err error + pmv.proc, err = process.NewProcess(int32(pid)) + if err != nil { + return nil, err } - return pmv + return pmv, nil } // StartCollection starts a ticker'd goroutine that will update the PMV measurements every 5 seconds @@ -128,14 +155,18 @@ func (pmv *ProcessMetricsViews) StopCollection() { func (pmv *ProcessMetricsViews) updateViews() { ms := &runtime.MemStats{} + stats.Record(context.Background(), mUptime.M(time.Now().UnixNano()-pmv.startTimeUnixNano)) pmv.readMemStats(ms) stats.Record(context.Background(), mRuntimeAllocMem.M(int64(ms.Alloc))) stats.Record(context.Background(), mRuntimeTotalAllocMem.M(int64(ms.TotalAlloc))) stats.Record(context.Background(), mRuntimeSysMem.M(int64(ms.Sys))) if pmv.proc != nil { - if procStat, err := pmv.proc.Stat(); err == nil { - stats.Record(context.Background(), mCPUSeconds.M(int64(procStat.CPUTime()))) + if times, err := pmv.proc.Times(); err == nil { + stats.Record(context.Background(), mCPUSeconds.M(int64(times.Total()))) + } + if mem, err := pmv.proc.MemoryInfo(); err == nil { + stats.Record(context.Background(), mRSSMemory.M(int64(mem.RSS))) } } } diff --git a/internal/collector/telemetry/process_telemetry_test.go b/internal/collector/telemetry/process_telemetry_test.go index 97b15b91640..1bc1bf45588 100644 --- a/internal/collector/telemetry/process_telemetry_test.go +++ b/internal/collector/telemetry/process_telemetry_test.go @@ -27,17 +27,20 @@ import ( func TestProcessTelemetry(t *testing.T) { const ballastSizeBytes uint64 = 0 - pmv := NewProcessMetricsViews(ballastSizeBytes) + pmv, err := NewProcessMetricsViews(ballastSizeBytes) + require.NoError(t, err) assert.NotNil(t, pmv) expectedViews := []string{ // Changing a metric name is a breaking change. // Adding new metrics is ok as long it follows the conventions described at // https://pkg.go.dev/go.opentelemetry.io/collector/obsreport?tab=doc#hdr-Naming_Convention_for_New_Metrics + "process/uptime", "process/runtime/heap_alloc_bytes", "process/runtime/total_alloc_bytes", "process/runtime/total_sys_memory_bytes", "process/cpu_seconds", + "process/memory/rss", } processViews := pmv.Views() assert.Len(t, processViews, len(expectedViews)) diff --git a/service/telemetry.go b/service/telemetry.go index 6678102744b..c6df9923cc6 100644 --- a/service/telemetry.go +++ b/service/telemetry.go @@ -60,13 +60,17 @@ func (tel *appTelemetry) init(asyncErrorChannel chan<- error, ballastSizeBytes u return nil } + processMetricsViews, err := telemetry.NewProcessMetricsViews(ballastSizeBytes) + if err != nil { + return err + } + var views []*view.View views = append(views, obsreport.Configure(telemetry.UseLegacyMetrics(), telemetry.UseNewMetrics())...) views = append(views, processor.MetricViews(level)...) views = append(views, queuedprocessor.MetricViews(level)...) views = append(views, batchprocessor.MetricViews(level)...) views = append(views, tailsamplingprocessor.SamplingProcessorMetricViews(level)...) - processMetricsViews := telemetry.NewProcessMetricsViews(ballastSizeBytes) views = append(views, processMetricsViews.Views()...) views = append(views, fluentobserv.Views(level)...) tel.views = views