From ef2bb5bf78c2ab78a0a3f7bf2b48eff259b24a1b Mon Sep 17 00:00:00 2001 From: Thomas Orozco Date: Thu, 6 Jul 2017 22:11:16 +0200 Subject: [PATCH 01/21] Basic main --- glide.lock | 8 ++ glide.yaml | 6 ++ main.go | 233 +++++++++++++++++++++++++++++++++++++++++++++++++++++ 3 files changed, 247 insertions(+) create mode 100644 glide.lock create mode 100644 glide.yaml create mode 100644 main.go diff --git a/glide.lock b/glide.lock new file mode 100644 index 0000000..56c68c2 --- /dev/null +++ b/glide.lock @@ -0,0 +1,8 @@ +hash: ed8b81bb597a9419eaf363c402ac8db873b75929e07285d718e35f7ccbe9439d +updated: 2017-07-06T19:02:35.835644965+02:00 +imports: +- name: github.com/gorhill/cronexpr + version: a557574d6c024ed6e36acc8b610f5f211c91568a +- name: github.com/sirupsen/logrus + version: 202f25545ea4cf9b191ff7f846df5d87c9382c2b +testImports: [] diff --git a/glide.yaml b/glide.yaml new file mode 100644 index 0000000..5dac775 --- /dev/null +++ b/glide.yaml @@ -0,0 +1,6 @@ +package: github.com/aptible/concron +import: +- package: github.com/gorhill/cronexpr + version: ~1.0.0 +- package: github.com/sirupsen/logrus + version: ~1.0.0 diff --git a/main.go b/main.go new file mode 100644 index 0000000..f86434d --- /dev/null +++ b/main.go @@ -0,0 +1,233 @@ +package main + +import ( + "fmt" + "time" + "bufio" + "io" + "os" + "os/exec" + "regexp" + "sync" + "strings" + "github.com/gorhill/cronexpr" + "github.com/sirupsen/logrus" +) + +var ( + delimiter = regexp.MustCompile(`\S+`) + parameterCounts = []int{ + 7, // POSIX + seconds + years + 6, // POSIX + years + 5, // POSIX + 1, // shorthand (e.g. @hourly) + } +) + +type CrontabLine struct { + expression *cronexpr.Expression + schedule string + command string +} + +type Job struct { + CrontabLine + position int +} + +func parseCrontabLine(line string) (*CrontabLine, error) { + indices := delimiter.FindAllStringIndex(line, -1) + + for _, count := range parameterCounts { + if len(indices) <= count { + continue + } + + scheduleEnds := indices[count - 1][1] + commandStarts := indices[count][0] + + logrus.Debugf("try parse(%d): %s[0:%d] = %s", count, line, scheduleEnds, line[0:scheduleEnds]) + + expr, err := cronexpr.Parse(line[:scheduleEnds]) + + if (err != nil) { + continue + } + + return &CrontabLine{ + expression: expr, + schedule: line[:scheduleEnds], + command: line[commandStarts:], + }, nil + } + return nil, fmt.Errorf("bad crontab line: %s", line) +} + +func parseCrontab(scanner *bufio.Scanner) ([]*Job, error) { + // TODO: Understand environment variables, too. + position := 0 + ret := make([]*Job, 0) + + for scanner.Scan() { + line := scanner.Text(); + + // TODO: Allow environment variables? We may need special handling for: + // - SHELL + // - USER? + parsedLine, err := parseCrontabLine(line) + if (err != nil) { + return nil, err + } + + ret = append(ret, &Job{CrontabLine: *parsedLine, position: position}) + } + + + if err := scanner.Err(); err != nil { + return nil, err + } + + return ret, nil +} + +func drainReader(wg sync.WaitGroup, readerLogger *logrus.Entry, reader io.Reader) { + wg.Add(1) + + go func() { + defer wg.Done() + + scanner := bufio.NewScanner(reader) + + for scanner.Scan() { + readerLogger.Info(scanner.Text()) + } + + + if err := scanner.Err(); err != nil { + // The underlying reader might get closed by e.g. Wait(), or + // even the process we're starting, so we don't log EOF-like + // errors + if (strings.Contains(err.Error(), os.ErrClosed.Error())) { + return + } + + readerLogger.Error(err) + } + }() +} + +func runJob(command string, jobLogger *logrus.Entry) error { + jobLogger.Info("starting") + + cmd := exec.Command("/bin/sh", "-c", command) + + stdout, err := cmd.StdoutPipe() + if err != nil { + return err + } + + stderr, err := cmd.StderrPipe() + if err != nil { + return err + } + + if err := cmd.Start(); err != nil { + return err + } + + var wg sync.WaitGroup + + stdoutLogger := jobLogger.WithFields(logrus.Fields{ "channel": "stdout", }) + go drainReader(wg, stdoutLogger, stdout) + + stderrLogger := jobLogger.WithFields(logrus.Fields{ "channel": "stderr", }) + go drainReader(wg, stderrLogger, stderr) + + wg.Wait() + + if err := cmd.Wait(); err != nil { + return err + } + + jobLogger.Info("job succeeded") + + return nil +} + +func runCron(job *Job, exitChan chan interface{}) { + // NOTE: this (intentionally) does not run multiple instances of the + // job concurrently + cronLogger := logrus.WithFields(logrus.Fields{ + "job.schedule": job.schedule, + "job.command": job.command, + "job.position": job.position, + }) + + var cronIteration uint64 = 0 + nextRun := job.expression.Next(time.Now()) + + for { + nextRun = job.expression.Next(nextRun) + cronLogger.Debugf("job will run next at %v", nextRun) + + delay := nextRun.Sub(time.Now()) + if (delay < 0) { + cronLogger.Warningf("job took too long to run: it should have started %v ago", -delay) + nextRun = time.Now() + continue + } + + time.Sleep(delay) + + jobLogger := cronLogger.WithFields(logrus.Fields{ + "iteration": cronIteration, + }) + + if err := runJob(job.command, jobLogger); err != nil { + cronLogger.Error(err) + } + + cronIteration++ + } +} + +func main() { + // TODO: debug flag instead + // TODO: JSON logging? + logrus.SetLevel(logrus.DebugLevel) + logrus.SetFormatter(&logrus.TextFormatter{ FullTimestamp: true, }) + + if (len(os.Args) != 2) { + fmt.Fprintf(os.Stderr, "Usage: %s CRONTAB\n", os.Args[0]) + os.Exit(2) + return + } + + crontab := os.Args[1] + logrus.Infof("read crontab: %s", crontab) + + file, err := os.Open(crontab) + if err != nil { + logrus.Fatal(err) + return + } + defer file.Close() + + entries, err := parseCrontab(bufio.NewScanner(file)) + + if (err != nil) { + logrus.Fatal(err) + return + } + + // TODO: Signal handling. + // TODO: Should actually have a sync group here, and send the exit + // request in. + requestExitChan := make(chan interface{}) + + for _, job := range entries { + go runCron(job, requestExitChan) + } + + <-requestExitChan +} From b3fe9cd10dff3328901c007cf69ebe2700e69368 Mon Sep 17 00:00:00 2001 From: Thomas Orozco Date: Fri, 7 Jul 2017 18:04:23 +0200 Subject: [PATCH 02/21] parse environment --- main.go | 95 ++++++++++++++++++++++++++++++++++++++++++++------------- 1 file changed, 73 insertions(+), 22 deletions(-) diff --git a/main.go b/main.go index f86434d..f196787 100644 --- a/main.go +++ b/main.go @@ -15,13 +15,16 @@ import ( ) var ( - delimiter = regexp.MustCompile(`\S+`) + jobLineSeparator = regexp.MustCompile(`\S+`) + envLineMatcher = regexp.MustCompile(`^([^\s=]+)\s*=\s*(.*)$`) + parameterCounts = []int{ 7, // POSIX + seconds + years 6, // POSIX + years 5, // POSIX 1, // shorthand (e.g. @hourly) } + ) type CrontabLine struct { @@ -35,8 +38,18 @@ type Job struct { position int } -func parseCrontabLine(line string) (*CrontabLine, error) { - indices := delimiter.FindAllStringIndex(line, -1) +type Context struct { + shell string + environ map[string]string +} + +type Crontab struct { + jobs []*Job + context *Context +} + +func parseJobLine(line string) (*CrontabLine, error) { + indices := jobLineSeparator.FindAllStringIndex(line, -1) for _, count := range parameterCounts { if len(indices) <= count { @@ -63,23 +76,49 @@ func parseCrontabLine(line string) (*CrontabLine, error) { return nil, fmt.Errorf("bad crontab line: %s", line) } -func parseCrontab(scanner *bufio.Scanner) ([]*Job, error) { +func parseCrontab(scanner *bufio.Scanner) (*Crontab, error) { + // TODO: Don't return an array of Job, return an object representing the crontab // TODO: Understand environment variables, too. + // TODO: Increment position position := 0 - ret := make([]*Job, 0) + + jobs := make([]*Job, 0) + + // TODO: CRON_TZ + environ := make(map[string]string) + shell := "/bin/sh" for scanner.Scan() { - line := scanner.Text(); + line := strings.TrimLeft(scanner.Text(), " \t"); + + if line == "" { + continue + } + + if line[0] == '#' { + continue + } + + r := envLineMatcher.FindAllStringSubmatch(line, -1) + if len(r) == 1 && len(r[0]) == 3 { + // TODO: Should error on setting USER? + envKey := r[0][1] + envVal := r[0][2] + if envKey == "SHELL" { + shell = envVal + } else { + environ[envKey] = envVal + } + continue + } - // TODO: Allow environment variables? We may need special handling for: - // - SHELL - // - USER? - parsedLine, err := parseCrontabLine(line) + jobLine, err := parseJobLine(line) if (err != nil) { return nil, err } - ret = append(ret, &Job{CrontabLine: *parsedLine, position: position}) + jobs = append(jobs, &Job{CrontabLine: *jobLine, position: position,}) + position++ } @@ -87,7 +126,13 @@ func parseCrontab(scanner *bufio.Scanner) ([]*Job, error) { return nil, err } - return ret, nil + return &Crontab{ + jobs: jobs, + context: &Context{ + shell: shell, + environ: environ, + }, + }, nil } func drainReader(wg sync.WaitGroup, readerLogger *logrus.Entry, reader io.Reader) { @@ -116,10 +161,16 @@ func drainReader(wg sync.WaitGroup, readerLogger *logrus.Entry, reader io.Reader }() } -func runJob(command string, jobLogger *logrus.Entry) error { +func runJob(context *Context, command string, jobLogger *logrus.Entry) error { jobLogger.Info("starting") - cmd := exec.Command("/bin/sh", "-c", command) + cmd := exec.Command(context.shell, "-c", command) + + env := os.Environ() + for k, v := range context.environ { + env = append(env, fmt.Sprintf("%s=%s", k, v)) + } + cmd.Env = env stdout, err := cmd.StdoutPipe() if err != nil { @@ -154,7 +205,7 @@ func runJob(command string, jobLogger *logrus.Entry) error { return nil } -func runCron(job *Job, exitChan chan interface{}) { +func runCron(context *Context, job *Job, exitChan chan interface{}) { // NOTE: this (intentionally) does not run multiple instances of the // job concurrently cronLogger := logrus.WithFields(logrus.Fields{ @@ -183,7 +234,7 @@ func runCron(job *Job, exitChan chan interface{}) { "iteration": cronIteration, }) - if err := runJob(job.command, jobLogger); err != nil { + if err := runJob(context, job.command, jobLogger); err != nil { cronLogger.Error(err) } @@ -203,17 +254,17 @@ func main() { return } - crontab := os.Args[1] - logrus.Infof("read crontab: %s", crontab) + crontabFileName := os.Args[1] + logrus.Infof("read crontab: %s", crontabFileName) - file, err := os.Open(crontab) + file, err := os.Open(crontabFileName) if err != nil { logrus.Fatal(err) return } defer file.Close() - entries, err := parseCrontab(bufio.NewScanner(file)) + crontab, err := parseCrontab(bufio.NewScanner(file)) if (err != nil) { logrus.Fatal(err) @@ -225,8 +276,8 @@ func main() { // request in. requestExitChan := make(chan interface{}) - for _, job := range entries { - go runCron(job, requestExitChan) + for _, job := range crontab.jobs { + go runCron(crontab.context, job, requestExitChan) } <-requestExitChan From 7ca7e2a79672274041e9c80cfd9e6efb1d592b54 Mon Sep 17 00:00:00 2001 From: Thomas Orozco Date: Fri, 7 Jul 2017 18:31:42 +0200 Subject: [PATCH 03/21] Refactor into a few packages --- cron/cron.go | 122 +++++++++++++++++++++++ crontab/crontab.go | 112 +++++++++++++++++++++ crontab/types.go | 26 +++++ main.go | 241 +-------------------------------------------- 4 files changed, 265 insertions(+), 236 deletions(-) create mode 100644 cron/cron.go create mode 100644 crontab/crontab.go create mode 100644 crontab/types.go diff --git a/cron/cron.go b/cron/cron.go new file mode 100644 index 0000000..a665de3 --- /dev/null +++ b/cron/cron.go @@ -0,0 +1,122 @@ +package cron + +import ( + "fmt" + "time" + "bufio" + "io" + "os" + "os/exec" + "sync" + "strings" + "github.com/aptible/concron/crontab" + "github.com/sirupsen/logrus" +) + + +func drainReader(wg sync.WaitGroup, readerLogger *logrus.Entry, reader io.Reader) { + wg.Add(1) + + go func() { + defer wg.Done() + + scanner := bufio.NewScanner(reader) + + for scanner.Scan() { + readerLogger.Info(scanner.Text()) + } + + + if err := scanner.Err(); err != nil { + // The underlying reader might get closed by e.g. Wait(), or + // even the process we're starting, so we don't log EOF-like + // errors + if (strings.Contains(err.Error(), os.ErrClosed.Error())) { + return + } + + readerLogger.Error(err) + } + }() +} + +func runJob(context *crontab.Context, command string, jobLogger *logrus.Entry) error { + jobLogger.Info("starting") + + cmd := exec.Command(context.Shell, "-c", command) + + env := os.Environ() + for k, v := range context.Environ { + env = append(env, fmt.Sprintf("%s=%s", k, v)) + } + cmd.Env = env + + stdout, err := cmd.StdoutPipe() + if err != nil { + return err + } + + stderr, err := cmd.StderrPipe() + if err != nil { + return err + } + + if err := cmd.Start(); err != nil { + return err + } + + var wg sync.WaitGroup + + stdoutLogger := jobLogger.WithFields(logrus.Fields{ "channel": "stdout", }) + go drainReader(wg, stdoutLogger, stdout) + + stderrLogger := jobLogger.WithFields(logrus.Fields{ "channel": "stderr", }) + go drainReader(wg, stderrLogger, stderr) + + wg.Wait() + + if err := cmd.Wait(); err != nil { + return err + } + + jobLogger.Info("job succeeded") + + return nil +} + +func StartJob(context *crontab.Context, job *crontab.Job, exitChan chan interface{}) { + // NOTE: this (intentionally) does not run multiple instances of the + // job concurrently + cronLogger := logrus.WithFields(logrus.Fields{ + "job.schedule": job.Schedule, + "job.command": job.Command, + "job.position": job.Position, + }) + + var cronIteration uint64 = 0 + nextRun := job.Expression.Next(time.Now()) + + for { + nextRun = job.Expression.Next(nextRun) + cronLogger.Debugf("job will run next at %v", nextRun) + + delay := nextRun.Sub(time.Now()) + if (delay < 0) { + cronLogger.Warningf("job took too long to run: it should have started %v ago", -delay) + nextRun = time.Now() + continue + } + + time.Sleep(delay) + + jobLogger := cronLogger.WithFields(logrus.Fields{ + "iteration": cronIteration, + }) + + if err := runJob(context, job.Command, jobLogger); err != nil { + cronLogger.Error(err) + } + + cronIteration++ + } +} diff --git a/crontab/crontab.go b/crontab/crontab.go new file mode 100644 index 0000000..6d9a0ca --- /dev/null +++ b/crontab/crontab.go @@ -0,0 +1,112 @@ +package crontab + +import ( + "fmt" + "bufio" + "strings" + "regexp" + "github.com/gorhill/cronexpr" + "github.com/sirupsen/logrus" +) + +var ( + jobLineSeparator = regexp.MustCompile(`\S+`) + envLineMatcher = regexp.MustCompile(`^([^\s=]+)\s*=\s*(.*)$`) + + parameterCounts = []int{ + 7, // POSIX + seconds + years + 6, // POSIX + years + 5, // POSIX + 1, // shorthand (e.g. @hourly) + } + +) + +func parseJobLine(line string) (*crontabLine, error) { + indices := jobLineSeparator.FindAllStringIndex(line, -1) + + for _, count := range parameterCounts { + if len(indices) <= count { + continue + } + + scheduleEnds := indices[count - 1][1] + commandStarts := indices[count][0] + + // TODO: Should receive a logger? + logrus.Debugf("try parse(%d): %s[0:%d] = %s", count, line, scheduleEnds, line[0:scheduleEnds]) + + expr, err := cronexpr.Parse(line[:scheduleEnds]) + + if (err != nil) { + continue + } + + return &crontabLine{ + Expression: expr, + Schedule: line[:scheduleEnds], + Command: line[commandStarts:], + }, nil + } + return nil, fmt.Errorf("bad crontab line: %s", line) +} + +func ParseCrontab(scanner *bufio.Scanner) (*Crontab, error) { + // TODO: Don't return an array of Job, return an object representing the crontab + // TODO: Understand environment variables, too. + // TODO: Increment position + position := 0 + + jobs := make([]*Job, 0) + + // TODO: CRON_TZ + environ := make(map[string]string) + shell := "/bin/sh" + + for scanner.Scan() { + line := strings.TrimLeft(scanner.Text(), " \t"); + + if line == "" { + continue + } + + if line[0] == '#' { + continue + } + + r := envLineMatcher.FindAllStringSubmatch(line, -1) + if len(r) == 1 && len(r[0]) == 3 { + // TODO: Should error on setting USER? + envKey := r[0][1] + envVal := r[0][2] + if envKey == "SHELL" { + shell = envVal + } else { + environ[envKey] = envVal + } + continue + } + + jobLine, err := parseJobLine(line) + if (err != nil) { + return nil, err + } + + jobs = append(jobs, &Job{crontabLine: *jobLine, Position: position,}) + position++ + } + + + if err := scanner.Err(); err != nil { + return nil, err + } + + return &Crontab{ + Jobs: jobs, + Context: &Context{ + Shell: shell, + Environ: environ, + }, + }, nil +} + diff --git a/crontab/types.go b/crontab/types.go new file mode 100644 index 0000000..e88c821 --- /dev/null +++ b/crontab/types.go @@ -0,0 +1,26 @@ +package crontab + +import ( + "github.com/gorhill/cronexpr" +) + +type crontabLine struct { + Expression *cronexpr.Expression + Schedule string + Command string +} + +type Job struct { + crontabLine + Position int +} + +type Context struct { + Shell string + Environ map[string]string +} + +type Crontab struct { + Jobs []*Job + Context *Context +} diff --git a/main.go b/main.go index f196787..3735e47 100644 --- a/main.go +++ b/main.go @@ -2,245 +2,14 @@ package main import ( "fmt" - "time" "bufio" - "io" "os" - "os/exec" - "regexp" - "sync" - "strings" - "github.com/gorhill/cronexpr" + "github.com/aptible/concron/cron" + "github.com/aptible/concron/crontab" "github.com/sirupsen/logrus" ) -var ( - jobLineSeparator = regexp.MustCompile(`\S+`) - envLineMatcher = regexp.MustCompile(`^([^\s=]+)\s*=\s*(.*)$`) - parameterCounts = []int{ - 7, // POSIX + seconds + years - 6, // POSIX + years - 5, // POSIX - 1, // shorthand (e.g. @hourly) - } - -) - -type CrontabLine struct { - expression *cronexpr.Expression - schedule string - command string -} - -type Job struct { - CrontabLine - position int -} - -type Context struct { - shell string - environ map[string]string -} - -type Crontab struct { - jobs []*Job - context *Context -} - -func parseJobLine(line string) (*CrontabLine, error) { - indices := jobLineSeparator.FindAllStringIndex(line, -1) - - for _, count := range parameterCounts { - if len(indices) <= count { - continue - } - - scheduleEnds := indices[count - 1][1] - commandStarts := indices[count][0] - - logrus.Debugf("try parse(%d): %s[0:%d] = %s", count, line, scheduleEnds, line[0:scheduleEnds]) - - expr, err := cronexpr.Parse(line[:scheduleEnds]) - - if (err != nil) { - continue - } - - return &CrontabLine{ - expression: expr, - schedule: line[:scheduleEnds], - command: line[commandStarts:], - }, nil - } - return nil, fmt.Errorf("bad crontab line: %s", line) -} - -func parseCrontab(scanner *bufio.Scanner) (*Crontab, error) { - // TODO: Don't return an array of Job, return an object representing the crontab - // TODO: Understand environment variables, too. - // TODO: Increment position - position := 0 - - jobs := make([]*Job, 0) - - // TODO: CRON_TZ - environ := make(map[string]string) - shell := "/bin/sh" - - for scanner.Scan() { - line := strings.TrimLeft(scanner.Text(), " \t"); - - if line == "" { - continue - } - - if line[0] == '#' { - continue - } - - r := envLineMatcher.FindAllStringSubmatch(line, -1) - if len(r) == 1 && len(r[0]) == 3 { - // TODO: Should error on setting USER? - envKey := r[0][1] - envVal := r[0][2] - if envKey == "SHELL" { - shell = envVal - } else { - environ[envKey] = envVal - } - continue - } - - jobLine, err := parseJobLine(line) - if (err != nil) { - return nil, err - } - - jobs = append(jobs, &Job{CrontabLine: *jobLine, position: position,}) - position++ - } - - - if err := scanner.Err(); err != nil { - return nil, err - } - - return &Crontab{ - jobs: jobs, - context: &Context{ - shell: shell, - environ: environ, - }, - }, nil -} - -func drainReader(wg sync.WaitGroup, readerLogger *logrus.Entry, reader io.Reader) { - wg.Add(1) - - go func() { - defer wg.Done() - - scanner := bufio.NewScanner(reader) - - for scanner.Scan() { - readerLogger.Info(scanner.Text()) - } - - - if err := scanner.Err(); err != nil { - // The underlying reader might get closed by e.g. Wait(), or - // even the process we're starting, so we don't log EOF-like - // errors - if (strings.Contains(err.Error(), os.ErrClosed.Error())) { - return - } - - readerLogger.Error(err) - } - }() -} - -func runJob(context *Context, command string, jobLogger *logrus.Entry) error { - jobLogger.Info("starting") - - cmd := exec.Command(context.shell, "-c", command) - - env := os.Environ() - for k, v := range context.environ { - env = append(env, fmt.Sprintf("%s=%s", k, v)) - } - cmd.Env = env - - stdout, err := cmd.StdoutPipe() - if err != nil { - return err - } - - stderr, err := cmd.StderrPipe() - if err != nil { - return err - } - - if err := cmd.Start(); err != nil { - return err - } - - var wg sync.WaitGroup - - stdoutLogger := jobLogger.WithFields(logrus.Fields{ "channel": "stdout", }) - go drainReader(wg, stdoutLogger, stdout) - - stderrLogger := jobLogger.WithFields(logrus.Fields{ "channel": "stderr", }) - go drainReader(wg, stderrLogger, stderr) - - wg.Wait() - - if err := cmd.Wait(); err != nil { - return err - } - - jobLogger.Info("job succeeded") - - return nil -} - -func runCron(context *Context, job *Job, exitChan chan interface{}) { - // NOTE: this (intentionally) does not run multiple instances of the - // job concurrently - cronLogger := logrus.WithFields(logrus.Fields{ - "job.schedule": job.schedule, - "job.command": job.command, - "job.position": job.position, - }) - - var cronIteration uint64 = 0 - nextRun := job.expression.Next(time.Now()) - - for { - nextRun = job.expression.Next(nextRun) - cronLogger.Debugf("job will run next at %v", nextRun) - - delay := nextRun.Sub(time.Now()) - if (delay < 0) { - cronLogger.Warningf("job took too long to run: it should have started %v ago", -delay) - nextRun = time.Now() - continue - } - - time.Sleep(delay) - - jobLogger := cronLogger.WithFields(logrus.Fields{ - "iteration": cronIteration, - }) - - if err := runJob(context, job.command, jobLogger); err != nil { - cronLogger.Error(err) - } - - cronIteration++ - } -} func main() { // TODO: debug flag instead @@ -264,7 +33,7 @@ func main() { } defer file.Close() - crontab, err := parseCrontab(bufio.NewScanner(file)) + crontab, err := crontab.ParseCrontab(bufio.NewScanner(file)) if (err != nil) { logrus.Fatal(err) @@ -276,8 +45,8 @@ func main() { // request in. requestExitChan := make(chan interface{}) - for _, job := range crontab.jobs { - go runCron(crontab.context, job, requestExitChan) + for _, job := range crontab.Jobs { + go cron.StartJob(crontab.Context, job, requestExitChan) } <-requestExitChan From 217b19ac2d1a9a223a2aba0f563e650d76f85a98 Mon Sep 17 00:00:00 2001 From: Thomas Orozco Date: Fri, 7 Jul 2017 18:32:38 +0200 Subject: [PATCH 04/21] Rename to supercronic --- cron/cron.go | 2 +- glide.yaml | 2 +- main.go | 4 ++-- 3 files changed, 4 insertions(+), 4 deletions(-) diff --git a/cron/cron.go b/cron/cron.go index a665de3..a59a4c4 100644 --- a/cron/cron.go +++ b/cron/cron.go @@ -9,7 +9,7 @@ import ( "os/exec" "sync" "strings" - "github.com/aptible/concron/crontab" + "github.com/aptible/supercronic/crontab" "github.com/sirupsen/logrus" ) diff --git a/glide.yaml b/glide.yaml index 5dac775..b19b363 100644 --- a/glide.yaml +++ b/glide.yaml @@ -1,4 +1,4 @@ -package: github.com/aptible/concron +package: github.com/aptible/supercronic import: - package: github.com/gorhill/cronexpr version: ~1.0.0 diff --git a/main.go b/main.go index 3735e47..1a75910 100644 --- a/main.go +++ b/main.go @@ -4,8 +4,8 @@ import ( "fmt" "bufio" "os" - "github.com/aptible/concron/cron" - "github.com/aptible/concron/crontab" + "github.com/aptible/supercronic/cron" + "github.com/aptible/supercronic/crontab" "github.com/sirupsen/logrus" ) From 75f2893d53658ae89db0415c1c2b88347063688b Mon Sep 17 00:00:00 2001 From: Thomas Orozco Date: Fri, 7 Jul 2017 19:17:43 +0200 Subject: [PATCH 05/21] Vendor test dependencies --- glide.lock | 10 ++++++++-- glide.yaml | 2 ++ 2 files changed, 10 insertions(+), 2 deletions(-) diff --git a/glide.lock b/glide.lock index 56c68c2..52da280 100644 --- a/glide.lock +++ b/glide.lock @@ -1,8 +1,14 @@ -hash: ed8b81bb597a9419eaf363c402ac8db873b75929e07285d718e35f7ccbe9439d -updated: 2017-07-06T19:02:35.835644965+02:00 +hash: 2046ce69038ba931646337acdd6e820daf64b15d37546ee6182ae1dc2d4689c5 +updated: 2017-07-07T18:48:58.278207889+02:00 imports: - name: github.com/gorhill/cronexpr version: a557574d6c024ed6e36acc8b610f5f211c91568a - name: github.com/sirupsen/logrus version: 202f25545ea4cf9b191ff7f846df5d87c9382c2b +- name: github.com/stretchr/testify + version: 69483b4bd14f5845b5a1e55bca19e954e827f1d0 +- name: golang.org/x/sys + version: 739734461d1c916b6c72a63d7efda2b27edb369f + subpackages: + - unix testImports: [] diff --git a/glide.yaml b/glide.yaml index b19b363..9bbf659 100644 --- a/glide.yaml +++ b/glide.yaml @@ -4,3 +4,5 @@ import: version: ~1.0.0 - package: github.com/sirupsen/logrus version: ~1.0.0 +- package: github.com/stretchr/testify + version: ~1.1.4 From 88c81bfa68b2a7b8c9a96dc308212b8d2627e5da Mon Sep 17 00:00:00 2001 From: Thomas Orozco Date: Fri, 7 Jul 2017 19:17:55 +0200 Subject: [PATCH 06/21] Add crontab parsing tests --- crontab/crontab.go | 32 +++---- crontab/crontab_test.go | 198 ++++++++++++++++++++++++++++++++++++++++ crontab/types.go | 8 +- main.go | 13 +-- 4 files changed, 223 insertions(+), 28 deletions(-) create mode 100644 crontab/crontab_test.go diff --git a/crontab/crontab.go b/crontab/crontab.go index 6d9a0ca..436cdf3 100644 --- a/crontab/crontab.go +++ b/crontab/crontab.go @@ -1,17 +1,18 @@ package crontab import ( - "fmt" "bufio" - "strings" - "regexp" + "fmt" "github.com/gorhill/cronexpr" "github.com/sirupsen/logrus" + "io" + "regexp" + "strings" ) var ( jobLineSeparator = regexp.MustCompile(`\S+`) - envLineMatcher = regexp.MustCompile(`^([^\s=]+)\s*=\s*(.*)$`) + envLineMatcher = regexp.MustCompile(`^([^\s=]+)\s*=\s*(.*)$`) parameterCounts = []int{ 7, // POSIX + seconds + years @@ -19,7 +20,6 @@ var ( 5, // POSIX 1, // shorthand (e.g. @hourly) } - ) func parseJobLine(line string) (*crontabLine, error) { @@ -30,7 +30,7 @@ func parseJobLine(line string) (*crontabLine, error) { continue } - scheduleEnds := indices[count - 1][1] + scheduleEnds := indices[count-1][1] commandStarts := indices[count][0] // TODO: Should receive a logger? @@ -38,20 +38,22 @@ func parseJobLine(line string) (*crontabLine, error) { expr, err := cronexpr.Parse(line[:scheduleEnds]) - if (err != nil) { + if err != nil { continue } return &crontabLine{ Expression: expr, - Schedule: line[:scheduleEnds], - Command: line[commandStarts:], + Schedule: line[:scheduleEnds], + Command: line[commandStarts:], }, nil } return nil, fmt.Errorf("bad crontab line: %s", line) } -func ParseCrontab(scanner *bufio.Scanner) (*Crontab, error) { +func ParseCrontab(reader io.Reader) (*Crontab, error) { + scanner := bufio.NewScanner(reader) + // TODO: Don't return an array of Job, return an object representing the crontab // TODO: Understand environment variables, too. // TODO: Increment position @@ -64,7 +66,7 @@ func ParseCrontab(scanner *bufio.Scanner) (*Crontab, error) { shell := "/bin/sh" for scanner.Scan() { - line := strings.TrimLeft(scanner.Text(), " \t"); + line := strings.TrimLeft(scanner.Text(), " \t") if line == "" { continue @@ -88,15 +90,14 @@ func ParseCrontab(scanner *bufio.Scanner) (*Crontab, error) { } jobLine, err := parseJobLine(line) - if (err != nil) { + if err != nil { return nil, err } - jobs = append(jobs, &Job{crontabLine: *jobLine, Position: position,}) + jobs = append(jobs, &Job{crontabLine: *jobLine, Position: position}) position++ } - if err := scanner.Err(); err != nil { return nil, err } @@ -104,9 +105,8 @@ func ParseCrontab(scanner *bufio.Scanner) (*Crontab, error) { return &Crontab{ Jobs: jobs, Context: &Context{ - Shell: shell, + Shell: shell, Environ: environ, }, }, nil } - diff --git a/crontab/crontab_test.go b/crontab/crontab_test.go new file mode 100644 index 0000000..68b43a7 --- /dev/null +++ b/crontab/crontab_test.go @@ -0,0 +1,198 @@ +package crontab + +import ( + "bytes" + "fmt" + "github.com/stretchr/testify/assert" + "testing" +) + +var parseCrontabTestCases = []struct { + crontab string + expected *Crontab +}{ + // Success cases + { + "FOO=bar\n", + &Crontab{ + Context: &Context{ + Shell: "/bin/sh", + Environ: map[string]string{"FOO": "bar"}, + }, + Jobs: []*Job{}, + }, + }, + + { + "FOO=bar", + &Crontab{ + Context: &Context{ + Shell: "/bin/sh", + Environ: map[string]string{"FOO": "bar"}, + }, + Jobs: []*Job{}, + }, + }, + + { + "* * * * * foo some # qux", + &Crontab{ + Context: &Context{ + Shell: "/bin/sh", + Environ: map[string]string{}, + }, + Jobs: []*Job{ + { + crontabLine: crontabLine{ + Schedule: "* * * * *", + Command: "foo some # qux", + }, + }, + }, + }, + }, + + { + "* * * * * foo\nSHELL=some\n1 1 1 1 1 bar\nKEY=VAL", + &Crontab{ + Context: &Context{ + Shell: "some", + Environ: map[string]string{ + "KEY": "VAL", + }, + }, + Jobs: []*Job{ + { + crontabLine: crontabLine{ + Schedule: "* * * * *", + Command: "foo", + }, + }, + { + crontabLine: crontabLine{ + Schedule: "1 1 1 1 1", + Command: "bar", + }, + }, + }, + }, + }, + + { + "* * * * * * with year\n* * * * * * * with seconds\n@daily with shorthand", + &Crontab{ + Context: &Context{ + Shell: "/bin/sh", + Environ: map[string]string{}, + }, + Jobs: []*Job{ + { + crontabLine: crontabLine{ + Schedule: "* * * * * *", + Command: "with year", + }, + }, + { + crontabLine: crontabLine{ + Schedule: "* * * * * * *", + Command: "with seconds", + }, + }, + { + crontabLine: crontabLine{ + Schedule: "@daily", + Command: "with shorthand", + }, + }, + }, + }, + }, + + { + "# * * * * * * commented\n\n\n # some\n\t\n\t# more\n \t */2 * * * * will run", + &Crontab{ + Context: &Context{ + Shell: "/bin/sh", + Environ: map[string]string{}, + }, + Jobs: []*Job{ + { + crontabLine: crontabLine{ + Schedule: "*/2 * * * *", + Command: "will run", + }, + }, + }, + }, + }, + + { + "* * * * * \twith plenty of whitespace", + &Crontab{ + Context: &Context{ + Shell: "/bin/sh", + Environ: map[string]string{}, + }, + Jobs: []*Job{ + { + crontabLine: crontabLine{ + Schedule: "* * * * *", + Command: "with plenty of whitespace", + }, + }, + }, + }, + }, + + { + "*\t*\t*\t*\t*\ttabs everywhere\n", + &Crontab{ + Context: &Context{ + Shell: "/bin/sh", + Environ: map[string]string{}, + }, + Jobs: []*Job{ + { + crontabLine: crontabLine{ + Schedule: "*\t*\t*\t*\t*", + Command: "tabs everywhere", + }, + }, + }, + }, + }, + + // Failure cases + {"* foo \n", nil}, + {"* some * * * more\n", nil}, + {"* some * * * \n", nil}, + {"FOO\n", nil}, +} + +func TestParseCrontab(t *testing.T) { + for _, tt := range parseCrontabTestCases { + label := fmt.Sprintf("ParseCrontab(%q)", tt.crontab) + + reader := bytes.NewBufferString(tt.crontab) + + crontab, err := ParseCrontab(reader) + + if tt.expected == nil { + assert.Nil(t, crontab, label) + assert.NotNil(t, err, label) + } else { + if assert.NotNil(t, crontab, label) { + assert.Equal(t, tt.expected.Context, crontab.Context, label) + + if assert.Equal(t, len(tt.expected.Jobs), len(crontab.Jobs), label) { + for i, crontabJob := range crontab.Jobs { + expectedJob := tt.expected.Jobs[i] + assert.Equal(t, expectedJob.Command, crontabJob.Command, label) + assert.Equal(t, expectedJob.Schedule, crontabJob.Schedule, label) + assert.NotNil(t, crontabJob.Expression, label) + } + } + } + } + } +} diff --git a/crontab/types.go b/crontab/types.go index e88c821..87dae37 100644 --- a/crontab/types.go +++ b/crontab/types.go @@ -6,8 +6,8 @@ import ( type crontabLine struct { Expression *cronexpr.Expression - Schedule string - Command string + Schedule string + Command string } type Job struct { @@ -16,11 +16,11 @@ type Job struct { } type Context struct { - Shell string + Shell string Environ map[string]string } type Crontab struct { - Jobs []*Job + Jobs []*Job Context *Context } diff --git a/main.go b/main.go index 1a75910..31dffb0 100644 --- a/main.go +++ b/main.go @@ -2,22 +2,19 @@ package main import ( "fmt" - "bufio" - "os" "github.com/aptible/supercronic/cron" "github.com/aptible/supercronic/crontab" "github.com/sirupsen/logrus" + "os" ) - - func main() { // TODO: debug flag instead // TODO: JSON logging? logrus.SetLevel(logrus.DebugLevel) - logrus.SetFormatter(&logrus.TextFormatter{ FullTimestamp: true, }) + logrus.SetFormatter(&logrus.TextFormatter{FullTimestamp: true}) - if (len(os.Args) != 2) { + if len(os.Args) != 2 { fmt.Fprintf(os.Stderr, "Usage: %s CRONTAB\n", os.Args[0]) os.Exit(2) return @@ -33,9 +30,9 @@ func main() { } defer file.Close() - crontab, err := crontab.ParseCrontab(bufio.NewScanner(file)) + crontab, err := crontab.ParseCrontab(file) - if (err != nil) { + if err != nil { logrus.Fatal(err) return } From 1fb787cbcbcbcf1d4b2814be81e4a22686184927 Mon Sep 17 00:00:00 2001 From: Thomas Orozco Date: Fri, 7 Jul 2017 20:02:09 +0200 Subject: [PATCH 07/21] Add cron RunJob tests --- cron/cron.go | 32 +++++----- cron/cron_test.go | 151 ++++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 167 insertions(+), 16 deletions(-) create mode 100644 cron/cron_test.go diff --git a/cron/cron.go b/cron/cron.go index a59a4c4..fc7b867 100644 --- a/cron/cron.go +++ b/cron/cron.go @@ -1,19 +1,18 @@ package cron import ( - "fmt" - "time" "bufio" + "fmt" + "github.com/aptible/supercronic/crontab" + "github.com/sirupsen/logrus" "io" "os" "os/exec" - "sync" "strings" - "github.com/aptible/supercronic/crontab" - "github.com/sirupsen/logrus" + "sync" + "time" ) - func drainReader(wg sync.WaitGroup, readerLogger *logrus.Entry, reader io.Reader) { wg.Add(1) @@ -26,12 +25,11 @@ func drainReader(wg sync.WaitGroup, readerLogger *logrus.Entry, reader io.Reader readerLogger.Info(scanner.Text()) } - if err := scanner.Err(); err != nil { // The underlying reader might get closed by e.g. Wait(), or // even the process we're starting, so we don't log EOF-like // errors - if (strings.Contains(err.Error(), os.ErrClosed.Error())) { + if strings.Contains(err.Error(), os.ErrClosed.Error()) { return } @@ -67,10 +65,10 @@ func runJob(context *crontab.Context, command string, jobLogger *logrus.Entry) e var wg sync.WaitGroup - stdoutLogger := jobLogger.WithFields(logrus.Fields{ "channel": "stdout", }) + stdoutLogger := jobLogger.WithFields(logrus.Fields{"channel": "stdout"}) go drainReader(wg, stdoutLogger, stdout) - stderrLogger := jobLogger.WithFields(logrus.Fields{ "channel": "stderr", }) + stderrLogger := jobLogger.WithFields(logrus.Fields{"channel": "stderr"}) go drainReader(wg, stderrLogger, stderr) wg.Wait() @@ -79,8 +77,6 @@ func runJob(context *crontab.Context, command string, jobLogger *logrus.Entry) e return err } - jobLogger.Info("job succeeded") - return nil } @@ -89,7 +85,7 @@ func StartJob(context *crontab.Context, job *crontab.Job, exitChan chan interfac // job concurrently cronLogger := logrus.WithFields(logrus.Fields{ "job.schedule": job.Schedule, - "job.command": job.Command, + "job.command": job.Command, "job.position": job.Position, }) @@ -101,7 +97,7 @@ func StartJob(context *crontab.Context, job *crontab.Job, exitChan chan interfac cronLogger.Debugf("job will run next at %v", nextRun) delay := nextRun.Sub(time.Now()) - if (delay < 0) { + if delay < 0 { cronLogger.Warningf("job took too long to run: it should have started %v ago", -delay) nextRun = time.Now() continue @@ -113,8 +109,12 @@ func StartJob(context *crontab.Context, job *crontab.Job, exitChan chan interfac "iteration": cronIteration, }) - if err := runJob(context, job.Command, jobLogger); err != nil { - cronLogger.Error(err) + err := runJob(context, job.Command, jobLogger) + + if err == nil { + jobLogger.Info("job succeeded") + } else { + jobLogger.Error(err) } cronIteration++ diff --git a/cron/cron_test.go b/cron/cron_test.go new file mode 100644 index 0000000..afcef51 --- /dev/null +++ b/cron/cron_test.go @@ -0,0 +1,151 @@ +package cron + +import ( + "fmt" + "github.com/aptible/supercronic/crontab" + "github.com/sirupsen/logrus" + "github.com/stretchr/testify/assert" + "io/ioutil" + "testing" + "time" +) + +var ( + BUFFER_SIZE = 100 +) + +type testHook struct { + channel chan *logrus.Entry +} + +func newTestHook(channel chan *logrus.Entry) *testHook { + return &testHook{channel: channel} +} + +func (hook *testHook) Fire(entry *logrus.Entry) error { + hook.channel <- entry + return nil +} + +func (hook *testHook) Levels() []logrus.Level { + return logrus.AllLevels +} + +func newTestLogger() (*logrus.Entry, chan *logrus.Entry) { + logger := logrus.New() + logger.Out = ioutil.Discard + + channel := make(chan *logrus.Entry, BUFFER_SIZE) + hook := newTestHook(channel) + logger.Hooks.Add(hook) + + return logger.WithFields(logrus.Fields{}), channel +} + +var ( + basicContext = &crontab.Context{ + Shell: "/bin/sh", + Environ: map[string]string{}, + } + + noData logrus.Fields = logrus.Fields{} + stdoutData = logrus.Fields{"channel": "stdout"} + stderrData = logrus.Fields{"channel": "stderr"} +) + +var runJobTestCases = []struct { + command string + success bool + context *crontab.Context + messages []*logrus.Entry +}{ + { + "true", true, basicContext, + []*logrus.Entry{ + {Message: "starting", Level: logrus.InfoLevel, Data: noData}, + }, + }, + { + "false", false, basicContext, + []*logrus.Entry{ + {Message: "starting", Level: logrus.InfoLevel, Data: noData}, + }, + }, + { + "echo hello", true, basicContext, + []*logrus.Entry{ + {Message: "starting", Level: logrus.InfoLevel, Data: noData}, + {Message: "hello", Level: logrus.InfoLevel, Data: stdoutData}, + }, + }, + { + "echo hello >&2", true, basicContext, + []*logrus.Entry{ + {Message: "starting", Level: logrus.InfoLevel, Data: noData}, + {Message: "hello", Level: logrus.InfoLevel, Data: stderrData}, + }, + }, + { + "echo $FOO", true, + &crontab.Context{ + Shell: "/bin/sh", + Environ: map[string]string{"FOO": "BAR"}, + }, + []*logrus.Entry{ + {Message: "starting", Level: logrus.InfoLevel, Data: noData}, + {Message: "BAR", Level: logrus.InfoLevel, Data: stdoutData}, + }, + }, + { + "true", false, + &crontab.Context{ + Shell: "/bin/false", + Environ: map[string]string{}, + }, + []*logrus.Entry{ + {Message: "starting", Level: logrus.InfoLevel, Data: noData}, + }, + }, + { + "echo hello\nsleep 0.1\necho bar >&2", true, basicContext, + []*logrus.Entry{ + {Message: "starting", Level: logrus.InfoLevel, Data: noData}, + {Message: "hello", Level: logrus.InfoLevel, Data: stdoutData}, + {Message: "bar", Level: logrus.InfoLevel, Data: stderrData}, + }, + }, +} + +func TestRunJob(t *testing.T) { + for _, tt := range runJobTestCases { + label := fmt.Sprintf("RunJob(%q)", tt.command) + logger, channel := newTestLogger() + + err := runJob(tt.context, tt.command, logger) + if tt.success { + assert.Nil(t, err, label) + } else { + assert.NotNil(t, err, label) + } + + done := false + + for { + if done || len(tt.messages) == 0 { + break + } + + select { + case entry := <-channel: + var expected *logrus.Entry + expected, tt.messages = tt.messages[0], tt.messages[1:] + assert.Equal(t, expected.Message, entry.Message, label) + assert.Equal(t, expected.Level, entry.Level, label) + assert.Equal(t, expected.Data, entry.Data, label) + case <-time.After(time.Second): + t.Errorf("timed out waiting for %q (%s)", tt.messages[0].Message, label) + done = true + } + } + } +} From 599921ecf58d30e0924274cea047b88308e7743b Mon Sep 17 00:00:00 2001 From: Thomas Orozco Date: Fri, 7 Jul 2017 20:05:34 +0200 Subject: [PATCH 08/21] Add .gitignore --- .gitignore | 1 + 1 file changed, 1 insertion(+) create mode 100644 .gitignore diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..3f1802d --- /dev/null +++ b/.gitignore @@ -0,0 +1 @@ +supercronic From fd6bb6c38f5b7975e397691458e01daf05796dc8 Mon Sep 17 00:00:00 2001 From: Thomas Orozco Date: Fri, 7 Jul 2017 20:07:34 +0200 Subject: [PATCH 09/21] Add Makefile for easier gofmt / test --- Makefile | 15 +++++++++++++++ 1 file changed, 15 insertions(+) create mode 100644 Makefile diff --git a/Makefile b/Makefile new file mode 100644 index 0000000..f68600d --- /dev/null +++ b/Makefile @@ -0,0 +1,15 @@ +GOFILES_NOVENDOR = $(shell find . -type f -name '*.go' -not -path "./vendor/*") +SHELL=/bin/bash + +.PHONY: build +build: $(GOFILES) + go build + +.PHONY: test +test: + go test $$(go list ./... | grep -v /vendor/) + go vet $$(go list ./... | grep -v /vendor/) + +.PHONY: fmt +fmt: + gofmt -l -w ${GOFILES_NOVENDOR} From 63f5cb2f8e289935659e15f095a84b9a5c3ba896 Mon Sep 17 00:00:00 2001 From: Thomas Orozco Date: Fri, 7 Jul 2017 20:12:57 +0200 Subject: [PATCH 10/21] Fix sync.WaitGroup pass-by-copy --- cron/cron.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/cron/cron.go b/cron/cron.go index fc7b867..c545bff 100644 --- a/cron/cron.go +++ b/cron/cron.go @@ -13,7 +13,7 @@ import ( "time" ) -func drainReader(wg sync.WaitGroup, readerLogger *logrus.Entry, reader io.Reader) { +func drainReader(wg *sync.WaitGroup, readerLogger *logrus.Entry, reader io.Reader) { wg.Add(1) go func() { @@ -66,10 +66,10 @@ func runJob(context *crontab.Context, command string, jobLogger *logrus.Entry) e var wg sync.WaitGroup stdoutLogger := jobLogger.WithFields(logrus.Fields{"channel": "stdout"}) - go drainReader(wg, stdoutLogger, stdout) + go drainReader(&wg, stdoutLogger, stdout) stderrLogger := jobLogger.WithFields(logrus.Fields{"channel": "stderr"}) - go drainReader(wg, stderrLogger, stderr) + go drainReader(&wg, stderrLogger, stderr) wg.Wait() From 80e56ae066bf4f4138a40755ca0720fc102896c7 Mon Sep 17 00:00:00 2001 From: Thomas Orozco Date: Fri, 7 Jul 2017 20:51:25 +0200 Subject: [PATCH 11/21] Handle SIGINT, SIGTERM --- cron/cron.go | 88 +++++++++++++++++++++++++++++++--------------------- main.go | 34 +++++++++++++++----- 2 files changed, 78 insertions(+), 44 deletions(-) diff --git a/cron/cron.go b/cron/cron.go index c545bff..3be2a8c 100644 --- a/cron/cron.go +++ b/cron/cron.go @@ -10,10 +10,11 @@ import ( "os/exec" "strings" "sync" + "syscall" "time" ) -func drainReader(wg *sync.WaitGroup, readerLogger *logrus.Entry, reader io.Reader) { +func startReaderDrain(wg *sync.WaitGroup, readerLogger *logrus.Entry, reader io.Reader) { wg.Add(1) go func() { @@ -43,6 +44,10 @@ func runJob(context *crontab.Context, command string, jobLogger *logrus.Entry) e cmd := exec.Command(context.Shell, "-c", command) + // Run in a separate process group so that in interactive usage, CTRL+C + // stops supercronic, not the children threads. + cmd.SysProcAttr = &syscall.SysProcAttr{Setpgid: true} + env := os.Environ() for k, v := range context.Environ { env = append(env, fmt.Sprintf("%s=%s", k, v)) @@ -66,10 +71,10 @@ func runJob(context *crontab.Context, command string, jobLogger *logrus.Entry) e var wg sync.WaitGroup stdoutLogger := jobLogger.WithFields(logrus.Fields{"channel": "stdout"}) - go drainReader(&wg, stdoutLogger, stdout) + go startReaderDrain(&wg, stdoutLogger, stdout) stderrLogger := jobLogger.WithFields(logrus.Fields{"channel": "stderr"}) - go drainReader(&wg, stderrLogger, stderr) + go startReaderDrain(&wg, stderrLogger, stderr) wg.Wait() @@ -80,43 +85,54 @@ func runJob(context *crontab.Context, command string, jobLogger *logrus.Entry) e return nil } -func StartJob(context *crontab.Context, job *crontab.Job, exitChan chan interface{}) { - // NOTE: this (intentionally) does not run multiple instances of the - // job concurrently - cronLogger := logrus.WithFields(logrus.Fields{ - "job.schedule": job.Schedule, - "job.command": job.Command, - "job.position": job.Position, - }) - - var cronIteration uint64 = 0 - nextRun := job.Expression.Next(time.Now()) - - for { - nextRun = job.Expression.Next(nextRun) - cronLogger.Debugf("job will run next at %v", nextRun) - - delay := nextRun.Sub(time.Now()) - if delay < 0 { - cronLogger.Warningf("job took too long to run: it should have started %v ago", -delay) - nextRun = time.Now() - continue - } +func StartJob(wg *sync.WaitGroup, context *crontab.Context, job *crontab.Job, exitChan chan interface{}) { + wg.Add(1) - time.Sleep(delay) + go func() { + defer wg.Done() - jobLogger := cronLogger.WithFields(logrus.Fields{ - "iteration": cronIteration, + cronLogger := logrus.WithFields(logrus.Fields{ + "job.schedule": job.Schedule, + "job.command": job.Command, "job.position": job.Position, }) - err := runJob(context, job.Command, jobLogger) + var cronIteration uint64 = 0 + nextRun := job.Expression.Next(time.Now()) - if err == nil { - jobLogger.Info("job succeeded") - } else { - jobLogger.Error(err) - } + // NOTE: this (intentionally) does not run multiple instances of the + // job concurrently + for { + nextRun = job.Expression.Next(nextRun) + cronLogger.Debugf("job will run next at %v", nextRun) - cronIteration++ - } + delay := nextRun.Sub(time.Now()) + if delay < 0 { + cronLogger.Warningf("job took too long to run: it should have started %v ago", -delay) + nextRun = time.Now() + continue + } + + select { + case <-exitChan: + cronLogger.Debug("shutting down") + return + case <-time.After(delay): + // Proceed normally + } + + jobLogger := cronLogger.WithFields(logrus.Fields{ + "iteration": cronIteration, + }) + + err := runJob(context, job.Command, jobLogger) + + if err == nil { + jobLogger.Info("job succeeded") + } else { + jobLogger.Error(err) + } + + cronIteration++ + } + }() } diff --git a/main.go b/main.go index 31dffb0..f171a8e 100644 --- a/main.go +++ b/main.go @@ -6,6 +6,9 @@ import ( "github.com/aptible/supercronic/crontab" "github.com/sirupsen/logrus" "os" + "os/signal" + "sync" + "syscall" ) func main() { @@ -30,21 +33,36 @@ func main() { } defer file.Close() - crontab, err := crontab.ParseCrontab(file) + tab, err := crontab.ParseCrontab(file) if err != nil { logrus.Fatal(err) return } - // TODO: Signal handling. - // TODO: Should actually have a sync group here, and send the exit - // request in. - requestExitChan := make(chan interface{}) + var ( + wg sync.WaitGroup + exitChans []chan interface{} + ) - for _, job := range crontab.Jobs { - go cron.StartJob(crontab.Context, job, requestExitChan) + for _, job := range tab.Jobs { + c := make(chan interface{}, 1) + exitChans = append(exitChans, c) + cron.StartJob(&wg, tab.Context, job, c) } - <-requestExitChan + termChan := make(chan os.Signal, 1) + signal.Notify(termChan, syscall.SIGINT, syscall.SIGTERM) + + termSig := <-termChan + + logrus.Infof("received %s, shutting down", termSig) + for _, c := range exitChans { + c <- true + } + + logrus.Info("waiting for jobs to finish") + wg.Wait() + + logrus.Info("exiting") } From 4023ab896e5c6a73d0c3bcc19aed67b2a14aaf6b Mon Sep 17 00:00:00 2001 From: Thomas Orozco Date: Mon, 10 Jul 2017 18:22:36 +0200 Subject: [PATCH 12/21] StartJob: basic test --- cron/cron.go | 3 ++- cron/cron_test.go | 40 ++++++++++++++++++++++++++++++++++------ crontab/crontab.go | 6 +++--- crontab/crontab_test.go | 18 +++++++++--------- crontab/types.go | 12 ++++++++---- 5 files changed, 56 insertions(+), 23 deletions(-) diff --git a/cron/cron.go b/cron/cron.go index 3be2a8c..5612022 100644 --- a/cron/cron.go +++ b/cron/cron.go @@ -93,7 +93,8 @@ func StartJob(wg *sync.WaitGroup, context *crontab.Context, job *crontab.Job, ex cronLogger := logrus.WithFields(logrus.Fields{ "job.schedule": job.Schedule, - "job.command": job.Command, "job.position": job.Position, + "job.command": job.Command, + "job.position": job.Position, }) var cronIteration uint64 = 0 diff --git a/cron/cron_test.go b/cron/cron_test.go index afcef51..39517f8 100644 --- a/cron/cron_test.go +++ b/cron/cron_test.go @@ -6,6 +6,7 @@ import ( "github.com/sirupsen/logrus" "github.com/stretchr/testify/assert" "io/ioutil" + "sync" "testing" "time" ) @@ -42,8 +43,15 @@ func newTestLogger() (*logrus.Entry, chan *logrus.Entry) { return logger.WithFields(logrus.Fields{}), channel } +type testExpression struct { +} + +func (expr *testExpression) Next(t time.Time) time.Time { + return t.Add(time.Minute) +} + var ( - basicContext = &crontab.Context{ + basicContext = crontab.Context{ Shell: "/bin/sh", Environ: map[string]string{}, } @@ -60,26 +68,26 @@ var runJobTestCases = []struct { messages []*logrus.Entry }{ { - "true", true, basicContext, + "true", true, &basicContext, []*logrus.Entry{ {Message: "starting", Level: logrus.InfoLevel, Data: noData}, }, }, { - "false", false, basicContext, + "false", false, &basicContext, []*logrus.Entry{ {Message: "starting", Level: logrus.InfoLevel, Data: noData}, }, }, { - "echo hello", true, basicContext, + "echo hello", true, &basicContext, []*logrus.Entry{ {Message: "starting", Level: logrus.InfoLevel, Data: noData}, {Message: "hello", Level: logrus.InfoLevel, Data: stdoutData}, }, }, { - "echo hello >&2", true, basicContext, + "echo hello >&2", true, &basicContext, []*logrus.Entry{ {Message: "starting", Level: logrus.InfoLevel, Data: noData}, {Message: "hello", Level: logrus.InfoLevel, Data: stderrData}, @@ -107,7 +115,7 @@ var runJobTestCases = []struct { }, }, { - "echo hello\nsleep 0.1\necho bar >&2", true, basicContext, + "echo hello\nsleep 0.1\necho bar >&2", true, &basicContext, []*logrus.Entry{ {Message: "starting", Level: logrus.InfoLevel, Data: noData}, {Message: "hello", Level: logrus.InfoLevel, Data: stdoutData}, @@ -149,3 +157,23 @@ func TestRunJob(t *testing.T) { } } } + +func TestStartJobExitsOnRequest(t *testing.T) { + job := crontab.Job{ + CrontabLine: crontab.CrontabLine{ + Expression: &testExpression{}, + Schedule: "always!", + Command: "true", + }, + Position: 1, + } + + exitChan := make(chan interface{}, 1) + exitChan <- nil + + var wg sync.WaitGroup + + StartJob(&wg, &basicContext, &job, exitChan) + + wg.Wait() +} diff --git a/crontab/crontab.go b/crontab/crontab.go index 436cdf3..33db4fe 100644 --- a/crontab/crontab.go +++ b/crontab/crontab.go @@ -22,7 +22,7 @@ var ( } ) -func parseJobLine(line string) (*crontabLine, error) { +func parseJobLine(line string) (*CrontabLine, error) { indices := jobLineSeparator.FindAllStringIndex(line, -1) for _, count := range parameterCounts { @@ -42,7 +42,7 @@ func parseJobLine(line string) (*crontabLine, error) { continue } - return &crontabLine{ + return &CrontabLine{ Expression: expr, Schedule: line[:scheduleEnds], Command: line[commandStarts:], @@ -94,7 +94,7 @@ func ParseCrontab(reader io.Reader) (*Crontab, error) { return nil, err } - jobs = append(jobs, &Job{crontabLine: *jobLine, Position: position}) + jobs = append(jobs, &Job{CrontabLine: *jobLine, Position: position}) position++ } diff --git a/crontab/crontab_test.go b/crontab/crontab_test.go index 68b43a7..610b32c 100644 --- a/crontab/crontab_test.go +++ b/crontab/crontab_test.go @@ -43,7 +43,7 @@ var parseCrontabTestCases = []struct { }, Jobs: []*Job{ { - crontabLine: crontabLine{ + CrontabLine: CrontabLine{ Schedule: "* * * * *", Command: "foo some # qux", }, @@ -63,13 +63,13 @@ var parseCrontabTestCases = []struct { }, Jobs: []*Job{ { - crontabLine: crontabLine{ + CrontabLine: CrontabLine{ Schedule: "* * * * *", Command: "foo", }, }, { - crontabLine: crontabLine{ + CrontabLine: CrontabLine{ Schedule: "1 1 1 1 1", Command: "bar", }, @@ -87,19 +87,19 @@ var parseCrontabTestCases = []struct { }, Jobs: []*Job{ { - crontabLine: crontabLine{ + CrontabLine: CrontabLine{ Schedule: "* * * * * *", Command: "with year", }, }, { - crontabLine: crontabLine{ + CrontabLine: CrontabLine{ Schedule: "* * * * * * *", Command: "with seconds", }, }, { - crontabLine: crontabLine{ + CrontabLine: CrontabLine{ Schedule: "@daily", Command: "with shorthand", }, @@ -117,7 +117,7 @@ var parseCrontabTestCases = []struct { }, Jobs: []*Job{ { - crontabLine: crontabLine{ + CrontabLine: CrontabLine{ Schedule: "*/2 * * * *", Command: "will run", }, @@ -135,7 +135,7 @@ var parseCrontabTestCases = []struct { }, Jobs: []*Job{ { - crontabLine: crontabLine{ + CrontabLine: CrontabLine{ Schedule: "* * * * *", Command: "with plenty of whitespace", }, @@ -153,7 +153,7 @@ var parseCrontabTestCases = []struct { }, Jobs: []*Job{ { - crontabLine: crontabLine{ + CrontabLine: CrontabLine{ Schedule: "*\t*\t*\t*\t*", Command: "tabs everywhere", }, diff --git a/crontab/types.go b/crontab/types.go index 87dae37..00dfd9a 100644 --- a/crontab/types.go +++ b/crontab/types.go @@ -1,17 +1,21 @@ package crontab import ( - "github.com/gorhill/cronexpr" + "time" ) -type crontabLine struct { - Expression *cronexpr.Expression +type Expression interface { + Next(fromTime time.Time) time.Time +} + +type CrontabLine struct { + Expression Expression Schedule string Command string } type Job struct { - crontabLine + CrontabLine Position int } From 3522b1ec95f9d4454a3718250e9c5706beb38a93 Mon Sep 17 00:00:00 2001 From: Thomas Orozco Date: Mon, 10 Jul 2017 18:42:48 +0200 Subject: [PATCH 13/21] Add integration tests --- Makefile | 3 ++- integration/env.crontab | 1 + integration/hello.crontab | 1 + integration/override.crontab | 2 ++ integration/test.bats | 21 +++++++++++++++++++++ integration/timeout.crontab | 1 + 6 files changed, 28 insertions(+), 1 deletion(-) create mode 100644 integration/env.crontab create mode 100644 integration/hello.crontab create mode 100644 integration/override.crontab create mode 100644 integration/test.bats create mode 100644 integration/timeout.crontab diff --git a/Makefile b/Makefile index f68600d..d5b3045 100644 --- a/Makefile +++ b/Makefile @@ -6,9 +6,10 @@ build: $(GOFILES) go build .PHONY: test -test: +test: build go test $$(go list ./... | grep -v /vendor/) go vet $$(go list ./... | grep -v /vendor/) + bats integration .PHONY: fmt fmt: diff --git a/integration/env.crontab b/integration/env.crontab new file mode 100644 index 0000000..de64766 --- /dev/null +++ b/integration/env.crontab @@ -0,0 +1 @@ +* * * * * * * echo "$VAR" diff --git a/integration/hello.crontab b/integration/hello.crontab new file mode 100644 index 0000000..8cb44b9 --- /dev/null +++ b/integration/hello.crontab @@ -0,0 +1 @@ +* * * * * * * echo "hello from crontab" diff --git a/integration/override.crontab b/integration/override.crontab new file mode 100644 index 0000000..0aebba0 --- /dev/null +++ b/integration/override.crontab @@ -0,0 +1,2 @@ +VAR="hello from bar" +* * * * * * * echo "$VAR" diff --git a/integration/test.bats b/integration/test.bats new file mode 100644 index 0000000..747af10 --- /dev/null +++ b/integration/test.bats @@ -0,0 +1,21 @@ +function run_supercronic() { + local crontab="$1" + local timeout="${2:-2s}" + timeout --preserve-status --kill-after "30s" "$timeout" "${BATS_TEST_DIRNAME}/../supercronic" "$crontab" 2>&1 +} + +@test "it runs a cron job" { + run_supercronic "${BATS_TEST_DIRNAME}/hello.crontab" | grep -E "hello from crontab.*channel=stdout" +} + +@test "it passes the environment through" { + VAR="hello from foo" run_supercronic "${BATS_TEST_DIRNAME}/env.crontab" | grep -E "hello from foo.*channel=stdout" +} + +@test "it overrides the environment with the crontab" { + VAR="hello from foo" run_supercronic "${BATS_TEST_DIRNAME}/override.crontab" | grep -E "hello from bar.*channel=stdout" +} + +@test "it warns when a job is falling behind" { + run_supercronic "${BATS_TEST_DIRNAME}/timeout.crontab" 5s | grep -E "job took too long to run" +} diff --git a/integration/timeout.crontab b/integration/timeout.crontab new file mode 100644 index 0000000..57eab25 --- /dev/null +++ b/integration/timeout.crontab @@ -0,0 +1 @@ +* * * * * * * sleep 2 From 8ae7f372c7eca9d0f07297ceb83309dc9f48caf8 Mon Sep 17 00:00:00 2001 From: Thomas Orozco Date: Mon, 10 Jul 2017 18:56:00 +0200 Subject: [PATCH 14/21] Unify ENV handling with Vixie cron's --- crontab/crontab.go | 24 ++++++++++++++------ crontab/crontab_test.go | 47 +++++++++++++++++++++++++++++++++++++++- integration/test.bats | 12 ++++++---- integration/user.crontab | 1 + 4 files changed, 72 insertions(+), 12 deletions(-) create mode 100644 integration/user.crontab diff --git a/crontab/crontab.go b/crontab/crontab.go index 33db4fe..b9a592d 100644 --- a/crontab/crontab.go +++ b/crontab/crontab.go @@ -54,14 +54,11 @@ func parseJobLine(line string) (*CrontabLine, error) { func ParseCrontab(reader io.Reader) (*Crontab, error) { scanner := bufio.NewScanner(reader) - // TODO: Don't return an array of Job, return an object representing the crontab - // TODO: Understand environment variables, too. - // TODO: Increment position position := 0 jobs := make([]*Job, 0) - // TODO: CRON_TZ + // TODO: CRON_TZ? environ := make(map[string]string) shell := "/bin/sh" @@ -78,14 +75,27 @@ func ParseCrontab(reader io.Reader) (*Crontab, error) { r := envLineMatcher.FindAllStringSubmatch(line, -1) if len(r) == 1 && len(r[0]) == 3 { - // TODO: Should error on setting USER? envKey := r[0][1] envVal := r[0][2] + + // Remove quotes (this emulates what Vixie cron does) + if envVal[0] == '"' || envVal[0] == '\'' { + if len(envVal) > 1 && envVal[0] == envVal[len(envVal)-1] { + envVal = envVal[1 : len(envVal)-1] + } + } + if envKey == "SHELL" { + logrus.Infof("processes will be spawned using shell: %s", envVal) shell = envVal - } else { - environ[envKey] = envVal } + + if envKey == "USER" { + logrus.Warnf("processes will NOT be spawned as USER=%s", envVal) + } + + environ[envKey] = envVal + continue } diff --git a/crontab/crontab_test.go b/crontab/crontab_test.go index 610b32c..b9830ea 100644 --- a/crontab/crontab_test.go +++ b/crontab/crontab_test.go @@ -34,6 +34,50 @@ var parseCrontabTestCases = []struct { }, }, + { + "FOO=\"bar\"", + &Crontab{ + Context: &Context{ + Shell: "/bin/sh", + Environ: map[string]string{"FOO": "bar"}, + }, + Jobs: []*Job{}, + }, + }, + + { + "FOO='bar'", + &Crontab{ + Context: &Context{ + Shell: "/bin/sh", + Environ: map[string]string{"FOO": "bar"}, + }, + Jobs: []*Job{}, + }, + }, + + { + "FOO='", + &Crontab{ + Context: &Context{ + Shell: "/bin/sh", + Environ: map[string]string{"FOO": "'"}, + }, + Jobs: []*Job{}, + }, + }, + + { + "FOO=''", + &Crontab{ + Context: &Context{ + Shell: "/bin/sh", + Environ: map[string]string{"FOO": ""}, + }, + Jobs: []*Job{}, + }, + }, + { "* * * * * foo some # qux", &Crontab{ @@ -58,7 +102,8 @@ var parseCrontabTestCases = []struct { Context: &Context{ Shell: "some", Environ: map[string]string{ - "KEY": "VAL", + "SHELL": "some", + "KEY": "VAL", }, }, Jobs: []*Job{ diff --git a/integration/test.bats b/integration/test.bats index 747af10..cdcee39 100644 --- a/integration/test.bats +++ b/integration/test.bats @@ -5,17 +5,21 @@ function run_supercronic() { } @test "it runs a cron job" { - run_supercronic "${BATS_TEST_DIRNAME}/hello.crontab" | grep -E "hello from crontab.*channel=stdout" + run_supercronic "${BATS_TEST_DIRNAME}/hello.crontab" | grep -iE "hello from crontab.*channel=stdout" } @test "it passes the environment through" { - VAR="hello from foo" run_supercronic "${BATS_TEST_DIRNAME}/env.crontab" | grep -E "hello from foo.*channel=stdout" + VAR="hello from foo" run_supercronic "${BATS_TEST_DIRNAME}/env.crontab" | grep -iE "hello from foo.*channel=stdout" } @test "it overrides the environment with the crontab" { - VAR="hello from foo" run_supercronic "${BATS_TEST_DIRNAME}/override.crontab" | grep -E "hello from bar.*channel=stdout" + VAR="hello from foo" run_supercronic "${BATS_TEST_DIRNAME}/override.crontab" | grep -iE "hello from bar.*channel=stdout" +} + +@test "it warns when USER is set" { + run_supercronic "${BATS_TEST_DIRNAME}/user.crontab" 5s | grep -iE "processes will not.*USER=" } @test "it warns when a job is falling behind" { - run_supercronic "${BATS_TEST_DIRNAME}/timeout.crontab" 5s | grep -E "job took too long to run" + run_supercronic "${BATS_TEST_DIRNAME}/timeout.crontab" 5s | grep -iE "job took too long to run" } diff --git a/integration/user.crontab b/integration/user.crontab new file mode 100644 index 0000000..4bff77f --- /dev/null +++ b/integration/user.crontab @@ -0,0 +1 @@ +USER=foo From ddcb5ab11f2d1437cf207170f9ed17c7e058adcd Mon Sep 17 00:00:00 2001 From: Thomas Orozco Date: Mon, 10 Jul 2017 19:06:14 +0200 Subject: [PATCH 15/21] Fix race condition spawning readers We need startReaderDrain to add to the WaitGroup before we move on to waiting on it. --- cron/cron.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/cron/cron.go b/cron/cron.go index 5612022..47a9ef7 100644 --- a/cron/cron.go +++ b/cron/cron.go @@ -71,10 +71,10 @@ func runJob(context *crontab.Context, command string, jobLogger *logrus.Entry) e var wg sync.WaitGroup stdoutLogger := jobLogger.WithFields(logrus.Fields{"channel": "stdout"}) - go startReaderDrain(&wg, stdoutLogger, stdout) + startReaderDrain(&wg, stdoutLogger, stdout) stderrLogger := jobLogger.WithFields(logrus.Fields{"channel": "stderr"}) - go startReaderDrain(&wg, stderrLogger, stderr) + startReaderDrain(&wg, stderrLogger, stderr) wg.Wait() From 2f166eb88fe360f9c298746d3a60cb0a8bf690ca Mon Sep 17 00:00:00 2001 From: Thomas Orozco Date: Mon, 10 Jul 2017 19:15:07 +0200 Subject: [PATCH 16/21] Add flag handling (debug, json logging) --- integration/noop.crontab | 0 integration/test.bats | 11 ++++++++++- main.go | 29 ++++++++++++++++++++++------- 3 files changed, 32 insertions(+), 8 deletions(-) create mode 100644 integration/noop.crontab diff --git a/integration/noop.crontab b/integration/noop.crontab new file mode 100644 index 0000000..e69de29 diff --git a/integration/test.bats b/integration/test.bats index cdcee39..0c1836c 100644 --- a/integration/test.bats +++ b/integration/test.bats @@ -1,7 +1,8 @@ function run_supercronic() { local crontab="$1" local timeout="${2:-2s}" - timeout --preserve-status --kill-after "30s" "$timeout" "${BATS_TEST_DIRNAME}/../supercronic" "$crontab" 2>&1 + timeout --preserve-status --kill-after "30s" "$timeout" \ + "${BATS_TEST_DIRNAME}/../supercronic" ${SUPERCRONIC_ARGS:-} "$crontab" 2>&1 } @test "it runs a cron job" { @@ -23,3 +24,11 @@ function run_supercronic() { @test "it warns when a job is falling behind" { run_supercronic "${BATS_TEST_DIRNAME}/timeout.crontab" 5s | grep -iE "job took too long to run" } + +@test "it supports debug logging " { + SUPERCRONIC_ARGS="-debug" run_supercronic "${BATS_TEST_DIRNAME}/hello.crontab" | grep -iE "debug" +} + +@test "it supports JSON logging " { + SUPERCRONIC_ARGS="-json" run_supercronic "${BATS_TEST_DIRNAME}/noop.crontab" | grep -iE "^{" +} diff --git a/main.go b/main.go index f171a8e..20f097b 100644 --- a/main.go +++ b/main.go @@ -1,6 +1,7 @@ package main import ( + "flag" "fmt" "github.com/aptible/supercronic/cron" "github.com/aptible/supercronic/crontab" @@ -11,19 +12,33 @@ import ( "syscall" ) +var Usage = func() { + fmt.Fprintf(os.Stderr, "Usage: %s [OPTIONS] CRONTAB\n\nAvailable options:\n", os.Args[0]) + flag.PrintDefaults() +} + func main() { - // TODO: debug flag instead - // TODO: JSON logging? - logrus.SetLevel(logrus.DebugLevel) - logrus.SetFormatter(&logrus.TextFormatter{FullTimestamp: true}) + debug := flag.Bool("debug", false, "enable debug logging") + json := flag.Bool("json", false, "enable JSON logging") + flag.Parse() + + if *debug { + logrus.SetLevel(logrus.DebugLevel) + } + + if *json { + logrus.SetFormatter(&logrus.JSONFormatter{}) + } else { + logrus.SetFormatter(&logrus.TextFormatter{FullTimestamp: true}) + } - if len(os.Args) != 2 { - fmt.Fprintf(os.Stderr, "Usage: %s CRONTAB\n", os.Args[0]) + if flag.NArg() != 1 { + Usage() os.Exit(2) return } - crontabFileName := os.Args[1] + crontabFileName := flag.Args()[0] logrus.Infof("read crontab: %s", crontabFileName) file, err := os.Open(crontabFileName) From fd78433a38e56dbf82c8c0181eff29b1f7c732d6 Mon Sep 17 00:00:00 2001 From: Thomas Orozco Date: Mon, 10 Jul 2017 19:15:14 +0200 Subject: [PATCH 17/21] Test that parsing an empty crontab succeeds --- crontab/crontab_test.go | 22 ++++++++++++++++++++++ 1 file changed, 22 insertions(+) diff --git a/crontab/crontab_test.go b/crontab/crontab_test.go index b9830ea..43deaca 100644 --- a/crontab/crontab_test.go +++ b/crontab/crontab_test.go @@ -12,6 +12,28 @@ var parseCrontabTestCases = []struct { expected *Crontab }{ // Success cases + { + "", + &Crontab{ + Context: &Context{ + Shell: "/bin/sh", + Environ: map[string]string{}, + }, + Jobs: []*Job{}, + }, + }, + + { + "\n", + &Crontab{ + Context: &Context{ + Shell: "/bin/sh", + Environ: map[string]string{}, + }, + Jobs: []*Job{}, + }, + }, + { "FOO=bar\n", &Crontab{ From 7bf545115747f0e6e6d567a0b2c2bcca81f2b2f8 Mon Sep 17 00:00:00 2001 From: Thomas Orozco Date: Mon, 10 Jul 2017 19:16:34 +0200 Subject: [PATCH 18/21] Integration: test that it actually runs a cron --- integration/test.bats | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/integration/test.bats b/integration/test.bats index 0c1836c..5c85b76 100644 --- a/integration/test.bats +++ b/integration/test.bats @@ -6,7 +6,8 @@ function run_supercronic() { } @test "it runs a cron job" { - run_supercronic "${BATS_TEST_DIRNAME}/hello.crontab" | grep -iE "hello from crontab.*channel=stdout" + n="$(run_supercronic "${BATS_TEST_DIRNAME}/hello.crontab" 5s | grep -iE "hello from crontab.*channel=stdout" | wc -l)" + [[ "$n" -gt 3 ]] } @test "it passes the environment through" { From 8fa979c10ffdea593daa1e4ef611a24cc55897e0 Mon Sep 17 00:00:00 2001 From: Thomas Orozco Date: Mon, 10 Jul 2017 19:48:23 +0200 Subject: [PATCH 19/21] ADD README / LICENSE --- LICENSE.md | 21 ++++++++ README.md | 152 +++++++++++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 173 insertions(+) create mode 100644 LICENSE.md create mode 100644 README.md diff --git a/LICENSE.md b/LICENSE.md new file mode 100644 index 0000000..c00e5be --- /dev/null +++ b/LICENSE.md @@ -0,0 +1,21 @@ +MIT License + +Copyright (c) 2017, Aptible, Inc. + +Permission is hereby granted, free of charge, to any person obtaining a copy of +this software and associated documentation files (the "Software"), to deal in +the Software without restriction, including without limitation the rights to +use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies +of the Software, and to permit persons to whom the Software is furnished to do +so, subject to the following conditions: + +The above copyright notice and this permission notice shall be included in all +copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +SOFTWARE. diff --git a/README.md b/README.md new file mode 100644 index 0000000..5d9f55a --- /dev/null +++ b/README.md @@ -0,0 +1,152 @@ +# Supercronic # + +Supercronic is a crontab-compatible job runner, designed specifically to run in +containers. + + +## Why Supercronic? ## + +Crontabs are the lingua franca of job scheduling, but typical server cron +implementations are ill-suited for container environments: + +- They purge their environment before starting jobs. This is an important + security feature in multi-user systems, but it breaks a fundamental + configuration mechanism for containers. +- They capture the output from the jobs they run, and often either want to + email this output or simply discard it. In a containerized environment, + logging task output and errors to `stdout` / `stderr` is often easier to work + with. +- They often don't respond gracefully to `SIGINT` / `SIGTERM`, and may leave + running jobs orphaned when signaled. Again, this makes sense in a server + environment where `init` will handle the orphan jobs and Cron isn't restarted + often anyway, but it's inappropriate in a container environment as it'll + result in jobs being forcefully terminated (i.e. `SIGKILL`'ed) when the + container exits. +- They often try to send their logs to syslog. This conveniently provides + centralized logging when a syslog server is running, but with containers, + simply logging to `stdout` or `stderr` is preferred. + +Finally, they are often very quiet, which makes the above issues difficult to +understand or debug! + +The list could go on, but the fundamental takeaway is this: unlike typical +server cron implementations, Supercronic tries very hard to do exactly what you +expect from running `cron` in a container: + +- Your environment variables are available in jobs. +- Job output is logged to `stdout` / `stderr`. +- `SIGTERM` (or `SIGINT`, which you can deliver via CTRL+C when used + interactively) triggers a graceful shutdown +- Job return codes and schedules are also logged to `stdout` / `stderr`. + +## How does it work? ## + +- Install Supercronic (see below). +- Point it at a crontab: `supercronic CRONTAB`. +- You're done! + + +### Installation + +- If you have a `go` toolchain available: `go install github.com/aptible/supercronic` +- TODO: Docker installation instructions / packaging. + + +## Crontab format ## + +Broadly speaking, Supercronic tries to process crontabs just like Vixie cron +does. In most cases, it should be compatible with your existing crontab. + +There are, however, a few exceptions: + +- First, Supercronic supports second-resolution schedules: under the hood, + Supercronic uses [the `cronexpr` package][cronexpr], so refer to its + documentation to know exactly what you can do. +- Second, Supercronic does not support changing users when running tasks. + Again, this is something that hardly makes sense in a cron environment. This + means that setting `USER` in your crontab won't have any effect. + +Here's an example crontab: + +``` +# Run every minute +*/1 * * * * echo "hello" + +# Run every 2 seconds +*/2 * * * * * * ls 2>/dev/null +``` + + +## Environment variables ## + +Just like regular cron, Supercronic lets you specify environment variables in +your crontab using a `KEY=VALUE` syntax. + +However, this is only here for compatibility with existing crontabs, and using +this feature is generally **not recommended** when using Supercronic. + +Indeed, Supercronic does not wipe your environment before running jobs, so if +you need environment variables to be available when your jobs run, just set +them before starting Supercronic itself, and your jobs will inherit them +(unless you've used cron before, this is exactly what you expect). + +For example, if you're using Docker, Supercronic + + +## Logging ## + +Supercronic provides rich logging, and will let you know exactly what command +triggered a given message. Here's an example: + +``` +$ cat ./my-crontab +*/5 * * * * * * echo "hello from Supercronic" + +$ ./supercronic ./my-crontab +INFO[2017-07-10T19:40:44+02:00] read crontab: ./my-crontab +INFO[2017-07-10T19:40:50+02:00] starting iteration=0 job.command="echo "hello from Supercronic"" job.position=0 job.schedule="*/5 * * * * * *" +INFO[2017-07-10T19:40:50+02:00] hello from Supercronic channel=stdout iteration=0 job.command="echo "hello from Supercronic"" job.position=0 job.schedule="*/5 * * * * * *" +INFO[2017-07-10T19:40:50+02:00] job succeeded iteration=0 job.command="echo "hello from Supercronic"" job.position=0 job.schedule="*/5 * * * * * *" +INFO[2017-07-10T19:40:55+02:00] starting iteration=1 job.command="echo "hello from Supercronic"" job.position=0 job.schedule="*/5 * * * * * *" +INFO[2017-07-10T19:40:55+02:00] hello from Supercronic channel=stdout iteration=1 job.command="echo "hello from Supercronic"" job.position=0 job.schedule="*/5 * * * * * *" +INFO[2017-07-10T19:40:55+02:00] job succeeded iteration=1 job.command="echo "hello from Supercronic"" job.position=0 job.schedule="*/5 * * * * * *" +``` + + +## Debugging ## + +If your jobs aren't running, or you'd simply like to double-check your crontab +syntax, pass the `-debug` flag for more verbose logging: + +``` +$ ./supercronic -debug ./my-crontab +INFO[2017-07-10T19:43:51+02:00] read crontab: ./my-crontab +DEBU[2017-07-10T19:43:51+02:00] try parse(7): */5 * * * * * * echo "hello from Supercronic"[0:15] = */5 * * * * * * +DEBU[2017-07-10T19:43:51+02:00] job will run next at 2017-07-10 19:44:00 +0200 CEST job.command="echo "hello from Supercronic"" job.position=0 job.schedule="*/5 * * * * * *" +``` + + +## Duplicate Jobs ## + +Supercronic will wait for a given job to finish before that job is scheduled +again (some cron implementations do this, others don't). If a job is falling +behind schedule (i.e. it's taking too long to finish), Supercronic will warn +you. + +Here is an example: + +``` +# Sleep for 2 seconds every second. This will take too long. +* * * * * * * sleep 2 + +$ ./supercronic ./my-crontab +INFO[2017-07-11T12:24:25+02:00] read crontab: foo +INFO[2017-07-11T12:24:27+02:00] starting iteration=0 job.command="sleep 2" job.position=0 job.schedule="* * * * * * *" +INFO[2017-07-11T12:24:29+02:00] job succeeded iteration=0 job.command="sleep 2" job.position=0 job.schedule="* * * * * * *" +WARN[2017-07-11T12:24:29+02:00] job took too long to run: it should have started 1.009438854s ago job.command="sleep 2" job.position=0 job.schedule="* * * * * * *" +INFO[2017-07-11T12:24:30+02:00] starting iteration=1 job.command="sleep 2" job.position=0 job.schedule="* * * * * * *" +INFO[2017-07-11T12:24:32+02:00] job succeeded iteration=1 job.command="sleep 2" job.position=0 job.schedule="* * * * * * *" +WARN[2017-07-11T12:24:32+02:00] job took too long to run: it should have started 1.014474099s ago job.command="sleep 2" job.position=0 job.schedule="* * * * * * *" +``` + + [cronexpr]: https://github.com/gorhill/cronexpr From 1000379aae9979826a7dfe1e822da0aa1bd7f430 Mon Sep 17 00:00:00 2001 From: Thomas Orozco Date: Tue, 11 Jul 2017 12:42:30 +0200 Subject: [PATCH 20/21] Add Travis --- .gitignore | 1 + .travis.yml | 27 +++++++++++++++++++++++++++ Makefile | 15 +++++++++++++-- integration/test.bats | 4 ++++ 4 files changed, 45 insertions(+), 2 deletions(-) create mode 100644 .travis.yml diff --git a/.gitignore b/.gitignore index 3f1802d..9d0839a 100644 --- a/.gitignore +++ b/.gitignore @@ -1 +1,2 @@ supercronic +vendor diff --git a/.travis.yml b/.travis.yml new file mode 100644 index 0000000..e5d1331 --- /dev/null +++ b/.travis.yml @@ -0,0 +1,27 @@ +sudo: false +dist: trusty + +language: go + +go: "1.8" + +install: + - curl https://glide.sh/get | sh + - make deps + - git clone https://github.com/sstephenson/bats.git --branch v0.4.0 --depth 1 "${HOME}/bats" + - export "PATH=${PATH}:${HOME}/bats/bin" + +jobs: + include: + - stage: test + script: + - make fmt && make test + - stage: build + script: + - mkdir -p dist + - for arch in amd64 386 arm arm64; do GOARCH="$arch" go build && mv supercronic "dist/supercronic-${arch}"; done + - cd dist + - ls -lah * + - file * + - sha1sum * + - sha256sum * diff --git a/Makefile b/Makefile index d5b3045..a074cd2 100644 --- a/Makefile +++ b/Makefile @@ -1,16 +1,27 @@ GOFILES_NOVENDOR = $(shell find . -type f -name '*.go' -not -path "./vendor/*") SHELL=/bin/bash +.PHONY: deps +deps: + glide install + .PHONY: build build: $(GOFILES) go build -.PHONY: test -test: build +.PHONY: unit +unit: go test $$(go list ./... | grep -v /vendor/) go vet $$(go list ./... | grep -v /vendor/) + +.PHONY: integration +integration: build bats integration +.PHONY: test +test: unit integration + true + .PHONY: fmt fmt: gofmt -l -w ${GOFILES_NOVENDOR} diff --git a/integration/test.bats b/integration/test.bats index 5c85b76..cd5710d 100644 --- a/integration/test.bats +++ b/integration/test.bats @@ -5,6 +5,10 @@ function run_supercronic() { "${BATS_TEST_DIRNAME}/../supercronic" ${SUPERCRONIC_ARGS:-} "$crontab" 2>&1 } +@test "it starts" { + run_supercronic "${BATS_TEST_DIRNAME}/noop.crontab" 2s +} + @test "it runs a cron job" { n="$(run_supercronic "${BATS_TEST_DIRNAME}/hello.crontab" 5s | grep -iE "hello from crontab.*channel=stdout" | wc -l)" [[ "$n" -gt 3 ]] From e99f294f21b1de8899a6e103086ea3240e142dff Mon Sep 17 00:00:00 2001 From: Thomas Orozco Date: Tue, 11 Jul 2017 13:34:12 +0200 Subject: [PATCH 21/21] Push to Github releases on tag --- .travis.yml | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/.travis.yml b/.travis.yml index e5d1331..1972e55 100644 --- a/.travis.yml +++ b/.travis.yml @@ -25,3 +25,11 @@ jobs: - file * - sha1sum * - sha256sum * + deploy: + provider: releases + api_key: + secure: "MPXVtn7XhcWkkDpDCcPqvfp1klsfGflsMrhBTH5WO2dUD352fJdbHTUi87Yuo+aFib+yNArzFn3cl9ZfUmD7vSSjmxqFjrgcIJT45Qqo4Y+T39JMUo7QuCsBUV4QbLlHMAyA3ZrcpWmpyGC5VgqiRN6D/XCTmB355fMfbF/Wov/shADiLLzYxDRkxUggx2nqBrG1Eo0JfS5Ji/MUbA5dhmOoDnf0YTsu2SWhS1nErj0HTe/j2/o7wG9aM1rQg6sU0DDTarPWNVJn6HNx0E65VzvfZH2v6g0rfLQ3sydeHdtvyS2KxBlCcp7ceJ2VHLuurR9IZTqH8GYA8GYAAZpo2oZF2esqH6tIpTIG5kJJy9Ybzw1o5Q3R3LAY18/IvdiUmfrMUy1Bkai1Lz1nHvcG5azwSOgrZ/hTbby1XPS4TdjbC7tyQXJ/u0ch+qxLOcIwKp/3DiE6nmMXJkCv4hf5YX/AYze2TKtm2uhE2qQF7kQ3tKi64nOBX9N3+mJVthS37JA8Zrak3D/5E4vtul87lahczOCNS2qYcET04Td77HJ1HEGgSnJETvnfG4+8LmLYoIqN3201Vsk585CNVpUY7PjYCxFBRadj7SfHmAq8mEQF7rpM8ELmBirkwta1QQq5Qma49ozCxWcnhnqw9NPRG3oNCrYsVC2APIrkvsOjVPo=" + file: "dist/*" + skip_cleanup: true + on: + tags: true