Commit ad56b20
Merge pull request #1537 from aaronlehmann/raft-deadlock
raft: Fix possible deadlocks
aaronlehmann authored Sep 14, 2016
2 parents 675f0f4 + 8b34600 commit ad56b20
Showing 3 changed files with 67 additions and 43 deletions.
90 changes: 56 additions & 34 deletions manager/state/raft/raft.go
@@ -396,39 +396,55 @@ func (n *Node) Run(ctx context.Context) error {
     n.confState = rd.Snapshot.Metadata.ConfState
 }

-// Process committed entries
-for _, entry := range rd.CommittedEntries {
-    if err := n.processCommitted(entry); err != nil {
-        n.Config.Logger.Error(err)
-    }
-}
-
-// Trigger a snapshot every once in awhile
-if n.snapshotInProgress == nil &&
-    raftConfig.SnapshotInterval > 0 &&
-    n.appliedIndex-n.snapshotIndex >= raftConfig.SnapshotInterval {
-    n.doSnapshot(&raftConfig)
-}
-
-// If we cease to be the leader, we must cancel
-// any proposals that are currently waiting for
-// a quorum to acknowledge them. It is still
-// possible for these to become committed, but
-// if that happens we will apply them as any
-// follower would.
+// If we cease to be the leader, we must cancel any
+// proposals that are currently waiting for a quorum to
+// acknowledge them. It is still possible for these to
+// become committed, but if that happens we will apply
+// them as any follower would.
+
+// It is important that we cancel these proposals before
+// calling processCommitted, so processCommitted does
+// not deadlock.
 if rd.SoftState != nil {
     if wasLeader && rd.SoftState.RaftState != raft.StateLeader {
         wasLeader = false
-        n.wait.cancelAll()
         if atomic.LoadUint32(&n.signalledLeadership) == 1 {
             atomic.StoreUint32(&n.signalledLeadership, 0)
             n.leadershipBroadcast.Publish(IsFollower)
         }
+
+        // It is important that we set n.signalledLeadership to 0
+        // before calling n.wait.cancelAll. When a new raft
+        // request is registered, it checks n.signalledLeadership
+        // afterwards, and cancels the registration if it is 0.
+        // If cancelAll was called first, this call might run
+        // before the new request registers, but
+        // signalledLeadership would be set after the check.
+        // Setting signalledLeadership before calling cancelAll
+        // ensures that if a new request is registered during
+        // this transition, it will either be cancelled by
+        // cancelAll, or by its own check of signalledLeadership.
+        n.wait.cancelAll()
     } else if !wasLeader && rd.SoftState.RaftState == raft.StateLeader {
         wasLeader = true
     }
 }

+// Process committed entries
+for _, entry := range rd.CommittedEntries {
+    if err := n.processCommitted(entry); err != nil {
+        n.Config.Logger.Error(err)
+    }
+}
+
+// Trigger a snapshot every once in awhile
+if n.snapshotInProgress == nil &&
+    raftConfig.SnapshotInterval > 0 &&
+    n.appliedIndex-n.snapshotIndex >= raftConfig.SnapshotInterval {
+    n.doSnapshot(&raftConfig)
+}
+
 if wasLeader && atomic.LoadUint32(&n.signalledLeadership) != 1 {
     // If all the entries in the log have become
     // committed, broadcast our leadership status.
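
The ordering argument in the comment above is subtle, so here is a minimal, self-contained sketch of it (illustrative names, not the swarmkit API): the proposer registers before checking the leadership flag, while step-down clears the flag before cancelling, so a proposal that arrives mid-transition is caught either by cancelAll or by its own check, and the map guard ensures it is cancelled exactly once.

package main

import (
    "fmt"
    "sync"
    "sync/atomic"
)

var leader uint32 = 1 // 1 while this node believes it is leader

var (
    mu      sync.Mutex
    waiters = map[uint64]func(){} // id -> cancel callback
)

func register(id uint64, cancel func()) {
    mu.Lock()
    defer mu.Unlock()
    waiters[id] = cancel
}

// cancelOne fires the callback only if the id is still registered,
// so the two cancellation paths below can never both fire it.
func cancelOne(id uint64) {
    mu.Lock()
    cb, ok := waiters[id]
    delete(waiters, id)
    mu.Unlock()
    if ok {
        cb()
    }
}

func cancelAll() {
    mu.Lock()
    defer mu.Unlock()
    for id, cb := range waiters {
        delete(waiters, id)
        cb()
    }
}

// propose mirrors processInternalRaftRequest: register first, then
// check leadership, and withdraw the registration on failure.
func propose(id uint64, onCancel func()) {
    register(id, onCancel)
    if atomic.LoadUint32(&leader) != 1 {
        cancelOne(id)
    }
}

// stepDown mirrors the Run loop: clear the flag BEFORE cancelAll, so
// a proposal that registers mid-transition is seen by cancelAll or by
// its own leadership check; neither interleaving lets it leak.
func stepDown() {
    atomic.StoreUint32(&leader, 0)
    cancelAll()
}

func main() {
    done := make(chan struct{})
    go propose(1, func() { close(done) })
    stepDown()
    <-done // exactly one path cancelled the waiter
    fmt.Println("no leaked registration")
}
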
@@ -1199,7 +1215,11 @@ func (n *Node) processInternalRaftRequest(ctx context.Context, r *api.InternalRa

     r.ID = n.reqIDGen.Next()

-    ch := n.wait.register(r.ID, cb)
+    // This must be derived from the context which is cancelled by stop()
+    // to avoid a deadlock on shutdown.
+    waitCtx, cancel := context.WithCancel(n.Ctx)
+
+    ch := n.wait.register(r.ID, cb, cancel)

     // Do this check after calling register to avoid a race.
     if atomic.LoadUint32(&n.signalledLeadership) != 1 {
@@ -1218,24 +1238,19 @@ func (n *Node) processInternalRaftRequest(ctx context.Context, r *api.InternalRa
     return nil, ErrRequestTooLarge
 }

-    // This must use the context which is cancelled by stop() to avoid a
-    // deadlock on shutdown.
-    err = n.Propose(n.Ctx, data)
+    err = n.Propose(waitCtx, data)
     if err != nil {
         n.wait.cancel(r.ID)
         return nil, err
     }

     select {
-    case x, ok := <-ch:
-        if ok {
-            res := x.(*applyResult)
-            return res.resp, res.err
-        }
-        return nil, ErrLostLeadership
-    case <-n.Ctx.Done():
+    case x := <-ch:
+        res := x.(*applyResult)
+        return res.resp, res.err
+    case <-waitCtx.Done():
         n.wait.cancel(r.ID)
-        return nil, ErrStopped
+        return nil, ErrLostLeadership
     case <-ctx.Done():
         n.wait.cancel(r.ID)
         return nil, ctx.Err()
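
A hypothetical sketch of the waitCtx pattern above (function names and signatures are illustrative, not swarmkit's): the cancel function handed to the wait registry tears down waitCtx, so a cancelled waiter unblocks both the Propose call and the final select through one signal, instead of relying on a channel close.

package main

import (
    "context"
    "errors"
    "fmt"
)

var errLostLeadership = errors.New("lost leadership")

func proposeOnce(nodeCtx, callerCtx context.Context,
    register func(cancel func()) <-chan string,
    propose func(ctx context.Context) error) (string, error) {

    // Derive the wait context from the node context, as in the commit:
    // stop() cancelling nodeCtx also unblocks every outstanding waiter.
    waitCtx, cancel := context.WithCancel(nodeCtx)
    defer cancel()

    ch := register(cancel)

    if err := propose(waitCtx); err != nil {
        return "", err
    }

    select {
    case res := <-ch:
        return res, nil
    case <-waitCtx.Done():
        // Either stop() fired or the waiter was cancelled on
        // leadership loss; both unblock here instead of deadlocking.
        return "", errLostLeadership
    case <-callerCtx.Done():
        return "", callerCtx.Err()
    }
}

func main() {
    var cancelWaiter func()
    register := func(cancel func()) <-chan string {
        cancelWaiter = cancel
        return make(chan string, 1)
    }
    propose := func(ctx context.Context) error {
        cancelWaiter() // simulate wait.cancelAll on leadership loss
        return nil
    }
    _, err := proposeOnce(context.Background(), context.Background(), register, propose)
    fmt.Println(err) // lost leadership
}
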
@@ -1247,10 +1262,12 @@ func (n *Node) configure(ctx context.Context, cc raftpb.ConfChange) error {
 // until the change is performed or there is an error.
 func (n *Node) configure(ctx context.Context, cc raftpb.ConfChange) error {
     cc.ID = n.reqIDGen.Next()
-    ch := n.wait.register(cc.ID, nil)
+
+    ctx, cancel := context.WithCancel(ctx)
+    ch := n.wait.register(cc.ID, nil, cancel)

     if err := n.ProposeConfChange(ctx, cc); err != nil {
-        n.wait.trigger(cc.ID, nil)
+        n.wait.cancel(cc.ID)
         return err
     }

@@ -1264,7 +1281,7 @@ func (n *Node) configure(ctx context.Context, cc raftpb.ConfChange) error {
         }
         return nil
     case <-ctx.Done():
-        n.wait.trigger(cc.ID, nil)
+        n.wait.cancel(cc.ID)
         return ctx.Err()
     case <-n.Ctx.Done():
         return ErrStopped
@@ -1307,6 +1324,11 @@ func (n *Node) processEntry(entry raftpb.Entry) error {
         // position and cancelling the transaction. Create a new
         // transaction to commit the data.

+        // It should not be possible for processInternalRaftRequest
+        // to be running in this situation, but out of caution we
+        // cancel any current invocations to avoid a deadlock.
+        n.wait.cancelAll()
+
         err := n.memoryStore.ApplyStoreActions(r.Action)
         if err != nil {
             log.G(context.Background()).Errorf("error applying actions from raft: %v", err)
5 changes: 2 additions & 3 deletions manager/state/raft/raft_test.go
@@ -8,7 +8,6 @@ import (
     "os"
     "reflect"
     "strconv"
-    "strings"
     "testing"
     "time"

@@ -684,8 +683,8 @@ func TestStress(t *testing.T) {
             // update leader
             leader = i
             break
-        } else if strings.Contains(err.Error(), "context deadline exceeded") {
-            // though it's timing out, we still record this value
+        } else {
+            // though ProposeValue returned an error, we still record this value,
+            // for it may be proposed successfully and stored in Raft some time later
             pIDs = append(pIDs, id)
         }
15 changes: 9 additions & 6 deletions manager/state/raft/wait.go
@@ -10,6 +10,8 @@ type waitItem struct {
     ch chan interface{}
     // callback which is called synchronously when the wait is triggered
     cb func()
+    // callback which is called to cancel a waiter
+    cancel func()
 }

 type wait struct {
@@ -21,13 +23,13 @@ func newWait() *wait {
     return &wait{m: make(map[uint64]waitItem)}
 }

-func (w *wait) register(id uint64, cb func()) <-chan interface{} {
+func (w *wait) register(id uint64, cb func(), cancel func()) <-chan interface{} {
     w.l.Lock()
     defer w.l.Unlock()
     _, ok := w.m[id]
     if !ok {
         ch := make(chan interface{}, 1)
-        w.m[id] = waitItem{ch: ch, cb: cb}
+        w.m[id] = waitItem{ch: ch, cb: cb, cancel: cancel}
         return ch
     }
     panic(fmt.Sprintf("duplicate id %x", id))
@@ -43,7 +45,6 @@ func (w *wait) trigger(id uint64, x interface{}) bool {
             waitItem.cb()
         }
         waitItem.ch <- x
-        close(waitItem.ch)
         return true
     }
     return false
@@ -54,8 +55,8 @@ func (w *wait) cancel(id uint64) {
     waitItem, ok := w.m[id]
     delete(w.m, id)
     w.l.Unlock()
-    if ok {
-        close(waitItem.ch)
+    if ok && waitItem.cancel != nil {
+        waitItem.cancel()
     }
 }

@@ -65,6 +66,8 @@ func (w *wait) cancelAll() {

     for id, waitItem := range w.m {
         delete(w.m, id)
-        close(waitItem.ch)
+        if waitItem.cancel != nil {
+            waitItem.cancel()
+        }
     }
 }
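
As a condensed, runnable model of the new wait semantics (same shape as the file above, details simplified): trigger delivers a result on the buffered channel and never closes it, while cancel and cancelAll fire the registered cancel callback (a context.CancelFunc in swarmkit), so waiters observe cancellation through their context rather than a closed channel.

package main

import (
    "context"
    "fmt"
    "sync"
)

type waitItem struct {
    ch     chan interface{}
    cancel func()
}

type wait struct {
    mu sync.Mutex
    m  map[uint64]waitItem
}

func (w *wait) register(id uint64, cancel func()) <-chan interface{} {
    w.mu.Lock()
    defer w.mu.Unlock()
    ch := make(chan interface{}, 1) // buffered: trigger never blocks
    w.m[id] = waitItem{ch: ch, cancel: cancel}
    return ch
}

func (w *wait) trigger(id uint64, x interface{}) bool {
    w.mu.Lock()
    item, ok := w.m[id]
    delete(w.m, id)
    w.mu.Unlock()
    if ok {
        item.ch <- x // no close: the channel only ever carries results
        return true
    }
    return false
}

func (w *wait) cancel(id uint64) {
    w.mu.Lock()
    item, ok := w.m[id]
    delete(w.m, id)
    w.mu.Unlock()
    if ok && item.cancel != nil {
        item.cancel() // waiters observe this via their context
    }
}

func main() {
    w := &wait{m: make(map[uint64]waitItem)}

    ctx, cancel := context.WithCancel(context.Background())
    ch := w.register(1, cancel)
    w.cancel(1) // no result will come; the context fires instead
    select {
    case res := <-ch:
        fmt.Println("result:", res)
    case <-ctx.Done():
        fmt.Println("cancelled via context, not channel close")
    }

    ctx2, cancel2 := context.WithCancel(context.Background())
    defer cancel2()
    ch2 := w.register(2, cancel2)
    w.trigger(2, "applied") // success path: a value, never a close
    select {
    case res := <-ch2:
        fmt.Println("result:", res)
    case <-ctx2.Done():
        fmt.Println("cancelled via context")
    }
}

With the buffered channel reserved for results, a cancelled waiter simply never receives a value; its context firing is the only cancellation signal, which is what lets processInternalRaftRequest treat waitCtx.Done() as ErrLostLeadership.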
