Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Leadership transfer #1109

Merged
merged 3 commits into from
May 11, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Dockerfile
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
FROM golang:1.17
FROM golang:1.18
LABEL maintainer="Victor Castell <[email protected]>"

EXPOSE 8080 8946
Expand Down
47 changes: 39 additions & 8 deletions dkron/leader.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,31 @@ func (a *Agent) monitorLeadership() {
}
}

// leadershipTransfer asks Raft to hand leadership over to another server,
// making up to three attempts.
//
// On the first successful transfer it stops the local scheduler and returns
// nil. If Raft reports raft.ErrUnsupportedProtocol it gives up immediately,
// because retrying can never succeed. When every attempt fails, the returned
// error wraps the error from the last attempt so callers can log or inspect
// the underlying cause.
func (a *Agent) leadershipTransfer() error {
	const retryCount = 3

	var lastErr error
	for i := 0; i < retryCount; i++ {
		err := a.raft.LeadershipTransfer().Error()
		if err == nil {
			// This node is no longer the leader, so it must stop scheduling.
			a.sched.Stop()
			a.logger.Info("dkron: successfully transferred leadership")
			return nil
		}

		// Don't retry if the Raft version doesn't support leadership transfer
		// since this will never succeed.
		if err == raft.ErrUnsupportedProtocol {
			return fmt.Errorf("leadership transfer not supported with Raft version lower than 3")
		}

		lastErr = err

		// Attach the details as structured fields, matching the
		// WithError(...).Error(...) style used by the other leader-loop logs,
		// instead of passing key/value pairs as extra message arguments.
		// "attempt" is the zero-based index of the failed try.
		a.logger.WithError(err).
			WithField("attempt", i).
			WithField("retry_limit", retryCount).
			Error("failed to transfer leadership attempt, will retry")
	}

	return fmt.Errorf("failed to transfer leadership in %d attempts: %w", retryCount, lastErr)
}

// leaderLoop runs as long as we are the leader to run various
// maintenance activities
func (a *Agent) leaderLoop(stopCh chan struct{}) {
Expand All @@ -85,16 +110,23 @@ RECONCILE:
// Check if we need to handle initial leadership actions
if !establishedLeader {
if err := a.establishLeadership(stopCh); err != nil {
a.logger.WithError(err).Error("dkron: failed to establish leadership")

// Immediately revoke leadership since we didn't successfully
// establish leadership.
if err := a.revokeLeadership(); err != nil {
a.logger.WithError(err).Error("dkron: failed to revoke leadership")
}

a.logger.WithError(err).Error("dkron: failed to establish leadership")

// TODO: review this code path
goto WAIT
// Attempt to transfer leadership. If successful, leave the
// leaderLoop since this node is no longer the leader. Otherwise
// try to establish leadership again after 5 seconds.
if err := a.leadershipTransfer(); err != nil {
a.logger.Error("failed to transfer leadership", "error", err)
interval = time.After(5 * time.Second)
goto WAIT
}
return
}

establishedLeader = true
Expand Down Expand Up @@ -125,10 +157,12 @@ RECONCILE:
}

WAIT:
// Wait until leadership is lost
// Wait until leadership is lost or periodically reconcile as long as we
// are the leader, or when Serf events arrive.
for {
select {
case <-stopCh:
// Lost leadership.
return
case <-a.shutdownCh:
return
Expand All @@ -145,9 +179,6 @@ WAIT:
func (a *Agent) reconcile() error {
defer metrics.MeasureSince([]string{"dkron", "leader", "reconcile"}, time.Now())

// TODO: Try to fix https://github.com/distribworks/dkron/issues/998
a.sched.Cron.Start()

members := a.serf.Members()
for _, member := range members {
if err := a.reconcileMember(member); err != nil {
Expand Down
7 changes: 4 additions & 3 deletions dkron/scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package dkron
import (
"errors"
"expvar"
"fmt"
"strings"
"sync"

Expand Down Expand Up @@ -50,8 +49,10 @@ func (s *Scheduler) Start(jobs []*Job, agent *Agent) error {
defer s.mu.Unlock()

if s.Cron != nil {
// Creating a new cron is risky if not nil because the previous invocation is dirty
return fmt.Errorf("cron is already configured, can not start scheduler")
// Stop the cron scheduler first and wait for all jobs to finish
s.Stop()
// Clear the cron scheduler
s.Cron = nil
}

s.Cron = cron.New(cron.WithParser(extcron.NewParser()))
Expand Down