Skip to content

Commit

Permalink
Prometheus metrics for shell executor (#1330)
Browse files Browse the repository at this point in the history
* real-time metrics from job executions
---------

Co-authored-by: Andrejs Golevs <[email protected]>
Co-authored-by: andreygolev <[email protected]>
  • Loading branch information
3 people authored May 14, 2023
1 parent 6bf1889 commit 29ad1fe
Show file tree
Hide file tree
Showing 6 changed files with 123 additions and 0 deletions.
17 changes: 17 additions & 0 deletions builtin/bins/dkron-executor-shell/main.go
Original file line number Diff line number Diff line change
@@ -1,11 +1,28 @@
package main

import (
"net/http"
"os"

dkplugin "github.com/distribworks/dkron/v3/plugin"
"github.com/hashicorp/go-plugin"
"github.com/prometheus/client_golang/prometheus/promhttp"
)

func main() {
prometheusPort := os.Getenv("SHELL_EXECUTOR_PROMETHEUS_PORT")

if prometheusPort == "" {
prometheusPort = "9422" // Default shell executor prometheus metrics port
}

promServer := http.NewServeMux()
promServer.Handle("/metrics", promhttp.Handler())

go func() {
http.ListenAndServe(":"+prometheusPort, promServer)
}()

plugin.Serve(&plugin.ServeConfig{
HandshakeConfig: dkplugin.Handshake,
Plugins: map[string]plugin.Plugin{
Expand Down
31 changes: 31 additions & 0 deletions builtin/bins/dkron-executor-shell/prometheus.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
package main

import (
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
)

const (
namespace = "dkron_job"
)

var (
cpuUsage = promauto.NewGaugeVec(prometheus.GaugeOpts{
Namespace: namespace,
Name: "cpu_usage",
Help: "CPU usage by job",
},
[]string{"job_name"})

memUsage = promauto.NewGaugeVec(prometheus.GaugeOpts{
Namespace: namespace,
Name: "mem_usage_kb",
Help: "Current memory consumed by job",
},
[]string{"job_name"})
)

func updateMetric(jobName string, metricName *prometheus.GaugeVec, value float64) {
metricName.WithLabelValues(jobName).Set(value)
}

60 changes: 60 additions & 0 deletions builtin/bins/dkron-executor-shell/shell.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package main

import (
"bufio"
"bytes"
"encoding/base64"
"errors"
"fmt"
Expand All @@ -16,6 +18,7 @@ import (
dkplugin "github.com/distribworks/dkron/v3/plugin"
dktypes "github.com/distribworks/dkron/v3/plugin/types"
"github.com/mattn/go-shellwords"
"github.com/struCoder/pidusage"
)

const (
Expand Down Expand Up @@ -132,7 +135,37 @@ func (s *Shell) ExecuteImpl(args *dktypes.ExecuteRequest, cb dkplugin.StatusHelp
log.Printf("shell: Script '%s' generated %d bytes of output, truncated to %d", command, output.TotalWritten(), output.Size())
}

pid := cmd.Process.Pid
quit := make(chan struct{})

go func() {
for {
select {
case <-quit:
return
default:
stat, err := pidusage.GetStat(pid)
if err != nil {
log.Printf("Error getting pid statistics: %v", err)
return
}

mem, err := calculateMemory(pid)
if err != nil {
log.Printf("Error calculating memory metrics: %v", err)
return
}

cpu := stat.CPU
updateMetric(args.JobName, memUsage, float64(mem))
updateMetric(args.JobName, cpuUsage, cpu)
time.Sleep(1 * time.Second) // Refreshing metrics in real-time each second
}
}
}()

err = cmd.Wait()
close(quit) // exit metric refresh goroutine after job is finished

if jobTimedOut {
_, err := output.Write([]byte(jobTimeoutMessage))
Expand Down Expand Up @@ -176,3 +209,30 @@ func buildCmd(command string, useShell bool, env []string, cwd string) (cmd *exe
cmd.Dir = cwd
return
}

func calculateMemory(pid int) (uint64, error) {
f, err := os.Open(fmt.Sprintf("/proc/%d/smaps", pid))
if err != nil {
return 0, err
}
defer f.Close()

res := uint64(0)
rfx := []byte("Rss:")
r := bufio.NewScanner(f)
for r.Scan() {
line := r.Bytes()
if bytes.HasPrefix(line, rfx) {
var size uint64
_, err := fmt.Sscanf(string(line[4:]), "%d", &size)
if err != nil {
return 0, err
}
res += size
}
}
if err := r.Err(); err != nil {
return 0, err
}
return res, nil
}
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ require (
github.com/spf13/viper v1.15.0
github.com/streadway/amqp v1.0.0
github.com/stretchr/testify v1.8.2
github.com/struCoder/pidusage v0.2.1 // indirect
github.com/tencentcloud/tencentcloud-sdk-go v3.0.83+incompatible // indirect
github.com/tidwall/buntdb v1.2.7
github.com/tinylib/msgp v1.1.6 // indirect
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -1276,6 +1276,8 @@ github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO
github.com/stretchr/testify v1.8.1/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4=
github.com/stretchr/testify v1.8.2 h1:+h33VjcLVPDHtOdpUCuF+7gSuG3yGIftsP1YvFihtJ8=
github.com/stretchr/testify v1.8.2/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4=
github.com/struCoder/pidusage v0.2.1 h1:dFiEgUDkubeIj0XA1NpQ6+8LQmKrLi7NiIQl86E6BoY=
github.com/struCoder/pidusage v0.2.1/go.mod h1:bewtP2KUA1TBUyza5+/PCpSQ6sc/H6jJbIKAzqW86BA=
github.com/subosito/gotenv v1.4.2 h1:X1TuBLAMDFbaTAChgCBLu3DU3UPyELpnF2jjJ2cz/S8=
github.com/subosito/gotenv v1.4.2/go.mod h1:ayKnFf/c6rvx/2iiLrJUk1e6plDbT3edrFNGqEflhK0=
github.com/tencentcloud/tencentcloud-sdk-go v1.0.162/go.mod h1:asUz5BPXxgoPGaRgZaVm1iGcUAuHyYUo1nXqKa83cvI=
Expand Down
12 changes: 12 additions & 0 deletions website/docs/usage/executors/shell.md
Original file line number Diff line number Diff line change
Expand Up @@ -28,3 +28,15 @@ Example
}
}
```

## Job execution prometheus metrics
Path: `/metrics`
Port: 9422
or configure via environment variable `SHELL_EXECUTOR_PROMETHEUS_PORT`

### Exposed metrics

| Name | Type | Description | Labels |
|------------------------|:------|-------------------------------:|---------:|
| dkron_job_cpu_usage | gauge | current CPU usage by job | job_name |
| dkron_job_mem_usage_kb | gauge | current memory consumed by job | job_name |

0 comments on commit 29ad1fe

Please sign in to comment.