From dc93b00158d0905e841b050377e2afea8cde41e0 Mon Sep 17 00:00:00 2001 From: Victor Castell Date: Sun, 29 May 2022 23:54:52 +0200 Subject: [PATCH 01/11] Refactor cron lifecycle * Clear entries on start * Do not init the Cron more than once * Wait for jobs to finish on graceful stop --- dkron/agent.go | 3 +-- dkron/scheduler.go | 27 +++++++++++---------------- dkron/scheduler_test.go | 36 ++++++++++++++++++++++++++++++++++++ 3 files changed, 48 insertions(+), 18 deletions(-) diff --git a/dkron/agent.go b/dkron/agent.go index d8aea86ab..89bfa13b4 100644 --- a/dkron/agent.go +++ b/dkron/agent.go @@ -245,8 +245,7 @@ func (a *Agent) Stop() error { a.logger.Info("agent: Called member stop, now stopping") if a.config.Server && a.sched.Started() { - a.sched.Stop() - a.sched.ClearCron() + <-a.sched.Stop().Done() } if a.config.Server { diff --git a/dkron/scheduler.go b/dkron/scheduler.go index e2c581710..8e415c61a 100644 --- a/dkron/scheduler.go +++ b/dkron/scheduler.go @@ -1,6 +1,7 @@ package dkron import ( + "context" "errors" "expvar" "strings" @@ -35,7 +36,7 @@ type Scheduler struct { func NewScheduler(logger *logrus.Entry) *Scheduler { schedulerStarted.Set(0) return &Scheduler{ - Cron: nil, + Cron: cron.New(cron.WithParser(extcron.NewParser())), started: false, EntryJobMap: sync.Map{}, logger: logger, @@ -48,14 +49,10 @@ func (s *Scheduler) Start(jobs []*Job, agent *Agent) error { s.mu.Lock() defer s.mu.Unlock() - if s.Cron != nil { - // Stop the cron scheduler first and wait for all jobs to finish - s.Stop() - // Clear the cron scheduler - s.Cron = nil + if s.started { + return errors.New("scheduler: cron already started, should be stopped first") } - - s.Cron = cron.New(cron.WithParser(extcron.NewParser())) + s.ClearCron() metrics.IncrCounter([]string{"scheduler", "start"}, 1) for _, job := range jobs { @@ -72,18 +69,14 @@ func (s *Scheduler) Start(jobs []*Job, agent *Agent) error { } // Stop stops the scheduler effectively not running any job. -func (s *Scheduler) Stop() { +func (s *Scheduler) Stop() context.Context { s.mu.Lock() defer s.mu.Unlock() + ctx := s.Cron.Stop() if s.started { s.logger.Debug("scheduler: Stopping scheduler") - <-s.Cron.Stop().Done() s.started = false - // Keep Cron exists and let the jobs which have been scheduled can continue to finish, - // even the node's leadership will be revoked. - // Ignore the running jobs and make s.Cron to nil may cause whole process crashed. - //s.Cron = nil // expvars cronInspect.Do(func(kv expvar.KeyValue) { @@ -91,12 +84,12 @@ func (s *Scheduler) Stop() { }) } schedulerStarted.Set(0) + return ctx } // Restart the scheduler func (s *Scheduler) Restart(jobs []*Job, agent *Agent) { s.Stop() - s.ClearCron() if err := s.Start(jobs, agent); err != nil { s.logger.Fatal(err) } @@ -104,7 +97,9 @@ func (s *Scheduler) Restart(jobs []*Job, agent *Agent) { // Clear cron separately, this can only be called when agent will be stop. func (s *Scheduler) ClearCron() { - s.Cron = nil + for _, e := range s.Cron.Entries() { + s.Cron.Remove(e.ID) + } } // Started will safely return if the scheduler is started or not diff --git a/dkron/scheduler_test.go b/dkron/scheduler_test.go index 3e9b74396..44157e4f1 100644 --- a/dkron/scheduler_test.go +++ b/dkron/scheduler_test.go @@ -1,9 +1,12 @@ package dkron import ( + "fmt" "testing" "time" + "github.com/distribworks/dkron/v3/extcron" + "github.com/robfig/cron/v3" "github.com/stretchr/testify/assert" ) @@ -69,3 +72,36 @@ func TestTimezoneAwareJob(t *testing.T) { assert.Len(t, sched.Cron.Entries(), 1) sched.Stop() } + +func TestScheduleStop(t *testing.T) { + log := getTestLogger() + sched := NewScheduler(log) + + sched.Cron = cron.New(cron.WithParser(extcron.NewParser())) + sched.Cron.AddFunc("@every 2s", func() { + time.Sleep(time.Second * 5) + fmt.Println("function done") + }) + sched.Cron.Start() + sched.started = true + + testJob1 := &Job{ + Name: "cron_job", + Schedule: "@every 2s", + Executor: "shell", + ExecutorConfig: map[string]string{"command": "echo 'test1'", "shell": "true"}, + Owner: "John Dough", + OwnerEmail: "foo@bar.com", + } + err := sched.Start([]*Job{testJob1}, &Agent{}) + assert.Error(t, err) + + // Wait for the job to start + time.Sleep(time.Second * 2) + <-sched.Stop().Done() + err = sched.Start([]*Job{testJob1}, &Agent{}) + assert.NoError(t, err) + + sched.Stop() + assert.False(t, sched.Started()) +} From 8fea8c15387de161cbd250180387d4c4bd9216a1 Mon Sep 17 00:00:00 2001 From: Victor Castell Date: Mon, 30 May 2022 00:06:43 +0200 Subject: [PATCH 02/11] Fix test --- dkron/scheduler_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/dkron/scheduler_test.go b/dkron/scheduler_test.go index 44157e4f1..74329fc5f 100644 --- a/dkron/scheduler_test.go +++ b/dkron/scheduler_test.go @@ -48,7 +48,7 @@ func TestSchedule(t *testing.T) { assert.True(t, sched.Started()) assert.Len(t, sched.Cron.Entries(), 1) - sched.Cron.Remove(1) + sched.ClearCron() assert.Len(t, sched.Cron.Entries(), 0) sched.Stop() From 0379ce35b75ee8c64503f0c59f6395e2da07c130 Mon Sep 17 00:00:00 2001 From: Victor Castell Date: Mon, 30 May 2022 22:56:56 +0200 Subject: [PATCH 03/11] Reuse inner cron.Stop() method comment --- dkron/scheduler.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/dkron/scheduler.go b/dkron/scheduler.go index 8e415c61a..60df4414d 100644 --- a/dkron/scheduler.go +++ b/dkron/scheduler.go @@ -68,7 +68,8 @@ func (s *Scheduler) Start(jobs []*Job, agent *Agent) error { return nil } -// Stop stops the scheduler effectively not running any job. +// Stop stops the cron scheduler if it is running; otherwise it does nothing. +// A context is returned so the caller can wait for running jobs to complete. func (s *Scheduler) Stop() context.Context { s.mu.Lock() defer s.mu.Unlock() From dfa910ee6559dfe6ca959a7914c97c8fc906f2e4 Mon Sep 17 00:00:00 2001 From: Victor Castell Date: Mon, 30 May 2022 23:25:15 +0200 Subject: [PATCH 04/11] ClearCron needs to empty the map and set proper metrics --- dkron/scheduler.go | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/dkron/scheduler.go b/dkron/scheduler.go index 60df4414d..22f4e838b 100644 --- a/dkron/scheduler.go +++ b/dkron/scheduler.go @@ -98,9 +98,10 @@ func (s *Scheduler) Restart(jobs []*Job, agent *Agent) { // Clear cron separately, this can only be called when agent will be stop. func (s *Scheduler) ClearCron() { - for _, e := range s.Cron.Entries() { - s.Cron.Remove(e.ID) - } + s.EntryJobMap.Range(func(key interface{}, value interface{}) bool { + s.RemoveJob(&Job{Name: key.(string)}) + return true + }) } // Started will safely return if the scheduler is started or not From 7f9fbe847b7169e12c42846331f9f7a703f98469 Mon Sep 17 00:00:00 2001 From: Victor Castell Date: Mon, 30 May 2022 23:40:13 +0200 Subject: [PATCH 05/11] Add test for ClearCron --- dkron/scheduler_test.go | 32 ++++++++++++++++++++++++++++++-- 1 file changed, 30 insertions(+), 2 deletions(-) diff --git a/dkron/scheduler_test.go b/dkron/scheduler_test.go index 74329fc5f..2a763a578 100644 --- a/dkron/scheduler_test.go +++ b/dkron/scheduler_test.go @@ -46,12 +46,40 @@ func TestSchedule(t *testing.T) { assert.True(t, sched.started) assert.True(t, sched.Started()) + + sched.Stop() +} + +func TestClearCron(t *testing.T) { + log := getTestLogger() + sched := NewScheduler(log) + + testJob := &Job{ + Name: "cron_job", + Schedule: "@every 2s", + Executor: "shell", + ExecutorConfig: map[string]string{"command": "echo 'test1'", "shell": "true"}, + Owner: "John Dough", + OwnerEmail: "foo@bar.com", + } + sched.AddJob(testJob) + assert.Len(t, sched.Cron.Entries(), 1) + var count int + sched.EntryJobMap.Range(func(key, value interface{}) bool { + count++ + return true + }) + assert.Equal(t, 1, count) sched.ClearCron() + count = 0 + sched.EntryJobMap.Range(func(key, value interface{}) bool { + count++ + return true + }) + assert.Equal(t, 0, count) assert.Len(t, sched.Cron.Entries(), 0) - - sched.Stop() } func TestTimezoneAwareJob(t *testing.T) { From b2c68560e129a09e08aec0f6bc89e08b59cc82dd Mon Sep 17 00:00:00 2001 From: Victor Castell Date: Tue, 31 May 2022 12:10:18 +0200 Subject: [PATCH 06/11] Remove the usage of EntryJobMap Avoid to maintain a parallel map for job-ids. Refactor it to EntryJob struct containing the ID and Job for the entry. --- dkron/grpc.go | 2 +- dkron/run.go | 4 +-- dkron/scheduler.go | 76 ++++++++++++++++++++++++----------------- dkron/scheduler_test.go | 17 ++------- 4 files changed, 49 insertions(+), 50 deletions(-) diff --git a/dkron/grpc.go b/dkron/grpc.go index ce8b55d9e..276844361 100644 --- a/dkron/grpc.go +++ b/dkron/grpc.go @@ -122,7 +122,7 @@ func (grpcs *GRPCServer) DeleteJob(ctx context.Context, delJobReq *proto.DeleteJ jpb := job.ToProto() // If everything is ok, remove the job - grpcs.agent.sched.RemoveJob(job) + grpcs.agent.sched.RemoveJob(job.Name) if job.Ephemeral { grpcs.logger.WithField("job", job.Name).Info("grpc: Done deleting ephemeral job") } diff --git a/dkron/run.go b/dkron/run.go index 9e2f44121..4703b3a63 100644 --- a/dkron/run.go +++ b/dkron/run.go @@ -16,8 +16,8 @@ func (a *Agent) Run(jobName string, ex *Execution) (*Job, error) { // In case the job is not a child job, compute the next execution time if job.ParentJob == "" { - if e, ok := a.sched.GetEntry(jobName); ok { - job.Next = e.Next + if ej, ok := a.sched.GetEntryJob(jobName); ok { + job.Next = ej.entry.Next if err := a.applySetJob(job.ToProto()); err != nil { return nil, fmt.Errorf("agent: Run error storing job %s before running: %w", jobName, err) } diff --git a/dkron/scheduler.go b/dkron/scheduler.go index 22f4e838b..d0a7f5ed2 100644 --- a/dkron/scheduler.go +++ b/dkron/scheduler.go @@ -21,25 +21,28 @@ var ( ErrScheduleParse = errors.New("can't parse job schedule") ) +type EntryJob struct { + entry *cron.Entry + job *Job +} + // Scheduler represents a dkron scheduler instance, it stores the cron engine // and the related parameters. type Scheduler struct { // mu is to prevent concurrent edits to Cron and Started - mu sync.RWMutex - Cron *cron.Cron - started bool - EntryJobMap sync.Map - logger *logrus.Entry + mu sync.RWMutex + Cron *cron.Cron + started bool + logger *logrus.Entry } // NewScheduler creates a new Scheduler instance func NewScheduler(logger *logrus.Entry) *Scheduler { schedulerStarted.Set(0) return &Scheduler{ - Cron: cron.New(cron.WithParser(extcron.NewParser())), - started: false, - EntryJobMap: sync.Map{}, - logger: logger, + Cron: cron.New(cron.WithParser(extcron.NewParser())), + started: false, + logger: logger, } } @@ -96,12 +99,16 @@ func (s *Scheduler) Restart(jobs []*Job, agent *Agent) { } } -// Clear cron separately, this can only be called when agent will be stop. +// ClearCron clears the cron scheduler func (s *Scheduler) ClearCron() { - s.EntryJobMap.Range(func(key interface{}, value interface{}) bool { - s.RemoveJob(&Job{Name: key.(string)}) - return true - }) + for _, e := range s.Cron.Entries() { + if j, ok := e.Job.(*Job); !ok { + s.logger.Errorf("scheduler: Failed to cast job to *Job found type %T", e.Job) + continue + } else { + s.RemoveJob(j.Name) + } + } } // Started will safely return if the scheduler is started or not @@ -112,24 +119,31 @@ func (s *Scheduler) Started() bool { return s.started } -// GetEntry returns a scheduler entry from a snapshot in +// GetEntryJob returns a EntryJob object from a snapshot in // the current time, and whether or not the entry was found. -func (s *Scheduler) GetEntry(jobName string) (cron.Entry, bool) { +func (s *Scheduler) GetEntryJob(jobName string) (EntryJob, bool) { for _, e := range s.Cron.Entries() { - j, _ := e.Job.(*Job) - j.logger = s.logger - if j.Name == jobName { - return e, true + if j, ok := e.Job.(*Job); !ok { + s.logger.Errorf("scheduler: Failed to cast job to *Job found type %T", e.Job) + continue + } else { + j.logger = s.logger + if j.Name == jobName { + return EntryJob{ + entry: &e, + job: j, + }, true + } } } - return cron.Entry{}, false + return EntryJob{}, false } // AddJob Adds a job to the cron scheduler func (s *Scheduler) AddJob(job *Job) error { // Check if the job is already set and remove it if exists - if _, ok := s.EntryJobMap.Load(job.Name); ok { - s.RemoveJob(job) + if _, ok := s.GetEntryJob(job.Name); ok { + s.RemoveJob(job.Name) } if job.Disabled || job.ParentJob != "" { @@ -151,11 +165,10 @@ func (s *Scheduler) AddJob(job *Job) error { schedule = "CRON_TZ=" + job.Timezone + " " + schedule } - id, err := s.Cron.AddJob(schedule, job) + _, err := s.Cron.AddJob(schedule, job) if err != nil { return err } - s.EntryJobMap.Store(job.Name, id) cronInspect.Set(job.Name, job) metrics.IncrCounterWithLabels([]string{"scheduler", "job_add"}, 1, []metrics.Label{{Name: "job", Value: job.Name}}) @@ -164,15 +177,14 @@ func (s *Scheduler) AddJob(job *Job) error { } // RemoveJob removes a job from the cron scheduler if it exists. -func (s *Scheduler) RemoveJob(job *Job) { +func (s *Scheduler) RemoveJob(jobName string) { s.logger.WithFields(logrus.Fields{ - "job": job.Name, + "job": jobName, }).Debug("scheduler: Removing job from cron") - if v, ok := s.EntryJobMap.Load(job.Name); ok { - s.Cron.Remove(v.(cron.EntryID)) - s.EntryJobMap.Delete(job.Name) - cronInspect.Delete(job.Name) - metrics.IncrCounterWithLabels([]string{"scheduler", "job_delete"}, 1, []metrics.Label{{Name: "job", Value: job.Name}}) + if ej, ok := s.GetEntryJob(jobName); ok { + s.Cron.Remove(ej.entry.ID) + cronInspect.Delete(jobName) + metrics.IncrCounterWithLabels([]string{"scheduler", "job_delete"}, 1, []metrics.Label{{Name: "job", Value: jobName}}) } } diff --git a/dkron/scheduler_test.go b/dkron/scheduler_test.go index 2a763a578..4010cc4cd 100644 --- a/dkron/scheduler_test.go +++ b/dkron/scheduler_test.go @@ -31,8 +31,8 @@ func TestSchedule(t *testing.T) { assert.True(t, sched.Started()) now := time.Now().Truncate(time.Second) - entry, _ := sched.GetEntry(testJob1.Name) - assert.Equal(t, now.Add(time.Second*2), entry.Next) + ej, _ := sched.GetEntryJob(testJob1.Name) + assert.Equal(t, now.Add(time.Second*2), ej.entry.Next) testJob2 := &Job{ Name: "cron_job", @@ -63,22 +63,9 @@ func TestClearCron(t *testing.T) { OwnerEmail: "foo@bar.com", } sched.AddJob(testJob) - assert.Len(t, sched.Cron.Entries(), 1) - var count int - sched.EntryJobMap.Range(func(key, value interface{}) bool { - count++ - return true - }) - assert.Equal(t, 1, count) sched.ClearCron() - count = 0 - sched.EntryJobMap.Range(func(key, value interface{}) bool { - count++ - return true - }) - assert.Equal(t, 0, count) assert.Len(t, sched.Cron.Entries(), 0) } From 0779368c4922b7ca3fb130e4e010bf0250408217 Mon Sep 17 00:00:00 2001 From: Victor Castell Date: Tue, 31 May 2022 12:13:34 +0200 Subject: [PATCH 07/11] Remove what's in the Cron anyway --- dkron/scheduler.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/dkron/scheduler.go b/dkron/scheduler.go index d0a7f5ed2..907d52384 100644 --- a/dkron/scheduler.go +++ b/dkron/scheduler.go @@ -103,7 +103,8 @@ func (s *Scheduler) Restart(jobs []*Job, agent *Agent) { func (s *Scheduler) ClearCron() { for _, e := range s.Cron.Entries() { if j, ok := e.Job.(*Job); !ok { - s.logger.Errorf("scheduler: Failed to cast job to *Job found type %T", e.Job) + s.logger.Errorf("scheduler: Failed to cast job to *Job found type %T and removing it", e.Job) + s.Cron.Remove(e.ID) continue } else { s.RemoveJob(j.Name) From 05f598eab72341832de4c4bf4329ab3b6b5c6268 Mon Sep 17 00:00:00 2001 From: Victor Castell Date: Wed, 1 Jun 2022 10:12:40 +0200 Subject: [PATCH 08/11] Remove unnecessary continues --- dkron/scheduler.go | 2 -- 1 file changed, 2 deletions(-) diff --git a/dkron/scheduler.go b/dkron/scheduler.go index 907d52384..29aab85b2 100644 --- a/dkron/scheduler.go +++ b/dkron/scheduler.go @@ -105,7 +105,6 @@ func (s *Scheduler) ClearCron() { if j, ok := e.Job.(*Job); !ok { s.logger.Errorf("scheduler: Failed to cast job to *Job found type %T and removing it", e.Job) s.Cron.Remove(e.ID) - continue } else { s.RemoveJob(j.Name) } @@ -126,7 +125,6 @@ func (s *Scheduler) GetEntryJob(jobName string) (EntryJob, bool) { for _, e := range s.Cron.Entries() { if j, ok := e.Job.(*Job); !ok { s.logger.Errorf("scheduler: Failed to cast job to *Job found type %T", e.Job) - continue } else { j.logger = s.logger if j.Name == jobName { From d976f3248ccbbcb2c94ad7dcfb53c9ce3464a5b7 Mon Sep 17 00:00:00 2001 From: Victor Castell Date: Wed, 1 Jun 2022 10:17:49 +0200 Subject: [PATCH 09/11] Add comments on scheduler stop to clarify design decission --- dkron/leader.go | 4 ++++ dkron/scheduler.go | 3 +++ 2 files changed, 7 insertions(+) diff --git a/dkron/leader.go b/dkron/leader.go index 89f2728c0..4ef360b79 100644 --- a/dkron/leader.go +++ b/dkron/leader.go @@ -67,6 +67,8 @@ func (a *Agent) leadershipTransfer() error { for i := 0; i < retryCount; i++ { err := a.raft.LeadershipTransfer().Error() if err == nil { + // Stop the scheduler, running jobs will continue to finish but we + // can not actively wait for them, blocking the executions here. a.sched.Stop() a.logger.Info("dkron: successfully transferred leadership") return nil @@ -230,6 +232,8 @@ func (a *Agent) establishLeadership(stopCh chan struct{}) error { // This is used to cleanup any state that may be specific to a leader. func (a *Agent) revokeLeadership() error { defer metrics.MeasureSince([]string{"dkron", "leader", "revoke_leadership"}, time.Now()) + // Stop the scheduler, running jobs will continue to finish but we + // can not actively wait for them, blocking the executions here. a.sched.Stop() return nil diff --git a/dkron/scheduler.go b/dkron/scheduler.go index 29aab85b2..22949468d 100644 --- a/dkron/scheduler.go +++ b/dkron/scheduler.go @@ -93,7 +93,10 @@ func (s *Scheduler) Stop() context.Context { // Restart the scheduler func (s *Scheduler) Restart(jobs []*Job, agent *Agent) { + // Stop the scheduler, running jobs will continue to finish but we + // can not actively wait for them, blocking the executions here. s.Stop() + if err := s.Start(jobs, agent); err != nil { s.logger.Fatal(err) } From 889490f8da0e1fa021305108df79b1790dff3a46 Mon Sep 17 00:00:00 2001 From: Victor Castell Date: Wed, 1 Jun 2022 18:02:08 +0200 Subject: [PATCH 10/11] grammar --- dkron/leader.go | 4 +-- dkron/scheduler.go | 2 +- dkron/testing.go | 71 ++++++++++++++++++++++++++++++++++++++++++++++ 3 files changed, 74 insertions(+), 3 deletions(-) create mode 100644 dkron/testing.go diff --git a/dkron/leader.go b/dkron/leader.go index 4ef360b79..992e05703 100644 --- a/dkron/leader.go +++ b/dkron/leader.go @@ -68,7 +68,7 @@ func (a *Agent) leadershipTransfer() error { err := a.raft.LeadershipTransfer().Error() if err == nil { // Stop the scheduler, running jobs will continue to finish but we - // can not actively wait for them, blocking the executions here. + // can not actively wait for them blocking the execution here. a.sched.Stop() a.logger.Info("dkron: successfully transferred leadership") return nil @@ -233,7 +233,7 @@ func (a *Agent) establishLeadership(stopCh chan struct{}) error { func (a *Agent) revokeLeadership() error { defer metrics.MeasureSince([]string{"dkron", "leader", "revoke_leadership"}, time.Now()) // Stop the scheduler, running jobs will continue to finish but we - // can not actively wait for them, blocking the executions here. + // can not actively wait for them blocking the execution here. a.sched.Stop() return nil diff --git a/dkron/scheduler.go b/dkron/scheduler.go index 22949468d..bd343108a 100644 --- a/dkron/scheduler.go +++ b/dkron/scheduler.go @@ -94,7 +94,7 @@ func (s *Scheduler) Stop() context.Context { // Restart the scheduler func (s *Scheduler) Restart(jobs []*Job, agent *Agent) { // Stop the scheduler, running jobs will continue to finish but we - // can not actively wait for them, blocking the executions here. + // can not actively wait for them blocking the execution here. s.Stop() if err := s.Start(jobs, agent); err != nil { diff --git a/dkron/testing.go b/dkron/testing.go new file mode 100644 index 000000000..2483410ac --- /dev/null +++ b/dkron/testing.go @@ -0,0 +1,71 @@ +package dkron + +import ( + "fmt" + "io/ioutil" + "os" + "testing" + "time" + + "github.com/hashicorp/serf/testutil" + "github.com/stretchr/testify/require" +) + +func TestServer(t *testing.T, cb func(*Config)) (*Agent, func()) { + s, c, err := TestServerErr(t, cb) + require.NoError(t, err, "failed to start test server") + return s, c +} + +func TestServerErr(t *testing.T, cb func(*Config)) (*Agent, func(), error) { + dir, err := ioutil.TempDir("", "dkron-test") + require.NoError(t, err) + + aName := "test1" + ip, returnFn := testutil.TakeIP() + aAddr := ip.String() + defer returnFn() + + shutdownCh := make(chan struct{}) + defer close(shutdownCh) + + c := DefaultConfig() + c.BindAddr = aAddr + //c.StartJoin = []string{a2Addr} + c.NodeName = aName + c.Server = true + //c.LogLevel = logLevel + c.BootstrapExpect = 3 + c.DevMode = true + c.DataDir = dir + + agent := NewAgent(c) + if err := agent.Start(); err != nil { + t.Fatal(err) + } + + time.Sleep(2 * time.Second) + + return agent, func() { + ch := make(chan error) + go func() { + defer close(ch) + + // Shutdown server + err := agent.Stop() + if err != nil { + ch <- fmt.Errorf("failed to shutdown server: %w", err) + } + os.RemoveAll(dir) + }() + + select { + case e := <-ch: + if e != nil { + t.Fatal(e.Error()) + } + case <-time.After(1 * time.Minute): + t.Fatal("timed out while shutting down server") + } + }, nil +} From 84f2b45f875bd776997657863f06c697fd48ef7d Mon Sep 17 00:00:00 2001 From: Victor Castell Date: Wed, 1 Jun 2022 18:06:29 +0200 Subject: [PATCH 11/11] Remove file included by mistake --- dkron/testing.go | 71 ------------------------------------------------ 1 file changed, 71 deletions(-) delete mode 100644 dkron/testing.go diff --git a/dkron/testing.go b/dkron/testing.go deleted file mode 100644 index 2483410ac..000000000 --- a/dkron/testing.go +++ /dev/null @@ -1,71 +0,0 @@ -package dkron - -import ( - "fmt" - "io/ioutil" - "os" - "testing" - "time" - - "github.com/hashicorp/serf/testutil" - "github.com/stretchr/testify/require" -) - -func TestServer(t *testing.T, cb func(*Config)) (*Agent, func()) { - s, c, err := TestServerErr(t, cb) - require.NoError(t, err, "failed to start test server") - return s, c -} - -func TestServerErr(t *testing.T, cb func(*Config)) (*Agent, func(), error) { - dir, err := ioutil.TempDir("", "dkron-test") - require.NoError(t, err) - - aName := "test1" - ip, returnFn := testutil.TakeIP() - aAddr := ip.String() - defer returnFn() - - shutdownCh := make(chan struct{}) - defer close(shutdownCh) - - c := DefaultConfig() - c.BindAddr = aAddr - //c.StartJoin = []string{a2Addr} - c.NodeName = aName - c.Server = true - //c.LogLevel = logLevel - c.BootstrapExpect = 3 - c.DevMode = true - c.DataDir = dir - - agent := NewAgent(c) - if err := agent.Start(); err != nil { - t.Fatal(err) - } - - time.Sleep(2 * time.Second) - - return agent, func() { - ch := make(chan error) - go func() { - defer close(ch) - - // Shutdown server - err := agent.Stop() - if err != nil { - ch <- fmt.Errorf("failed to shutdown server: %w", err) - } - os.RemoveAll(dir) - }() - - select { - case e := <-ch: - if e != nil { - t.Fatal(e.Error()) - } - case <-time.After(1 * time.Minute): - t.Fatal("timed out while shutting down server") - } - }, nil -}