From 7b99feba1735a1e7a123c9995ae0109132427fb2 Mon Sep 17 00:00:00 2001 From: naman-jain-15 <167972588+naman-jain-15@users.noreply.github.com> Date: Wed, 22 May 2024 10:49:48 +0530 Subject: [PATCH] added process and container metrics (#59) --- receiver/datadogmetricreceiver/translator.go | 205 +++++++++++++++++-- 1 file changed, 189 insertions(+), 16 deletions(-) diff --git a/receiver/datadogmetricreceiver/translator.go b/receiver/datadogmetricreceiver/translator.go index 9c3bf4e557bf..4b4b637aaac7 100644 --- a/receiver/datadogmetricreceiver/translator.go +++ b/receiver/datadogmetricreceiver/translator.go @@ -36,6 +36,62 @@ var datadogMetricTypeStrToEnum map[string]int32 = map[string]int32{ "rate": datadogMetricTypeRate, } +var metrics_to_extract = map[string]map[string]string{ + "system.process.voluntary_context_switches": { + "field": "VoluntaryCtxSwitches", + "type": "", + }, + "system.process.involuntary_context_switches": { + "field": "InvoluntaryCtxSwitches", + "type": "", + }, + "system.process.open_file_descriptors": { + "field": "OpenFdCount", + "type": "", + }, + "system.process.create_time": { + "type": "", + "field": "CreateTime", + }, + "system.process.cpu.total_percentage": { + "type": "cpu", + "field": "TotalPct", + }, + "system.process.cpu.user_percentage": { + "type": "cpu", + "field": "UserPct", + }, + "system.process.cpu.system_percentage": { + "type": "cpu", + "field": "SystemPct", + }, + "system.process.threads_count": { + "type": "cpu", + "field": "NumThreads", + }, + "system.process.rss": { + "type": "memory", + "field": "Rss", + }, + "system.process.vms": { + "type": "memory", + "field": "Vms", + }, +} + +var container_metrics_to_extract = map[string]string{ + "container.process.status": "State", + "container.process.create_time": "CreateTime", + "container.process.cpu.total_percentage": "TotalPct", + "container.process.cpu.user_percentage": "UserPct", + "container.process.cpu.system_percentage": "SystemPct", + "container.process.net_bytes_sent": "NetSentBps", + "container.process.net_bytes_rcvd": "NetRcvdBps", + "container.process.rss": "MemRss", + "container.process.ioread": "Rbps", + "container.process.iowrite": "Wbps", +} + func skipDatadogMetrics(metricName string, metricType int32) bool { if strings.HasPrefix(metricName, "datadog") { return true @@ -378,33 +434,66 @@ func getOtlpExportReqFromDatadogProcessesData(origin string, key string, instrumentationScope.SetName("mw") instrumentationScope.SetVersion("v0.0.1") - metrics_to_extract := []string{"system.process.rss", "system.process.total_cpu_pct"} - for _, processs := range processPayload { if processs == nil { continue } - for _, new_metric := range metrics_to_extract { + for new_metric, metric_map := range metrics_to_extract { var metric_val float64 - if new_metric == "system.process.rss" { + if metric_map["type"] == "memory" { memory_process := processs.GetMemory() if memory_process == nil { continue } - metric_val = float64(memory_process.Rss) - //mwlogger.Info("Memory...", metric_val) + + switch metric_map["field"] { + case "MemRss": + metric_val = float64(memory_process.Rss) + case "Vms": + metric_val = float64(memory_process.Vms) + default: + continue + } } - if new_metric == "system.process.total_cpu_pct" { + if metric_map["type"] == "cpu" { cpustat := processs.GetCpu() if cpustat == nil { continue } - metric_val = float64(cpustat.TotalPct) - //mwlogger.Info("System percentage", metric_val) + switch metric_map["field"] { + case "TotalPct": + metric_val = float64(cpustat.TotalPct) + case "UserPct": + metric_val = float64(cpustat.UserPct) + case "SystemPct": + metric_val = float64(cpustat.SystemPct) + case "NumThreads": + metric_val = float64(cpustat.NumThreads) + default: + continue + } + } + + if metric_map["type"] == "" { + switch metric_map["field"] { + case "VoluntaryCtxSwitches": + metric_val = float64(processs.VoluntaryCtxSwitches) + case "InvoluntaryCtxSwitches": + metric_val = float64(processs.InvoluntaryCtxSwitches) + case "OpenFdCount": + metric_val = float64(processs.OpenFdCount) + case "CreateTime": + currentTime := time.Now() + milliseconds := (currentTime.UnixNano() / int64(time.Millisecond)) * 1000000 + createtime := (int64(milliseconds/1000000) - processs.CreateTime) / 1000 + metric_val = float64(createtime) + default: + continue + } } scopeMetric := scopeMetrics.Metrics().AppendEmpty() @@ -419,25 +508,109 @@ func getOtlpExportReqFromDatadogProcessesData(origin string, key string, val := command.Args result := strings.Join(val, " ") metricAttributes.PutStr("process_name", result) - // _ = process.Command.Ppid - //mwlogger.Info("Cnmd", result) - // mwlogger.Info("Ppid", process.Command.Ppid) } // GET USER INFO userinfo := processs.GetUser() if userinfo != nil { val := userinfo.Name - //mwlogger.Info("Username", val) metricAttributes.PutStr("USERNAME", val) } + // GET PID + pid := processs.Pid + metricAttributes.PutInt("pid", int64(pid)) + + // CREATETIME + currentTime := time.Now() + milliseconds := (currentTime.UnixNano() / int64(time.Millisecond)) * 1000000 + + var dataPoints pmetric.NumberDataPointSlice + gauge := scopeMetric.SetEmptyGauge() + dataPoints = gauge.DataPoints() + + dp := dataPoints.AppendEmpty() + dp.SetTimestamp(pcommon.Timestamp(milliseconds)) + // Datadog payload stores count value as second member of Point + // array + dp.SetDoubleValue(float64(metric_val) * 1.0) + attributeMap := dp.Attributes() + metricAttributes.CopyTo(attributeMap) + } + + } + + containerPayload := ddReq.GetContainers() + + for _, container := range containerPayload { + + if container == nil { + continue + } + + for new_metric, field := range container_metrics_to_extract { + + var metric_val float64 + + switch field { + case "CreateTime": + // Handle CreateTime metric + currentTime := time.Now() + milliseconds := (currentTime.UnixNano() / int64(time.Millisecond)) * 1000000 + createtime := (int64(milliseconds/1000000000) - container.GetCreated()) + metric_val = float64(createtime) + case "TotalPct": + // Handle TotalPct metric + metric_val = float64(container.GetTotalPct()) + case "UserPct": + // Handle UserPct metric + metric_val = float64(container.GetUserPct()) + case "SystemPct": + // Handle SystemPct metric + metric_val = float64(container.GetSystemPct()) + case "NetSentBps": + // Handle NetSentBps metric + metric_val = float64(container.GetNetSentBps()) + case "NetRcvdBps": + // Handle NetRcvdBps metric + metric_val = float64(container.GetNetRcvdBps()) + case "MemRss": + // Handle MemRss metric + metric_val = float64(container.GetMemRss()) + case "Rbps": + // Handle Rbps metric + metric_val = float64(container.GetRbps()) + case "Wbps": + // Handle Wbps metric + metric_val = float64(container.GetWbps()) + default: + fmt.Printf("Unknown field: %s\n", field) + } + + scopeMetric := scopeMetrics.Metrics().AppendEmpty() + scopeMetric.SetName(new_metric) + //scopeMetric.SetUnit(s.GetUnit()) + + metricAttributes := pcommon.NewMap() + metricAttributes.PutStr("container_name", container.GetName()) + metricAttributes.PutStr("container_image", container.GetImage()) + metricAttributes.PutStr("container_status", string(container.GetState())) + + for _, tag := range container.GetTags() { + // Datadog sends tag as string slice. Each member + // of the slice is of the form ":" + // e.g. "client_version:5.1.1" + parts := strings.Split(tag, ":") + if len(parts) != 2 { + continue + } + + metricAttributes.PutStr(parts[0], parts[1]) + } + // CREATETIME currentTime := time.Now() milliseconds := (currentTime.UnixNano() / int64(time.Millisecond)) * 1000000 - createtime := (int64(milliseconds/1000000) - processs.CreateTime) / 1000 - //mwlogger.Info("Createtime", createtime) - metricAttributes.PutInt("Createtime", createtime) var dataPoints pmetric.NumberDataPointSlice gauge := scopeMetric.SetEmptyGauge()