From 8d96d1537c2783f3d561bc5c1ff03aab227f5485 Mon Sep 17 00:00:00 2001 From: Cobolbaby Date: Sun, 24 Sep 2023 14:39:25 +0800 Subject: [PATCH 1/4] feat(dkron-executor-shell): some improvement --- builtin/bins/dkron-executor-shell/main.go | 1 - .../bins/dkron-executor-shell/prometheus.go | 97 ++++++++++++++++++- builtin/bins/dkron-executor-shell/shell.go | 96 +++--------------- go.mod | 2 +- go.sum | 22 ++++- 5 files changed, 130 insertions(+), 88 deletions(-) diff --git a/builtin/bins/dkron-executor-shell/main.go b/builtin/bins/dkron-executor-shell/main.go index 7b6a5564d..e945e976b 100644 --- a/builtin/bins/dkron-executor-shell/main.go +++ b/builtin/bins/dkron-executor-shell/main.go @@ -11,7 +11,6 @@ import ( func main() { prometheusPort := os.Getenv("SHELL_EXECUTOR_PROMETHEUS_PORT") - if prometheusPort == "" { prometheusPort = "9422" // Default shell executor prometheus metrics port } diff --git a/builtin/bins/dkron-executor-shell/prometheus.go b/builtin/bins/dkron-executor-shell/prometheus.go index b17deb8e0..110ce0ee2 100644 --- a/builtin/bins/dkron-executor-shell/prometheus.go +++ b/builtin/bins/dkron-executor-shell/prometheus.go @@ -1,8 +1,12 @@ package main import ( + "log" + "time" + "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/promauto" + "github.com/shirou/gopsutil/v3/process" ) const ( @@ -19,13 +23,100 @@ var ( memUsage = promauto.NewGaugeVec(prometheus.GaugeOpts{ Namespace: namespace, - Name: "mem_usage_kb", + Name: "mem_usage_bytes", Help: "Current memory consumed by job", }, []string{"job_name"}) + + jobExecutionTime = promauto.NewGaugeVec(prometheus.GaugeOpts{ + Namespace: namespace, + Name: "execution_time_seconds", + Help: "Job Execution Time", + }, + []string{"job_name"}) + + jobDoneCount = promauto.NewCounterVec(prometheus.CounterOpts{ + Namespace: namespace, + Name: "execution_done_count", + Help: "Job Execution Counter", + }, + []string{"job_name"}) ) -func updateMetric(jobName string, metricName *prometheus.GaugeVec, value float64) { - metricName.WithLabelValues(jobName).Set(value) +func CollectProcessMetrics(jobname string, pid int, quit chan struct{}) { + start := time.Now() + + for { + select { + case <-quit: + cpuUsage.WithLabelValues(jobname).Set(0) + memUsage.WithLabelValues(jobname).Set(0) + jobExecutionTime.WithLabelValues(jobname).Set(0) + jobDoneCount.WithLabelValues(jobname).Inc() + return + default: + cpu, mem, err := GetTotalCPUMemUsage(pid) + if err != nil { + log.Printf("Error getting pid statistics: %v", err) + return + } + cpuUsage.WithLabelValues(jobname).Set(cpu) + memUsage.WithLabelValues(jobname).Set(mem) + jobExecutionTime.WithLabelValues(jobname).Set(time.Since(start).Seconds()) + + time.Sleep(3 * time.Second) // Refreshing metrics in real-time each second + } + } } +func GetTotalCPUMemUsage(pid int) (float64, float64, error) { + var totalCPU, totalMem float64 + + parentProc, err := process.NewProcess(int32(pid)) + if err != nil { + log.Printf("NewProcess err: %v", err) + return totalCPU, totalMem, err + } + + allProc := append(GetChildrenProcesses(parentProc), parentProc) + + for _, p := range allProc { + cpu, err := p.Times() + if err != nil { + // log.Printf("p.Times() err: %v", err) + continue + } + mem, err := p.MemoryInfo() + if err != nil { + // log.Printf("p.MemoryInfo() err: %v", err) + continue + } + + // log.Printf("Pid: %d, CPU: %f, Mem: %d", p.Pid, cpu.Total(), mem.RSS) + + totalCPU = totalCPU + cpu.Total() + totalMem = totalMem + float64(mem.RSS) + } + + return totalCPU, totalMem, nil +} + +func GetChildrenProcesses(pp *process.Process) []*process.Process { + var ret []*process.Process + + c, err := pp.Children() + if err != nil || len(c) == 0 { + // log.Printf("pp.Children() err: %v", err) + return ret + } + + for _, cc := range c { + ret = append(ret, cc) + + cp := GetChildrenProcesses(cc) + if len(cp) > 0 { + ret = append(ret, cp...) + } + } + return ret +} diff --git a/builtin/bins/dkron-executor-shell/shell.go b/builtin/bins/dkron-executor-shell/shell.go index be1868da3..e12078c49 100644 --- a/builtin/bins/dkron-executor-shell/shell.go +++ b/builtin/bins/dkron-executor-shell/shell.go @@ -1,9 +1,6 @@ package main import ( - "bufio" - "bytes" - "encoding/base64" "errors" "fmt" "log" @@ -12,13 +9,13 @@ import ( "runtime" "strconv" "strings" + "syscall" "time" "github.com/armon/circbuf" dkplugin "github.com/distribworks/dkron/v3/plugin" dktypes "github.com/distribworks/dkron/v3/plugin/types" "github.com/mattn/go-shellwords" - "github.com/struCoder/pidusage" ) const ( @@ -67,6 +64,9 @@ func (s *Shell) ExecuteImpl(args *dktypes.ExecuteRequest, cb dkplugin.StatusHelp env := strings.Split(args.Config["env"], ",") cwd := args.Config["cwd"] + executionInfo := strings.Split(fmt.Sprintf("ENV_JOB_NAME=%s", args.JobName), ",") + env = append(env, executionInfo...) + cmd, err := buildCmd(command, shell, env, cwd) if err != nil { return nil, err @@ -79,23 +79,6 @@ func (s *Shell) ExecuteImpl(args *dktypes.ExecuteRequest, cb dkplugin.StatusHelp cmd.Stderr = reportingWriter{buffer: output, cb: cb, isError: true} cmd.Stdout = reportingWriter{buffer: output, cb: cb} - stdin, err := cmd.StdinPipe() - if err != nil { - return nil, err - } - - defer stdin.Close() - - payload, err := base64.StdEncoding.DecodeString(args.Config["payload"]) - if err != nil { - return nil, err - } - - stdin.Write(payload) - stdin.Close() - - log.Printf("shell: going to run %s", command) - jobTimeout := args.Config["timeout"] var jt time.Duration @@ -104,8 +87,11 @@ func (s *Shell) ExecuteImpl(args *dktypes.ExecuteRequest, cb dkplugin.StatusHelp if err != nil { return nil, errors.New("shell: Error parsing job timeout") } + cmd.SysProcAttr = &syscall.SysProcAttr{Setpgid: true} } + log.Printf("shell: going to run %s", command) + err = cmd.Start() if err != nil { return nil, err @@ -116,7 +102,8 @@ func (s *Shell) ExecuteImpl(args *dktypes.ExecuteRequest, cb dkplugin.StatusHelp if jt != 0 { slowTimer := time.AfterFunc(jt, func() { - err = cmd.Process.Kill() + // Kill child process to avoid cmd.Wait() + err := syscall.Kill(-cmd.Process.Pid, syscall.SIGKILL) // note the minus sign if err != nil { jobTimeoutMessage = fmt.Sprintf("shell: Job '%s' execution time exceeding defined timeout %v. SIGKILL returned error. Job may not have been killed", command, jt) } else { @@ -124,45 +111,14 @@ func (s *Shell) ExecuteImpl(args *dktypes.ExecuteRequest, cb dkplugin.StatusHelp } jobTimedOut = true - return }) defer slowTimer.Stop() } - // Warn if buffer is overwritten - if output.TotalWritten() > output.Size() { - log.Printf("shell: Script '%s' generated %d bytes of output, truncated to %d", command, output.TotalWritten(), output.Size()) - } - - pid := cmd.Process.Pid quit := make(chan struct{}) - go func() { - for { - select { - case <-quit: - return - default: - stat, err := pidusage.GetStat(pid) - if err != nil { - log.Printf("Error getting pid statistics: %v", err) - return - } - - mem, err := calculateMemory(pid) - if err != nil { - log.Printf("Error calculating memory metrics: %v", err) - return - } - - cpu := stat.CPU - updateMetric(args.JobName, memUsage, float64(mem)) - updateMetric(args.JobName, cpuUsage, cpu) - time.Sleep(1 * time.Second) // Refreshing metrics in real-time each second - } - } - }() + go CollectProcessMetrics(args.JobName, cmd.Process.Pid, quit) err = cmd.Wait() close(quit) // exit metric refresh goroutine after job is finished @@ -174,6 +130,11 @@ func (s *Shell) ExecuteImpl(args *dktypes.ExecuteRequest, cb dkplugin.StatusHelp } } + // Warn if buffer is overwritten + if output.TotalWritten() > output.Size() { + log.Printf("shell: Script '%s' generated %d bytes of output, truncated to %d", command, output.TotalWritten(), output.Size()) + } + // Always log output log.Printf("shell: Command output %s", output) @@ -209,30 +170,3 @@ func buildCmd(command string, useShell bool, env []string, cwd string) (cmd *exe cmd.Dir = cwd return } - -func calculateMemory(pid int) (uint64, error) { - f, err := os.Open(fmt.Sprintf("/proc/%d/smaps", pid)) - if err != nil { - return 0, err - } - defer f.Close() - - res := uint64(0) - rfx := []byte("Rss:") - r := bufio.NewScanner(f) - for r.Scan() { - line := r.Bytes() - if bytes.HasPrefix(line, rfx) { - var size uint64 - _, err := fmt.Sscanf(string(line[4:]), "%d", &size) - if err != nil { - return 0, err - } - res += size - } - } - if err := r.Err(); err != nil { - return 0, err - } - return res, nil -} diff --git a/go.mod b/go.mod index 60df7b996..32752ca79 100644 --- a/go.mod +++ b/go.mod @@ -41,6 +41,7 @@ require ( github.com/prometheus/client_golang v1.16.0 github.com/robfig/cron/v3 v3.0.1 github.com/ryanuber/columnize v2.1.2+incompatible + github.com/shirou/gopsutil/v3 v3.23.8 github.com/sirupsen/logrus v1.9.3 github.com/soheilhy/cmux v0.1.5 github.com/spf13/cobra v1.7.0 @@ -48,7 +49,6 @@ require ( github.com/spf13/viper v1.16.0 github.com/streadway/amqp v1.1.0 github.com/stretchr/testify v1.8.4 - github.com/struCoder/pidusage v0.2.1 github.com/tencentcloud/tencentcloud-sdk-go v3.0.83+incompatible // indirect github.com/tidwall/buntdb v1.2.7 github.com/tinylib/msgp v1.1.6 // indirect diff --git a/go.sum b/go.sum index 3ca26edfc..de4fd6cbf 100644 --- a/go.sum +++ b/go.sum @@ -849,6 +849,8 @@ github.com/go-logfmt/logfmt v0.4.0/go.mod h1:3RMwSq7FuexP4Kalkev3ejPJsZTpXXBr9+V github.com/go-logfmt/logfmt v0.5.0/go.mod h1:wCYkCAKZfumFQihp8CzCvQ3paCTfi41vtzG1KdI/P7A= github.com/go-logfmt/logfmt v0.5.1/go.mod h1:WYhtIu8zTZfxdn5+rREduYbwxfcBr/Vr6KEVveWlfTs= github.com/go-logr/logr v0.1.0/go.mod h1:ixOQHD9gLJUVQQ2ZOR7zLEifBX6tGkNJF4QyIY7sIas= +github.com/go-ole/go-ole v1.2.6 h1:/Fpf6oFPoeFik9ty7siob0G6Ke8QvQEuVcuChpwXzpY= +github.com/go-ole/go-ole v1.2.6/go.mod h1:pprOEPIfldk/42T2oK7lQ4v4JSDwmV0As9GaiUsvbm0= github.com/go-openapi/jsonpointer v0.0.0-20160704185906-46af16f9f7b1/go.mod h1:+35s3my2LFTysnkMfxsJBAMHj/DoqoB9knIWoYG/Vk0= github.com/go-openapi/jsonreference v0.0.0-20160704190145-13c6e3589ad9/go.mod h1:W3Z9FmVs9qj+KR4zFKmDPGiLdk1D9Rlm7cyMvf57TTg= github.com/go-openapi/spec v0.0.0-20160808142527-6aced65f8501/go.mod h1:J8+jY1nAiCcj+friV/PDoE1/3eeccG9LYBs0tYvLOWc= @@ -1173,6 +1175,8 @@ github.com/leodido/go-urn v1.2.4 h1:XlAE/cm/ms7TE/VMVoduSpNBoyc2dOxHs5MZSwAN63Q= github.com/leodido/go-urn v1.2.4/go.mod h1:7ZrI8mTSeBSHl/UaRyKQW1qZeMgak41ANeCNaVckg+4= github.com/linode/linodego v0.7.1 h1:4WZmMpSA2NRwlPZcc0+4Gyn7rr99Evk9bnr0B3gXRKE= github.com/linode/linodego v0.7.1/go.mod h1:ga11n3ivecUrPCHN0rANxKmfWBJVkOXfLMZinAbj2sY= +github.com/lufia/plan9stats v0.0.0-20211012122336-39d0f177ccd0 h1:6E+4a0GO5zZEnZ81pIr0yLvtUWk2if982qA3F3QD6H4= +github.com/lufia/plan9stats v0.0.0-20211012122336-39d0f177ccd0/go.mod h1:zJYVVT2jmtg6P3p1VtQj7WsuWi/y4VnjVBn7F8KPB3I= github.com/lyft/protoc-gen-star v0.6.0/go.mod h1:TGAoBVkt8w7MPG72TrKIu85MIdXwDuzJYeZuUPFPNwA= github.com/lyft/protoc-gen-star v0.6.1/go.mod h1:TGAoBVkt8w7MPG72TrKIu85MIdXwDuzJYeZuUPFPNwA= github.com/lyft/protoc-gen-star/v2 v2.0.1/go.mod h1:RcCdONR2ScXaYnQC5tUzxzlpA3WVYF7/opLeUgcQs/o= @@ -1286,6 +1290,8 @@ github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZb github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= github.com/posener/complete v1.1.1/go.mod h1:em0nMJCgc9GFtwrmVmEMR/ZL6WyhyjMBndrE9hABlRI= github.com/posener/complete v1.2.3/go.mod h1:WZIdtGGp+qx0sLrYKtIRAruyNpv6hFCicSgv7Sy7s/s= +github.com/power-devops/perfstat v0.0.0-20210106213030-5aafc221ea8c h1:ncq/mPwQF4JjgDlrVEn3C11VoGHZN7m8qihwgMEtzYw= +github.com/power-devops/perfstat v0.0.0-20210106213030-5aafc221ea8c/go.mod h1:OmDBASR4679mdNQnz2pUhc2G8CO2JrUAVFDRBDP/hJE= github.com/prometheus/client_golang v0.9.1/go.mod h1:7SWBe2y4D6OKWSNQJUaRYU/AaXPKyh/dDVn+NZz0KFw= github.com/prometheus/client_golang v0.9.2/go.mod h1:OsXs2jCmiKlQ1lTBmv21f2mNfw4xf/QclQDMrYNZzcM= github.com/prometheus/client_golang v1.0.0/go.mod h1:db9x61etRT2tGnBNRi70OPL5FsnadC4Ky3P0J6CfImo= @@ -1349,6 +1355,12 @@ github.com/ryanuber/columnize v2.1.2+incompatible/go.mod h1:sm1tb6uqfes/u+d4ooFo github.com/sagikazarmark/crypt v0.10.0/go.mod h1:gwTNHQVoOS3xp9Xvz5LLR+1AauC5M6880z5NWzdhOyQ= github.com/sean-/seed v0.0.0-20170313163322-e2103e2c3529 h1:nn5Wsu0esKSJiIVhscUtVbo7ada43DJhG55ua/hjS5I= github.com/sean-/seed v0.0.0-20170313163322-e2103e2c3529/go.mod h1:DxrIzT+xaE7yg65j358z/aeFdxmN0P9QXhEzd20vsDc= +github.com/shirou/gopsutil/v3 v3.23.8 h1:xnATPiybo6GgdRoC4YoGnxXZFRc3dqQTGi73oLvvBrE= +github.com/shirou/gopsutil/v3 v3.23.8/go.mod h1:7hmCaBn+2ZwaZOr6jmPBZDfawwMGuo1id3C6aM8EDqQ= +github.com/shoenig/go-m1cpu v0.1.6 h1:nxdKQNcEB6vzgA2E2bvzKIYRuNj7XNJ4S/aRSwKzFtM= +github.com/shoenig/go-m1cpu v0.1.6/go.mod h1:1JJMcUBvfNwpq05QDQVAnx3gUHr9IYF7GNg9SUEw2VQ= +github.com/shoenig/test v0.6.4 h1:kVTaSd7WLz5WZ2IaoM0RSzRsUD+m8wRR+5qvntpn4LU= +github.com/shoenig/test v0.6.4/go.mod h1:byHiCGXqrVaflBLAMq/srcZIHynQPQgeyvkvXnjqq0k= github.com/shopspring/decimal v1.2.0/go.mod h1:DKyhrW/HYNuLGql+MJL6WCR6knT2jwCFRcu2hWCYk4o= github.com/sirupsen/logrus v1.0.6/go.mod h1:pMByvHTf9Beacp5x1UXfOR9xyW/9antXMhjMPG0dEzc= github.com/sirupsen/logrus v1.2.0/go.mod h1:LxeOpSwHxABJmUn/MG1IvRgCAasNZTLOkJPxbbu5VWo= @@ -1402,8 +1414,6 @@ github.com/stretchr/testify v1.8.2/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o github.com/stretchr/testify v1.8.3/go.mod h1:sz/lmYIOXD/1dqDmKjjqLyZ2RngseejIcXlSw2iwfAo= github.com/stretchr/testify v1.8.4 h1:CcVxjf3Q8PM0mHUKJCdn+eZZtm5yQwehR5yeSVQQcUk= github.com/stretchr/testify v1.8.4/go.mod h1:sz/lmYIOXD/1dqDmKjjqLyZ2RngseejIcXlSw2iwfAo= -github.com/struCoder/pidusage v0.2.1 h1:dFiEgUDkubeIj0XA1NpQ6+8LQmKrLi7NiIQl86E6BoY= -github.com/struCoder/pidusage v0.2.1/go.mod h1:bewtP2KUA1TBUyza5+/PCpSQ6sc/H6jJbIKAzqW86BA= github.com/subosito/gotenv v1.4.2 h1:X1TuBLAMDFbaTAChgCBLu3DU3UPyELpnF2jjJ2cz/S8= github.com/subosito/gotenv v1.4.2/go.mod h1:ayKnFf/c6rvx/2iiLrJUk1e6plDbT3edrFNGqEflhK0= github.com/tencentcloud/tencentcloud-sdk-go v1.0.162/go.mod h1:asUz5BPXxgoPGaRgZaVm1iGcUAuHyYUo1nXqKa83cvI= @@ -1431,6 +1441,10 @@ github.com/tidwall/tinyqueue v0.1.1 h1:SpNEvEggbpyN5DIReaJ2/1ndroY8iyEGxPYxoSaym github.com/tidwall/tinyqueue v0.1.1/go.mod h1:O/QNHwrnjqr6IHItYrzoHAKYhBkLI67Q096fQP5zMYw= github.com/tinylib/msgp v1.1.6 h1:i+SbKraHhnrf9M5MYmvQhFnbLhAXSDWF8WWsuyRdocw= github.com/tinylib/msgp v1.1.6/go.mod h1:75BAfg2hauQhs3qedfdDZmWAPcFMAvJE5b9rGOMufyw= +github.com/tklauser/go-sysconf v0.3.12 h1:0QaGUFOdQaIVdPgfITYzaTegZvdCjmYO52cSFAEVmqU= +github.com/tklauser/go-sysconf v0.3.12/go.mod h1:Ho14jnntGE1fpdOqQEEaiKRpvIavV0hSfmBq8nJbHYI= +github.com/tklauser/numcpus v0.6.1 h1:ng9scYS7az0Bk4OZLvrNXNSAO2Pxr1XXRAPyjhIx+Fk= +github.com/tklauser/numcpus v0.6.1/go.mod h1:1XfjsgE2zo8GVw7POkMbHENHzVg3GzmoZ9fESEdAacY= github.com/tv42/httpunix v0.0.0-20150427012821-b75d8614f926/go.mod h1:9ESjWnEqriFuLhtthL60Sar/7RFoluCcXsuvEwTV5KM= github.com/twitchyliquid64/golang-asm v0.15.1 h1:SU5vSMR7hnwNxj24w34ZyCi/FmDZTkS4MhqMhdFk5YI= github.com/twitchyliquid64/golang-asm v0.15.1/go.mod h1:a1lVb/DtPvCB8fslRZhAngC2+aY1QWCk3Cedj/Gdt08= @@ -1457,6 +1471,8 @@ github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9dec github.com/yuin/goldmark v1.3.5/go.mod h1:mwnBkeHKe2W/ZEtQ+71ViKU8L12m81fl3OWwC1Zlc8k= github.com/yuin/goldmark v1.4.1/go.mod h1:mwnBkeHKe2W/ZEtQ+71ViKU8L12m81fl3OWwC1Zlc8k= github.com/yuin/goldmark v1.4.13/go.mod h1:6yULJ656Px+3vBD8DxQVa3kxgyrAnzto9xy5taEt/CY= +github.com/yusufpapurcu/wmi v1.2.3 h1:E1ctvB7uKFMOJw3fdOW32DwGE9I7t++CRUEMKvFoFiw= +github.com/yusufpapurcu/wmi v1.2.3/go.mod h1:SBZ9tNy3G9/m5Oi98Zks0QjeHVDvuK0qfxQmPyzfmi0= github.com/zeebo/assert v1.3.0/go.mod h1:Pq9JiuJQpG8JLJdtkwrJESF0Foym2/D9XMU5ciN/wJ0= github.com/zeebo/xxh3 v1.0.2/go.mod h1:5NWz9Sef7zIDm2JHfFlcQvNekmcEl9ekUZQQKCYaDcA= go.etcd.io/etcd/api/v3 v3.5.7/go.mod h1:9qew1gCdDDLu+VwmeG+iFpL+QlpHTo7iubavdVDgCAA= @@ -1717,6 +1733,7 @@ golang.org/x/sys v0.0.0-20190507160741-ecd444e8653b/go.mod h1:h1NjWce9XRLGQEsW7w golang.org/x/sys v0.0.0-20190606165138-5da285871e9c/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20190624142023-c5567b49c5d0/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20190726091711-fc99dfbffb4e/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20190916202348-b4ddaad3f8a3/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20190922100055-0a153f010e69/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20190924154521-2837fb4f24fe/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20191001151750-bb3f8db39f24/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= @@ -1746,6 +1763,7 @@ golang.org/x/sys v0.0.0-20200905004654-be1d3432aa8f/go.mod h1:h1NjWce9XRLGQEsW7w golang.org/x/sys v0.0.0-20200930185726-fdedc70b468f/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20201201145000-ef89a241ccb3/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20201204225414-ed752295db88/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20210104204734-6f8348627aad/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20210119212857-b64e53b001e4/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20210124154548-22da62e12c0c/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= From eb54a971fb5c14b495e3664444a6a3f50eb10990 Mon Sep 17 00:00:00 2001 From: Cobolbaby Date: Fri, 3 Nov 2023 14:31:55 +0800 Subject: [PATCH 2/4] revert(dkron-executor-shell): payload parameter --- builtin/bins/dkron-executor-shell/shell.go | 16 ++++++++++++++++ 1 file changed, 16 insertions(+) diff --git a/builtin/bins/dkron-executor-shell/shell.go b/builtin/bins/dkron-executor-shell/shell.go index e12078c49..23448d1d0 100644 --- a/builtin/bins/dkron-executor-shell/shell.go +++ b/builtin/bins/dkron-executor-shell/shell.go @@ -1,6 +1,7 @@ package main import ( + "encoding/base64" "errors" "fmt" "log" @@ -79,6 +80,21 @@ func (s *Shell) ExecuteImpl(args *dktypes.ExecuteRequest, cb dkplugin.StatusHelp cmd.Stderr = reportingWriter{buffer: output, cb: cb, isError: true} cmd.Stdout = reportingWriter{buffer: output, cb: cb} + stdin, err := cmd.StdinPipe() + if err != nil { + return nil, err + } + + defer stdin.Close() + + payload, err := base64.StdEncoding.DecodeString(args.Config["payload"]) + if err != nil { + return nil, err + } + + stdin.Write(payload) + stdin.Close() + jobTimeout := args.Config["timeout"] var jt time.Duration From 0a714214dd250a51b7141442078c336d45c60a65 Mon Sep 17 00:00:00 2001 From: Cobolbaby Date: Fri, 3 Nov 2023 15:03:38 +0800 Subject: [PATCH 3/4] feat(dkron-executor-shell): support dkron_job_exit_code metric --- builtin/bins/dkron-executor-shell/prometheus.go | 13 +++++++++++-- builtin/bins/dkron-executor-shell/shell.go | 2 +- 2 files changed, 12 insertions(+), 3 deletions(-) diff --git a/builtin/bins/dkron-executor-shell/prometheus.go b/builtin/bins/dkron-executor-shell/prometheus.go index 110ce0ee2..74d3312aa 100644 --- a/builtin/bins/dkron-executor-shell/prometheus.go +++ b/builtin/bins/dkron-executor-shell/prometheus.go @@ -2,6 +2,7 @@ package main import ( "log" + "os/exec" "time" "github.com/prometheus/client_golang/prometheus" @@ -41,9 +42,16 @@ var ( Help: "Job Execution Counter", }, []string{"job_name"}) + + jobExitCode = promauto.NewGaugeVec(prometheus.GaugeOpts{ + Namespace: namespace, + Name: "exit_code", + Help: "Exit code of a job", + }, + []string{"job_name"}) ) -func CollectProcessMetrics(jobname string, pid int, quit chan struct{}) { +func CollectProcessMetrics(jobname string, cmd *exec.Cmd, quit chan struct{}) { start := time.Now() for { @@ -53,9 +61,10 @@ func CollectProcessMetrics(jobname string, pid int, quit chan struct{}) { memUsage.WithLabelValues(jobname).Set(0) jobExecutionTime.WithLabelValues(jobname).Set(0) jobDoneCount.WithLabelValues(jobname).Inc() + jobExitCode.WithLabelValues(jobname).Set(float64(cmd.ProcessState.ExitCode())) return default: - cpu, mem, err := GetTotalCPUMemUsage(pid) + cpu, mem, err := GetTotalCPUMemUsage(cmd.Process.Pid) if err != nil { log.Printf("Error getting pid statistics: %v", err) return diff --git a/builtin/bins/dkron-executor-shell/shell.go b/builtin/bins/dkron-executor-shell/shell.go index 23448d1d0..9e8cadbab 100644 --- a/builtin/bins/dkron-executor-shell/shell.go +++ b/builtin/bins/dkron-executor-shell/shell.go @@ -134,7 +134,7 @@ func (s *Shell) ExecuteImpl(args *dktypes.ExecuteRequest, cb dkplugin.StatusHelp quit := make(chan struct{}) - go CollectProcessMetrics(args.JobName, cmd.Process.Pid, quit) + go CollectProcessMetrics(args.JobName, cmd, quit) err = cmd.Wait() close(quit) // exit metric refresh goroutine after job is finished From 9e32d4629468a802096554c3dd840bc35bde02ca Mon Sep 17 00:00:00 2001 From: Cobolbaby Date: Wed, 8 Nov 2023 11:57:04 +0800 Subject: [PATCH 4/4] feat(dkron-executor-shell): adjust the input parameters of CollectProcessMetrics to make it more convenient to correct the exit code --- builtin/bins/dkron-executor-shell/prometheus.go | 14 ++++++++------ builtin/bins/dkron-executor-shell/shell.go | 5 +++-- 2 files changed, 11 insertions(+), 8 deletions(-) diff --git a/builtin/bins/dkron-executor-shell/prometheus.go b/builtin/bins/dkron-executor-shell/prometheus.go index 74d3312aa..b3ebe3db4 100644 --- a/builtin/bins/dkron-executor-shell/prometheus.go +++ b/builtin/bins/dkron-executor-shell/prometheus.go @@ -2,7 +2,6 @@ package main import ( "log" - "os/exec" "time" "github.com/prometheus/client_golang/prometheus" @@ -51,20 +50,23 @@ var ( []string{"job_name"}) ) -func CollectProcessMetrics(jobname string, cmd *exec.Cmd, quit chan struct{}) { +func CollectProcessMetrics(jobname string, pid int, quit chan int) { start := time.Now() for { select { - case <-quit: + case exitCode, ok := <-quit: + if !ok { + // log.Println("Exit code received and quit channel closed.") + return + } cpuUsage.WithLabelValues(jobname).Set(0) memUsage.WithLabelValues(jobname).Set(0) jobExecutionTime.WithLabelValues(jobname).Set(0) jobDoneCount.WithLabelValues(jobname).Inc() - jobExitCode.WithLabelValues(jobname).Set(float64(cmd.ProcessState.ExitCode())) - return + jobExitCode.WithLabelValues(jobname).Set(float64(exitCode)) default: - cpu, mem, err := GetTotalCPUMemUsage(cmd.Process.Pid) + cpu, mem, err := GetTotalCPUMemUsage(pid) if err != nil { log.Printf("Error getting pid statistics: %v", err) return diff --git a/builtin/bins/dkron-executor-shell/shell.go b/builtin/bins/dkron-executor-shell/shell.go index 9e8cadbab..09e939f37 100644 --- a/builtin/bins/dkron-executor-shell/shell.go +++ b/builtin/bins/dkron-executor-shell/shell.go @@ -132,11 +132,12 @@ func (s *Shell) ExecuteImpl(args *dktypes.ExecuteRequest, cb dkplugin.StatusHelp defer slowTimer.Stop() } - quit := make(chan struct{}) + quit := make(chan int) - go CollectProcessMetrics(args.JobName, cmd, quit) + go CollectProcessMetrics(args.JobName, cmd.Process.Pid, quit) err = cmd.Wait() + quit <- cmd.ProcessState.ExitCode() close(quit) // exit metric refresh goroutine after job is finished if jobTimedOut {