From 91d2f3d3f13e83cb428403baec99e10a10014a2f Mon Sep 17 00:00:00 2001 From: James Bebbington Date: Fri, 14 Aug 2020 19:45:21 +1000 Subject: [PATCH] Add uptime & rss self-observability metrics, and fix cpu time to work on non-Linux OSs --- .../collector/telemetry/process_telemetry.go | 56 +++++++++++++++---- .../telemetry/process_telemetry_test.go | 26 +++++---- service/telemetry.go | 6 +- 3 files changed, 64 insertions(+), 24 deletions(-) diff --git a/internal/collector/telemetry/process_telemetry.go b/internal/collector/telemetry/process_telemetry.go index fe0240640b1..2ba633d3c6a 100644 --- a/internal/collector/telemetry/process_telemetry.go +++ b/internal/collector/telemetry/process_telemetry.go @@ -20,17 +20,30 @@ import ( "runtime" "time" - "github.com/prometheus/procfs" + "github.com/shirou/gopsutil/process" "go.opencensus.io/stats" "go.opencensus.io/stats/view" ) // ProcessMetricsViews is a struct that contains views related to process metrics (cpu, mem, etc) type ProcessMetricsViews struct { + prevTimeUnixNano int64 ballastSizeBytes uint64 views []*view.View done chan struct{} - proc *procfs.Proc + proc *process.Process +} + +var mUptime = stats.Float64( + "process/uptime", + "Uptime of the process", + stats.UnitSeconds) +var viewProcessUptime = &view.View{ + Name: mUptime.Name(), + Description: mUptime.Description(), + Measure: mUptime, + Aggregation: view.Sum(), + TagKeys: nil, } var mRuntimeAllocMem = stats.Int64( @@ -81,23 +94,37 @@ var viewCPUSeconds = &view.View{ TagKeys: nil, } +var mRSSMemory = stats.Int64( + "process/memory/rss", + "Total physical memory (resident set size)", + stats.UnitDimensionless) +var viewRSSMemory = &view.View{ + Name: mRSSMemory.Name(), + Description: mRSSMemory.Description(), + Measure: mRSSMemory, + Aggregation: view.LastValue(), + TagKeys: nil, +} + // NewProcessMetricsViews creates a new set of ProcessMetrics (mem, cpu) that can be used to measure // basic information about this process. -func NewProcessMetricsViews(ballastSizeBytes uint64) *ProcessMetricsViews { +func NewProcessMetricsViews(ballastSizeBytes uint64) (*ProcessMetricsViews, error) { pmv := &ProcessMetricsViews{ + prevTimeUnixNano: time.Now().UnixNano(), ballastSizeBytes: ballastSizeBytes, - views: []*view.View{viewAllocMem, viewTotalAllocMem, viewSysMem, viewCPUSeconds}, + views: []*view.View{viewProcessUptime, viewAllocMem, viewTotalAllocMem, viewSysMem, viewCPUSeconds, viewRSSMemory}, done: make(chan struct{}), } - // procfs.Proc is not available on windows and expected to fail. pid := os.Getpid() - proc, err := procfs.NewProc(pid) - if err == nil { - pmv.proc = &proc + + var err error + pmv.proc, err = process.NewProcess(int32(pid)) + if err != nil { + return nil, err } - return pmv + return pmv, nil } // StartCollection starts a ticker'd goroutine that will update the PMV measurements every 5 seconds @@ -127,6 +154,10 @@ func (pmv *ProcessMetricsViews) StopCollection() { } func (pmv *ProcessMetricsViews) updateViews() { + now := time.Now().UnixNano() + stats.Record(context.Background(), mUptime.M(float64(now-pmv.prevTimeUnixNano)/1e9)) + pmv.prevTimeUnixNano = now + ms := &runtime.MemStats{} pmv.readMemStats(ms) stats.Record(context.Background(), mRuntimeAllocMem.M(int64(ms.Alloc))) @@ -134,8 +165,11 @@ func (pmv *ProcessMetricsViews) updateViews() { stats.Record(context.Background(), mRuntimeSysMem.M(int64(ms.Sys))) if pmv.proc != nil { - if procStat, err := pmv.proc.Stat(); err == nil { - stats.Record(context.Background(), mCPUSeconds.M(int64(procStat.CPUTime()))) + if times, err := pmv.proc.Times(); err == nil { + stats.Record(context.Background(), mCPUSeconds.M(int64(times.Total()))) + } + if mem, err := pmv.proc.MemoryInfo(); err == nil { + stats.Record(context.Background(), mRSSMemory.M(int64(mem.RSS))) } } } diff --git a/internal/collector/telemetry/process_telemetry_test.go b/internal/collector/telemetry/process_telemetry_test.go index 97b15b91640..bdb4dafd78a 100644 --- a/internal/collector/telemetry/process_telemetry_test.go +++ b/internal/collector/telemetry/process_telemetry_test.go @@ -15,7 +15,6 @@ package telemetry import ( - "runtime" "testing" "time" @@ -27,17 +26,20 @@ import ( func TestProcessTelemetry(t *testing.T) { const ballastSizeBytes uint64 = 0 - pmv := NewProcessMetricsViews(ballastSizeBytes) + pmv, err := NewProcessMetricsViews(ballastSizeBytes) + require.NoError(t, err) assert.NotNil(t, pmv) expectedViews := []string{ // Changing a metric name is a breaking change. // Adding new metrics is ok as long it follows the conventions described at // https://pkg.go.dev/go.opentelemetry.io/collector/obsreport?tab=doc#hdr-Naming_Convention_for_New_Metrics + "process/uptime", "process/runtime/heap_alloc_bytes", "process/runtime/total_alloc_bytes", "process/runtime/total_sys_memory_bytes", "process/cpu_seconds", + "process/memory/rss", } processViews := pmv.Views() assert.Len(t, processViews, len(expectedViews)) @@ -50,12 +52,6 @@ func TestProcessTelemetry(t *testing.T) { <-time.After(200 * time.Millisecond) for _, viewName := range expectedViews { - if (runtime.GOOS == "windows" || runtime.GOOS == "darwin") && viewName == "process/cpu_seconds" { - // "process/cpu_seconds" is not supported on windows or darwin because there is - // no procfs which is used for reading that metric. - continue - } - rows, err := view.RetrieveData(viewName) require.NoError(t, err, viewName) @@ -63,13 +59,19 @@ func TestProcessTelemetry(t *testing.T) { row := rows[0] assert.Len(t, row.Tags, 0) - lastValue := row.Data.(*view.LastValueData) - if viewName == "process/cpu_seconds" { + var value float64 + if viewName == "process/uptime" { + value = row.Data.(*view.SumData).Value + } else { + value = row.Data.(*view.LastValueData).Value + } + + if viewName == "process/uptime" || viewName == "process/cpu_seconds" { // This likely will still be zero when running the test. - assert.True(t, lastValue.Value >= 0, viewName) + assert.True(t, value >= 0, viewName) continue } - assert.True(t, lastValue.Value > 0, viewName) + assert.True(t, value > 0, viewName) } } diff --git a/service/telemetry.go b/service/telemetry.go index 822fbbf2fca..9304c879505 100644 --- a/service/telemetry.go +++ b/service/telemetry.go @@ -61,6 +61,11 @@ func (tel *appTelemetry) init(asyncErrorChannel chan<- error, ballastSizeBytes u return nil } + processMetricsViews, err := telemetry.NewProcessMetricsViews(ballastSizeBytes) + if err != nil { + return err + } + var views []*view.View views = append(views, obsreport.Configure(telemetry.UseLegacyMetrics(), telemetry.UseNewMetrics())...) views = append(views, processor.MetricViews(level)...) @@ -68,7 +73,6 @@ func (tel *appTelemetry) init(asyncErrorChannel chan<- error, ballastSizeBytes u views = append(views, batchprocessor.MetricViews(level)...) views = append(views, tailsamplingprocessor.SamplingProcessorMetricViews(level)...) views = append(views, kafkareceiver.MetricViews()...) - processMetricsViews := telemetry.NewProcessMetricsViews(ballastSizeBytes) views = append(views, processMetricsViews.Views()...) views = append(views, fluentobserv.Views(level)...) tel.views = views