Skip to content

Commit

Permalink
Eager-broadcast leadership on every leader iteration
Browse files Browse the repository at this point in the history
Previously we assumed that if our peers didn't change and we had the
leadership, that the peers would continue to acknowledge our
leadership.  But this assumes that a peer didn't restart or crash.
Instead, on every leader iteration, check with all the peers that we
are still the leader (using the same GRPC method).  For negligible
additional traffic, this should enable greater recovery from errors
and greater consistency of leader election.

Issue kopeio#348
  • Loading branch information
justinsb committed Jan 18, 2021
1 parent bedafb2 commit 28a2d70
Show file tree
Hide file tree
Showing 3 changed files with 55 additions and 5 deletions.
19 changes: 14 additions & 5 deletions pkg/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,8 +111,8 @@ type peerState struct {
}

type leadershipState struct {
token string
acked map[privateapi.PeerId]bool
token string
ackedPeers map[privateapi.PeerId]bool
}

// NewEtcdController is the constructor for an EtcdController
Expand Down Expand Up @@ -250,8 +250,8 @@ func (m *EtcdController) run(ctx context.Context) (bool, error) {
ackedMap[peer] = true
}
m.leadership = &leadershipState{
token: leadershipToken,
acked: ackedMap,
token: leadershipToken,
ackedPeers: ackedMap,
}

// reset our peer state after a leadership transition
Expand All @@ -262,6 +262,15 @@ func (m *EtcdController) run(ctx context.Context) (bool, error) {
return false, nil
}

{
// We always broadcast our leadership claim, to ensure that all peers are in sync
err := m.peers.AssertLeadership(ctx, m.leadership.token)
if err != nil {
return false, fmt.Errorf("error during AssertLeadership: %w", err)
}
// We don't update the ack map, instead we'll resign leadership if we find a new peer
}

// Check that all peers have acked the leader
// Even if we are the leader, we check we have sufficient peers acking us as leader before performing some operations,
// This helps avoid multiple leaders when we're partitioned
Expand All @@ -271,7 +280,7 @@ func (m *EtcdController) run(ctx context.Context) (bool, error) {
ackedPeerCount := 0
{
for _, peer := range peers {
if !m.leadership.acked[peer.Id] {
if !m.leadership.ackedPeers[peer.Id] {
klog.Infof("peer %q has not acked our leadership; resigning leadership", peer)
m.leadership = nil

Expand Down
40 changes: 40 additions & 0 deletions pkg/privateapi/leadership.go
Original file line number Diff line number Diff line change
Expand Up @@ -194,6 +194,46 @@ func (s *Server) BecomeLeader(ctx context.Context) ([]PeerId, string, error) {
return acked, request.View.LeadershipToken, nil
}

func (s *Server) AssertLeadership(ctx context.Context, leadershipToken string) error {
// TODO: Should we send a notification if we ourselves would reject it?
snapshot, infos := s.snapshotHealthy()

request := &LeaderNotificationRequest{}

request.View = &View{}
for _, info := range infos {
request.View.Healthy = append(request.View.Healthy, info)
}
request.View.Leader = &s.myInfo
request.View.LeadershipToken = leadershipToken

for peerID := range snapshot {
conn, err := s.GetPeerClient(peerID)
if err != nil {
return fmt.Errorf("error getting peer client for %s: %v", peerID, err)
}

peerClient := NewClusterServiceClient(conn)

timeoutCtx, cancel := context.WithTimeout(ctx, 10*time.Second)
response, err := peerClient.LeaderNotification(timeoutCtx, request)
cancel()
if err != nil {
return fmt.Errorf("error sending leader assertion to %s: %v", peerID, err)
}

if response.View != nil {
s.addPeersFromView(response.View)
}

if !response.Accepted {
return fmt.Errorf("our leadership assertion was not accepted by peer %q: %v", peerId, response)
}
}

return nil
}

func randomToken() string {
b := make([]byte, 16, 16)
_, err := io.ReadFull(crypto_rand.Reader, b)
Expand Down
1 change: 1 addition & 0 deletions pkg/privateapi/peers.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ type Peers interface {
MyPeerId() PeerId
GetPeerClient(peerId PeerId) (*grpc.ClientConn, error)
BecomeLeader(ctx context.Context) ([]PeerId, string, error)
AssertLeadership(ctx context.Context, leadershipToken string) error
IsLeader(token string) bool
}

Expand Down

0 comments on commit 28a2d70

Please sign in to comment.