Skip to content

Commit

Permalink
Execute using plugin
Browse files Browse the repository at this point in the history
  • Loading branch information
Victor Castell committed Apr 9, 2018
1 parent 388be63 commit e82adec
Show file tree
Hide file tree
Showing 5 changed files with 46 additions and 26 deletions.
1 change: 1 addition & 0 deletions dkron/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ type AgentCommand struct {
Version string
ShutdownCh <-chan struct{}
ProcessorPlugins map[string]ExecutionProcessor
ExecutorPlugins map[string]Executor
HTTPTransport Transport
Store *Store

Expand Down
66 changes: 42 additions & 24 deletions dkron/proc.go → dkron/invoke.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,33 +26,51 @@ const (
func (a *AgentCommand) invokeJob(job *Job, execution *Execution) error {
output, _ := circbuf.NewBuffer(maxBufSize)

cmd := buildCmd(job)
cmd.Stderr = output
cmd.Stdout = output
var success bool

// Start a timer to warn about slow handlers
slowTimer := time.AfterFunc(2*time.Hour, func() {
log.Warnf("proc: Script '%s' slow, execution exceeding %v", job.Command, 2*time.Hour)
})
// Check if job executor is set
if executor, ok := a.ExecutorPlugins[job.Executor]; ok {
out, err := executor.Execute(&ExecuteRequest{
JobName: job.Name,
Config: job.ExecutorConfig,
})
if err != nil {
log.WithError(err).Error("invoke: command error output")
success = false
} else {
success = true
}

err := cmd.Start()
output.Write(out)
} else {
cmd := buildCmd(job)
cmd.Stderr = output
cmd.Stdout = output

// Warn if buffer is overritten
if output.TotalWritten() > output.Size() {
log.Warnf("proc: Script '%s' generated %d bytes of output, truncated to %d", job.Command, output.TotalWritten(), output.Size())
}
// Start a timer to warn about slow handlers
slowTimer := time.AfterFunc(2*time.Hour, func() {
log.Warnf("invoke: Script '%s' slow, execution exceeding %v", job.Command, 2*time.Hour)
})

err := cmd.Start()

// Warn if buffer is overritten
if output.TotalWritten() > output.Size() {
log.Warnf("invoke: Script '%s' generated %d bytes of output, truncated to %d", job.Command, output.TotalWritten(), output.Size())
}

err = cmd.Wait()
slowTimer.Stop()
log.WithFields(logrus.Fields{
"output": output,
}).Debug("invoke: Command output")
if err != nil {
log.WithError(err).Error("invoke: command error output")
success = false
} else {
success = true
}

var success bool
err = cmd.Wait()
slowTimer.Stop()
log.WithFields(logrus.Fields{
"output": output,
}).Debug("proc: Command output")
if err != nil {
log.WithError(err).Error("proc: command error output")
success = false
} else {
success = true
}

execution.FinishedAt = time.Now()
Expand Down Expand Up @@ -91,7 +109,7 @@ func buildCmd(job *Job) (cmd *exec.Cmd) {
} else {
args, err := shellwords.Parse(job.Command)
if err != nil {
log.WithError(err).Fatal("proc: Error parsing command arguments")
log.WithError(err).Fatal("invoke: Error parsing command arguments")
}
cmd = exec.Command(args[0], args[1:]...)
}
Expand Down
File renamed without changes.
4 changes: 2 additions & 2 deletions dkron/job.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,10 +92,10 @@ type Job struct {
Concurrency string `json:"concurrency"`

// Executor plugin to be used in this job
Executor string
Executor string `json:"executor"`

// Executor args
ExecutorConfig ExecutorPluginConfig
ExecutorConfig ExecutorPluginConfig `json:"executor_config"`
}

// Run the job
Expand Down
1 change: 1 addition & 0 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ func main() {
Ui: ui,
Version: VERSION,
ProcessorPlugins: plugins.Processors,
ExecutorPlugins: plugins.Executors,
}, nil
},
"keygen": func() (cli.Command, error) {
Expand Down

0 comments on commit e82adec

Please sign in to comment.