diff --git a/Dockerfile b/Dockerfile
index 7a3c6f2b3..ccae6e06a 100644
--- a/Dockerfile
+++ b/Dockerfile
@@ -1,4 +1,4 @@
-FROM golang:1.17
+FROM golang:1.18
 LABEL maintainer="Victor Castell "
 
 EXPOSE 8080 8946
diff --git a/dkron/leader.go b/dkron/leader.go
index 94fa05352..89f2728c0 100644
--- a/dkron/leader.go
+++ b/dkron/leader.go
@@ -62,6 +62,33 @@ func (a *Agent) monitorLeadership() {
 	}
 }
 
+// leadershipTransfer asks Raft to hand leadership to another server, trying
+// up to three times. On success the local scheduler is stopped. It gives up
+// immediately when Raft reports that the protocol does not support transfers.
+func (a *Agent) leadershipTransfer() error {
+	retryCount := 3
+	for i := 0; i < retryCount; i++ {
+		err := a.raft.LeadershipTransfer().Error()
+		if err == nil {
+			a.sched.Stop()
+			a.logger.Info("dkron: successfully transferred leadership")
+			return nil
+		}
+
+		// Don't retry if the Raft version doesn't support leadership transfer
+		// since this will never succeed.
+		if err == raft.ErrUnsupportedProtocol {
+			return fmt.Errorf("leadership transfer not supported with Raft version lower than 3")
+		}
+
+		a.logger.WithError(err).
+			WithField("attempt", i).
+			WithField("retry_limit", retryCount).
+			Error("failed to transfer leadership attempt, will retry")
+	}
+	return fmt.Errorf("failed to transfer leadership in %d attempts", retryCount)
+}
+
 // leaderLoop runs as long as we are the leader to run various
 // maintenance activities
 func (a *Agent) leaderLoop(stopCh chan struct{}) {
@@ -85,16 +112,23 @@ RECONCILE:
 	// Check if we need to handle initial leadership actions
 	if !establishedLeader {
 		if err := a.establishLeadership(stopCh); err != nil {
+			a.logger.WithError(err).Error("dkron: failed to establish leadership")
+
 			// Immediately revoke leadership since we didn't successfully
 			// establish leadership.
 			if err := a.revokeLeadership(); err != nil {
 				a.logger.WithError(err).Error("dkron: failed to revoke leadership")
 			}
 
-			a.logger.WithError(err).Error("dkron: failed to establish leadership")
-
-			// TODO: review this code path
-			goto WAIT
+			// Attempt to transfer leadership. If successful, leave the
+			// leaderLoop since this node is no longer the leader. Otherwise
+			// try to establish leadership again after 5 seconds.
+			if err := a.leadershipTransfer(); err != nil {
+				a.logger.WithError(err).Error("failed to transfer leadership")
+				interval = time.After(5 * time.Second)
+				goto WAIT
+			}
+			return
 		}
 
 		establishedLeader = true
@@ -125,10 +159,12 @@ RECONCILE:
 	}
 
 WAIT:
-	// Wait until leadership is lost
+	// Wait until leadership is lost or periodically reconcile as long as we
+	// are the leader, or when Serf events arrive.
 	for {
 		select {
 		case <-stopCh:
+			// Lost leadership.
 			return
 		case <-a.shutdownCh:
 			return
@@ -145,9 +181,6 @@ WAIT:
 func (a *Agent) reconcile() error {
 	defer metrics.MeasureSince([]string{"dkron", "leader", "reconcile"}, time.Now())
 
-	// TODO: Try to fix https://github.com/distribworks/dkron/issues/998
-	a.sched.Cron.Start()
-
 	members := a.serf.Members()
 	for _, member := range members {
 		if err := a.reconcileMember(member); err != nil {
diff --git a/dkron/scheduler.go b/dkron/scheduler.go
index ab19efea8..e2c581710 100644
--- a/dkron/scheduler.go
+++ b/dkron/scheduler.go
@@ -3,7 +3,6 @@ package dkron
 import (
 	"errors"
 	"expvar"
-	"fmt"
 	"strings"
 	"sync"
 
@@ -50,8 +49,10 @@ func (s *Scheduler) Start(jobs []*Job, agent *Agent) error {
 	defer s.mu.Unlock()
 
 	if s.Cron != nil {
-		// Creating a new cron is risky if not nil because the previous invocation is dirty
-		return fmt.Errorf("cron is already configured, can not start scheduler")
+		// Stop the cron scheduler first and wait for all jobs to finish
+		s.Stop()
+		// Clear the cron scheduler
+		s.Cron = nil
 	}
 
 	s.Cron = cron.New(cron.WithParser(extcron.NewParser()))