From 62a02d60f1907aa5433a74ecd4648c597ecc55d8 Mon Sep 17 00:00:00 2001 From: Andrejs Golevs Date: Tue, 23 Feb 2021 17:00:04 +0200 Subject: [PATCH 1/4] configurable job timeout --- builtin/bins/dkron-executor-shell/shell.go | 34 ++++++++++++++++++---- website/content/usage/executors/shell.md | 4 ++- 2 files changed, 31 insertions(+), 7 deletions(-) diff --git a/builtin/bins/dkron-executor-shell/shell.go b/builtin/bins/dkron-executor-shell/shell.go index 84d4f47b8..3273b6004 100644 --- a/builtin/bins/dkron-executor-shell/shell.go +++ b/builtin/bins/dkron-executor-shell/shell.go @@ -2,6 +2,8 @@ package main import ( "encoding/base64" + "errors" + "fmt" "log" "os" "os/exec" @@ -74,12 +76,6 @@ func (s *Shell) ExecuteImpl(args *dktypes.ExecuteRequest, cb dkplugin.StatusHelp cmd.Stderr = reportingWriter{buffer: output, cb: cb, isError: true} cmd.Stdout = reportingWriter{buffer: output, cb: cb} - // Start a timer to warn about slow handlers - slowTimer := time.AfterFunc(2*time.Hour, func() { - log.Printf("shell: Script '%s' slow, execution exceeding %v", command, 2*time.Hour) - }) - defer slowTimer.Stop() - stdin, err := cmd.StdinPipe() if err != nil { return nil, err @@ -96,11 +92,37 @@ func (s *Shell) ExecuteImpl(args *dktypes.ExecuteRequest, cb dkplugin.StatusHelp stdin.Close() log.Printf("shell: going to run %s", command) + + jobTimeout := args.Config["timeout"] + err = cmd.Start() if err != nil { return nil, err } + if jobTimeout != "" { + t, err := time.ParseDuration(jobTimeout) + if err != nil { + return nil, errors.New("shell: Error parsing job timeout") + } + + slowTimer := time.AfterFunc(t, func() { + j := fmt.Sprintf("shell: Job '%s' execution time exceeding defined timeout %v. Killing job.", command, t) + log.Print(j) + _, err := output.Write([]byte(j)) + if err != nil { + log.Printf("Error writing output on timeout event: %v", err) + return + } + err = cmd.Process.Kill() + if err != nil { + log.Printf("Error killing process on timeout event: %v", err) + return + } + }) + defer slowTimer.Stop() + } + // Warn if buffer is overritten if output.TotalWritten() > output.Size() { log.Printf("shell: Script '%s' generated %d bytes of output, truncated to %d", command, output.TotalWritten(), output.Size()) diff --git a/website/content/usage/executors/shell.md b/website/content/usage/executors/shell.md index 609fb439f..43c977ad6 100644 --- a/website/content/usage/executors/shell.md +++ b/website/content/usage/executors/shell.md @@ -14,6 +14,7 @@ shell: Run this command using a shell environment command: The command to run env: Env vars separated by comma cwd: Chdir before command run +timeout: Force kill job after specified time. Format: https://golang.org/pkg/time/#ParseDuration. ``` Example @@ -25,7 +26,8 @@ Example "shell": "true", "command": "my_command", "env": "ENV_VAR=va1,ANOTHER_ENV_VAR=var2", - "cwd": "/app" + "cwd": "/app", + "timeout": "24h" } } ``` From b74919b80979ba4dac8dfbf3f92c5f9df502ab7e Mon Sep 17 00:00:00 2001 From: Andrejs Golevs Date: Wed, 24 Feb 2021 11:49:59 +0200 Subject: [PATCH 2/4] validate job timeout if it's specified --- dkron/job.go | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/dkron/job.go b/dkron/job.go index dcb94fbb5..372eb7d85 100644 --- a/dkron/job.go +++ b/dkron/job.go @@ -357,6 +357,13 @@ func (j *Job) Validate() error { return err } + if j.Executor == "shell" && j.ExecutorConfig["timeout"] != "" { + _, err := time.ParseDuration(j.ExecutorConfig["timeout"]) + if err != nil { + return fmt.Errorf("Error parsing job timeout value") + } + } + return nil } From e82d4ca2d502dfe1566a94ec8b3c7da3effb7b10 Mon Sep 17 00:00:00 2001 From: Andrejs Golevs Date: Wed, 24 Feb 2021 11:50:17 +0200 Subject: [PATCH 3/4] rework of job timeout after review --- builtin/bins/dkron-executor-shell/shell.go | 49 +++++++++++++--------- 1 file changed, 29 insertions(+), 20 deletions(-) diff --git a/builtin/bins/dkron-executor-shell/shell.go b/builtin/bins/dkron-executor-shell/shell.go index 3273b6004..6c83751ba 100644 --- a/builtin/bins/dkron-executor-shell/shell.go +++ b/builtin/bins/dkron-executor-shell/shell.go @@ -94,42 +94,51 @@ func (s *Shell) ExecuteImpl(args *dktypes.ExecuteRequest, cb dkplugin.StatusHelp log.Printf("shell: going to run %s", command) jobTimeout := args.Config["timeout"] + var jt time.Duration + + if jobTimeout != "" { + jt, err = time.ParseDuration(jobTimeout) + if err != nil { + return nil, errors.New("shell: Error parsing job timeout") + } + } err = cmd.Start() if err != nil { return nil, err } - if jobTimeout != "" { - t, err := time.ParseDuration(jobTimeout) + var jobTimeoutMessage string + var jobWasKilled bool + + slowTimer := time.AfterFunc(jt, func() { + err = cmd.Process.Kill() if err != nil { - return nil, errors.New("shell: Error parsing job timeout") + jobTimeoutMessage = fmt.Sprintf("shell: Job '%s' execution time exceeding defined timeout %v. SIGKILL returned error. Job probably was not killed", command, jt) + } else { + jobTimeoutMessage = fmt.Sprintf("shell: Job '%s' execution time exceeding defined timeout %v. Job was killed", command, jt) } - slowTimer := time.AfterFunc(t, func() { - j := fmt.Sprintf("shell: Job '%s' execution time exceeding defined timeout %v. Killing job.", command, t) - log.Print(j) - _, err := output.Write([]byte(j)) - if err != nil { - log.Printf("Error writing output on timeout event: %v", err) - return - } - err = cmd.Process.Kill() - if err != nil { - log.Printf("Error killing process on timeout event: %v", err) - return - } - }) - defer slowTimer.Stop() - } + jobWasKilled = true + return + }) + + defer slowTimer.Stop() - // Warn if buffer is overritten + // Warn if buffer is ovewritten if output.TotalWritten() > output.Size() { log.Printf("shell: Script '%s' generated %d bytes of output, truncated to %d", command, output.TotalWritten(), output.Size()) } err = cmd.Wait() + if jobWasKilled { + _, err := output.Write([]byte(jobTimeoutMessage)) + if err != nil { + log.Printf("Error writing output on timeout event: %v", err) + } + } + // Always log output log.Printf("shell: Command output %s", output) From 8bc85fbff464e1eadcdcc555e22f8e6d768919d0 Mon Sep 17 00:00:00 2001 From: Andrejs Golevs Date: Thu, 25 Feb 2021 11:38:27 +0200 Subject: [PATCH 4/4] test adeed; typo, renaming, messages fixes --- builtin/bins/dkron-executor-shell/shell.go | 10 +++++----- dkron/api_test.go | 12 ++++++++++++ 2 files changed, 17 insertions(+), 5 deletions(-) diff --git a/builtin/bins/dkron-executor-shell/shell.go b/builtin/bins/dkron-executor-shell/shell.go index 6c83751ba..d2fe44504 100644 --- a/builtin/bins/dkron-executor-shell/shell.go +++ b/builtin/bins/dkron-executor-shell/shell.go @@ -109,30 +109,30 @@ func (s *Shell) ExecuteImpl(args *dktypes.ExecuteRequest, cb dkplugin.StatusHelp } var jobTimeoutMessage string - var jobWasKilled bool + var jobTimedOut bool slowTimer := time.AfterFunc(jt, func() { err = cmd.Process.Kill() if err != nil { - jobTimeoutMessage = fmt.Sprintf("shell: Job '%s' execution time exceeding defined timeout %v. SIGKILL returned error. Job probably was not killed", command, jt) + jobTimeoutMessage = fmt.Sprintf("shell: Job '%s' execution time exceeding defined timeout %v. SIGKILL returned error. Job may not have been killed", command, jt) } else { jobTimeoutMessage = fmt.Sprintf("shell: Job '%s' execution time exceeding defined timeout %v. Job was killed", command, jt) } - jobWasKilled = true + jobTimedOut = true return }) defer slowTimer.Stop() - // Warn if buffer is ovewritten + // Warn if buffer is overwritten if output.TotalWritten() > output.Size() { log.Printf("shell: Script '%s' generated %d bytes of output, truncated to %d", command, output.TotalWritten(), output.Size()) } err = cmd.Wait() - if jobWasKilled { + if jobTimedOut { _, err := output.Write([]byte(jobTimeoutMessage)) if err != nil { log.Printf("Error writing output on timeout event: %v", err) diff --git a/dkron/api_test.go b/dkron/api_test.go index d9aabb06a..99d8c7d82 100644 --- a/dkron/api_test.go +++ b/dkron/api_test.go @@ -236,6 +236,18 @@ func TestAPIJobCreateUpdateValidationBadTimezone(t *testing.T) { assert.Equal(t, http.StatusBadRequest, resp.StatusCode) } +func TestAPIJobCreateUpdateValidationBadShellExecutorTimeout(t *testing.T) { + resp := postJob(t, "8099", []byte(`{ + "name": "testjob", + "schedule": "@every 1m", + "executor": "shell", + "executor_config": {"command": "date", "timeout": "foreverandever"}, + "disabled": true + }`)) + + assert.Equal(t, http.StatusBadRequest, resp.StatusCode) +} + func TestAPIGetNonExistentJobReturnsNotFound(t *testing.T) { port := "8096" baseURL := fmt.Sprintf("http://localhost:%s/v1", port)