Skip to content

Commit

Permalink
Refactor cron lifecycle (#1119)
Browse files Browse the repository at this point in the history
* Clear entries on start
* Do not init the Cron more than once
* Wait for jobs to finish on graceful stop
* Fix test
* Reuse inner cron.Stop() method comment
* ClearCron needs to empty the map and set proper metrics
* Add test for ClearCron
* Remove the usage of EntryJobMap

Avoid to maintain a parallel map for job-ids.
Refactor it to EntryJob struct containing the ID and Job for the entry.

* Remove what's in the Cron anyway
* Remove unnecessary continues
* Add comments on scheduler stop to clarify design decision
  • Loading branch information
Victor Castell authored Jun 2, 2022
1 parent 6d6c2d7 commit 5f4a31e
Show file tree
Hide file tree
Showing 6 changed files with 119 additions and 54 deletions.
3 changes: 1 addition & 2 deletions dkron/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -245,8 +245,7 @@ func (a *Agent) Stop() error {
a.logger.Info("agent: Called member stop, now stopping")

if a.config.Server && a.sched.Started() {
a.sched.Stop()
a.sched.ClearCron()
<-a.sched.Stop().Done()
}

if a.config.Server {
Expand Down
2 changes: 1 addition & 1 deletion dkron/grpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,7 @@ func (grpcs *GRPCServer) DeleteJob(ctx context.Context, delJobReq *proto.DeleteJ
jpb := job.ToProto()

// If everything is ok, remove the job
grpcs.agent.sched.RemoveJob(job)
grpcs.agent.sched.RemoveJob(job.Name)
if job.Ephemeral {
grpcs.logger.WithField("job", job.Name).Info("grpc: Done deleting ephemeral job")
}
Expand Down
4 changes: 4 additions & 0 deletions dkron/leader.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,8 @@ func (a *Agent) leadershipTransfer() error {
for i := 0; i < retryCount; i++ {
err := a.raft.LeadershipTransfer().Error()
if err == nil {
// Stop the scheduler, running jobs will continue to finish but we
// can not actively wait for them blocking the execution here.
a.sched.Stop()
a.logger.Info("dkron: successfully transferred leadership")
return nil
Expand Down Expand Up @@ -230,6 +232,8 @@ func (a *Agent) establishLeadership(stopCh chan struct{}) error {
// This is used to cleanup any state that may be specific to a leader.
func (a *Agent) revokeLeadership() error {
defer metrics.MeasureSince([]string{"dkron", "leader", "revoke_leadership"}, time.Now())
// Stop the scheduler, running jobs will continue to finish but we
// can not actively wait for them blocking the execution here.
a.sched.Stop()

return nil
Expand Down
4 changes: 2 additions & 2 deletions dkron/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,8 @@ func (a *Agent) Run(jobName string, ex *Execution) (*Job, error) {

// In case the job is not a child job, compute the next execution time
if job.ParentJob == "" {
if e, ok := a.sched.GetEntry(jobName); ok {
job.Next = e.Next
if ej, ok := a.sched.GetEntryJob(jobName); ok {
job.Next = ej.entry.Next
if err := a.applySetJob(job.ToProto()); err != nil {
return nil, fmt.Errorf("agent: Run error storing job %s before running: %w", jobName, err)
}
Expand Down
99 changes: 55 additions & 44 deletions dkron/scheduler.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package dkron

import (
"context"
"errors"
"expvar"
"strings"
Expand All @@ -20,25 +21,28 @@ var (
ErrScheduleParse = errors.New("can't parse job schedule")
)

type EntryJob struct {
entry *cron.Entry
job *Job
}

// Scheduler represents a dkron scheduler instance, it stores the cron engine
// and the related parameters.
type Scheduler struct {
// mu is to prevent concurrent edits to Cron and Started
mu sync.RWMutex
Cron *cron.Cron
started bool
EntryJobMap sync.Map
logger *logrus.Entry
mu sync.RWMutex
Cron *cron.Cron
started bool
logger *logrus.Entry
}

// NewScheduler creates a new Scheduler instance
func NewScheduler(logger *logrus.Entry) *Scheduler {
schedulerStarted.Set(0)
return &Scheduler{
Cron: nil,
started: false,
EntryJobMap: sync.Map{},
logger: logger,
Cron: cron.New(cron.WithParser(extcron.NewParser())),
started: false,
logger: logger,
}
}

Expand All @@ -48,14 +52,10 @@ func (s *Scheduler) Start(jobs []*Job, agent *Agent) error {
s.mu.Lock()
defer s.mu.Unlock()

if s.Cron != nil {
// Stop the cron scheduler first and wait for all jobs to finish
s.Stop()
// Clear the cron scheduler
s.Cron = nil
if s.started {
return errors.New("scheduler: cron already started, should be stopped first")
}

s.Cron = cron.New(cron.WithParser(extcron.NewParser()))
s.ClearCron()

metrics.IncrCounter([]string{"scheduler", "start"}, 1)
for _, job := range jobs {
Expand All @@ -71,40 +71,47 @@ func (s *Scheduler) Start(jobs []*Job, agent *Agent) error {
return nil
}

// Stop stops the scheduler effectively not running any job.
func (s *Scheduler) Stop() {
// Stop stops the cron scheduler if it is running; otherwise it does nothing.
// A context is returned so the caller can wait for running jobs to complete.
func (s *Scheduler) Stop() context.Context {
s.mu.Lock()
defer s.mu.Unlock()

ctx := s.Cron.Stop()
if s.started {
s.logger.Debug("scheduler: Stopping scheduler")
<-s.Cron.Stop().Done()
s.started = false
// Keep Cron exists and let the jobs which have been scheduled can continue to finish,
// even the node's leadership will be revoked.
// Ignore the running jobs and make s.Cron to nil may cause whole process crashed.
//s.Cron = nil

// expvars
cronInspect.Do(func(kv expvar.KeyValue) {
kv.Value = nil
})
}
schedulerStarted.Set(0)
return ctx
}

// Restart the scheduler
func (s *Scheduler) Restart(jobs []*Job, agent *Agent) {
// Stop the scheduler, running jobs will continue to finish but we
// can not actively wait for them blocking the execution here.
s.Stop()
s.ClearCron()

if err := s.Start(jobs, agent); err != nil {
s.logger.Fatal(err)
}
}

// Clear cron separately, this can only be called when agent will be stop.
// ClearCron clears the cron scheduler
func (s *Scheduler) ClearCron() {
s.Cron = nil
for _, e := range s.Cron.Entries() {
if j, ok := e.Job.(*Job); !ok {
s.logger.Errorf("scheduler: Failed to cast job to *Job found type %T and removing it", e.Job)
s.Cron.Remove(e.ID)
} else {
s.RemoveJob(j.Name)
}
}
}

// Started will safely return if the scheduler is started or not
Expand All @@ -115,24 +122,30 @@ func (s *Scheduler) Started() bool {
return s.started
}

// GetEntry returns a scheduler entry from a snapshot in
// GetEntryJob returns a EntryJob object from a snapshot in
// the current time, and whether or not the entry was found.
func (s *Scheduler) GetEntry(jobName string) (cron.Entry, bool) {
func (s *Scheduler) GetEntryJob(jobName string) (EntryJob, bool) {
for _, e := range s.Cron.Entries() {
j, _ := e.Job.(*Job)
j.logger = s.logger
if j.Name == jobName {
return e, true
if j, ok := e.Job.(*Job); !ok {
s.logger.Errorf("scheduler: Failed to cast job to *Job found type %T", e.Job)
} else {
j.logger = s.logger
if j.Name == jobName {
return EntryJob{
entry: &e,
job: j,
}, true
}
}
}
return cron.Entry{}, false
return EntryJob{}, false
}

// AddJob Adds a job to the cron scheduler
func (s *Scheduler) AddJob(job *Job) error {
// Check if the job is already set and remove it if exists
if _, ok := s.EntryJobMap.Load(job.Name); ok {
s.RemoveJob(job)
if _, ok := s.GetEntryJob(job.Name); ok {
s.RemoveJob(job.Name)
}

if job.Disabled || job.ParentJob != "" {
Expand All @@ -154,11 +167,10 @@ func (s *Scheduler) AddJob(job *Job) error {
schedule = "CRON_TZ=" + job.Timezone + " " + schedule
}

id, err := s.Cron.AddJob(schedule, job)
_, err := s.Cron.AddJob(schedule, job)
if err != nil {
return err
}
s.EntryJobMap.Store(job.Name, id)

cronInspect.Set(job.Name, job)
metrics.IncrCounterWithLabels([]string{"scheduler", "job_add"}, 1, []metrics.Label{{Name: "job", Value: job.Name}})
Expand All @@ -167,15 +179,14 @@ func (s *Scheduler) AddJob(job *Job) error {
}

// RemoveJob removes a job from the cron scheduler if it exists.
func (s *Scheduler) RemoveJob(job *Job) {
func (s *Scheduler) RemoveJob(jobName string) {
s.logger.WithFields(logrus.Fields{
"job": job.Name,
"job": jobName,
}).Debug("scheduler: Removing job from cron")
if v, ok := s.EntryJobMap.Load(job.Name); ok {
s.Cron.Remove(v.(cron.EntryID))
s.EntryJobMap.Delete(job.Name)

cronInspect.Delete(job.Name)
metrics.IncrCounterWithLabels([]string{"scheduler", "job_delete"}, 1, []metrics.Label{{Name: "job", Value: job.Name}})
if ej, ok := s.GetEntryJob(jobName); ok {
s.Cron.Remove(ej.entry.ID)
cronInspect.Delete(jobName)
metrics.IncrCounterWithLabels([]string{"scheduler", "job_delete"}, 1, []metrics.Label{{Name: "job", Value: jobName}})
}
}
61 changes: 56 additions & 5 deletions dkron/scheduler_test.go
Original file line number Diff line number Diff line change
@@ -1,9 +1,12 @@
package dkron

import (
"fmt"
"testing"
"time"

"github.com/distribworks/dkron/v3/extcron"
"github.com/robfig/cron/v3"
"github.com/stretchr/testify/assert"
)

Expand All @@ -28,8 +31,8 @@ func TestSchedule(t *testing.T) {
assert.True(t, sched.Started())
now := time.Now().Truncate(time.Second)

entry, _ := sched.GetEntry(testJob1.Name)
assert.Equal(t, now.Add(time.Second*2), entry.Next)
ej, _ := sched.GetEntryJob(testJob1.Name)
assert.Equal(t, now.Add(time.Second*2), ej.entry.Next)

testJob2 := &Job{
Name: "cron_job",
Expand All @@ -43,12 +46,27 @@ func TestSchedule(t *testing.T) {

assert.True(t, sched.started)
assert.True(t, sched.Started())

sched.Stop()
}

func TestClearCron(t *testing.T) {
log := getTestLogger()
sched := NewScheduler(log)

testJob := &Job{
Name: "cron_job",
Schedule: "@every 2s",
Executor: "shell",
ExecutorConfig: map[string]string{"command": "echo 'test1'", "shell": "true"},
Owner: "John Dough",
OwnerEmail: "[email protected]",
}
sched.AddJob(testJob)
assert.Len(t, sched.Cron.Entries(), 1)

sched.Cron.Remove(1)
sched.ClearCron()
assert.Len(t, sched.Cron.Entries(), 0)

sched.Stop()
}

func TestTimezoneAwareJob(t *testing.T) {
Expand All @@ -69,3 +87,36 @@ func TestTimezoneAwareJob(t *testing.T) {
assert.Len(t, sched.Cron.Entries(), 1)
sched.Stop()
}

func TestScheduleStop(t *testing.T) {
log := getTestLogger()
sched := NewScheduler(log)

sched.Cron = cron.New(cron.WithParser(extcron.NewParser()))
sched.Cron.AddFunc("@every 2s", func() {
time.Sleep(time.Second * 5)
fmt.Println("function done")
})
sched.Cron.Start()
sched.started = true

testJob1 := &Job{
Name: "cron_job",
Schedule: "@every 2s",
Executor: "shell",
ExecutorConfig: map[string]string{"command": "echo 'test1'", "shell": "true"},
Owner: "John Dough",
OwnerEmail: "[email protected]",
}
err := sched.Start([]*Job{testJob1}, &Agent{})
assert.Error(t, err)

// Wait for the job to start
time.Sleep(time.Second * 2)
<-sched.Stop().Done()
err = sched.Start([]*Job{testJob1}, &Agent{})
assert.NoError(t, err)

sched.Stop()
assert.False(t, sched.Started())
}

0 comments on commit 5f4a31e

Please sign in to comment.