Skip to content

Commit

Permalink
raft: introduce progress states
Browse files Browse the repository at this point in the history
  • Loading branch information
yichengq committed Mar 18, 2015
1 parent 862c16e commit 67194c0
Show file tree
Hide file tree
Showing 4 changed files with 327 additions and 194 deletions.
232 changes: 126 additions & 106 deletions raft/raft.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,46 +49,96 @@ func (st StateType) String() string {
return stmap[uint64(st)]
}

// The replication states a Progress can be in. The numeric values index
// prstmap, so the two declarations must be kept in the same order.
const (
	ProgressStateProbe ProgressStateType = iota
	ProgressStateReplicate
	ProgressStateSnapshot
)

// ProgressStateType identifies how the leader is currently replicating to a
// peer (see the State field of Progress).
type ProgressStateType uint64

// prstmap holds the printable name of every declared ProgressStateType,
// indexed by the state's numeric value.
var prstmap = [...]string{
	"ProgressStateProbe",
	"ProgressStateReplicate",
	"ProgressStateSnapshot",
}

// String returns the name of the state. A value outside the declared states
// is rendered as "ProgressStateType(N)" instead of panicking with an index
// out of range, so it is always safe to use in log and panic messages.
func (st ProgressStateType) String() string {
	if i := uint64(st); i < uint64(len(prstmap)) {
		return prstmap[i]
	}
	// Convert to uint64 explicitly so formatting can never re-enter String.
	return fmt.Sprintf("ProgressStateType(%d)", uint64(st))
}

// Progress holds the leader-side replication bookkeeping for a single peer.
//
// NOTE(review): this listing shows Wait and Unreachable next to State and
// Paused; confirm whether both generations of fields are meant to coexist.
type Progress struct {
// Match is raised by maybeUpdate when a larger index is reported; Next is the
// index from which sendAppend reads the entries it sends to this peer.
Match, Next uint64
// Wait is a counter set by waitSet, decreased by waitDecr (never below zero)
// and consulted by shouldWait.
Wait int
// If the last send to the Progress failed and was reported
// by the link layer via MsgUnreachable, Unreachable will be set.
// If the Progress is unreachable, snapshots and optimistic appends
// will be disabled.
// Unreachable will be unset once raft starts to receive messages (msgAppResp,
// msgHeartbeatResp) from the remote peer of the Progress.
Unreachable bool
// When in ProgressStateProbe, the leader sends at most one replication message
// per heartbeat interval. It also probes the actual progress of the follower.
//
// When in ProgressStateReplicate, the leader optimistically increases Next
// to the latest entry sent after sending a replication message. This is
// an optimized state for fast replication of log entries to the follower.
//
// When in ProgressStateSnapshot, the leader should have sent out a snapshot
// before and stops sending any replication message.
State ProgressStateType
// Paused is used in ProgressStateProbe.
// When Paused is true, raft should pause sending replication messages to this peer.
Paused bool
// PendingSnapshot is used in ProgressStateSnapshot.
// If there is a pending snapshot, PendingSnapshot will be set to the
// index of the snapshot. If PendingSnapshot is set, the replication process of
// this Progress will be paused. raft will not resend the snapshot until the
// pending one is reported to have failed.
//
// PendingSnapshot is set when raft sends out a snapshot to this Progress.
// PendingSnapshot is unset when the snapshot is reported to be successful,
// or raft updates an equal or higher Match for this Progress.
PendingSnapshot uint64
}

func (pr *Progress) update(n uint64) {
pr.waitReset()
// resetState moves the progress into the given state and clears the
// bookkeeping tied to the previous one: the Paused flag and the pending
// snapshot index. Match and Next are left untouched.
func (pr *Progress) resetState(state ProgressStateType) {
	pr.State = state
	pr.PendingSnapshot = 0
	pr.Paused = false
}

// becomeProbe switches the progress to ProgressStateProbe and chooses where
// probing restarts. Normally that is Match+1. When leaving
// ProgressStateSnapshot the pending snapshot is treated as delivered, so
// probing restarts at the larger of Match+1 and PendingSnapshot+1.
func (pr *Progress) becomeProbe() {
	next := pr.Match + 1
	if pr.State == ProgressStateSnapshot {
		// Read PendingSnapshot before resetState clears it.
		next = max(next, pr.PendingSnapshot+1)
	}
	pr.resetState(ProgressStateProbe)
	pr.Next = next
}

// becomeReplicate switches the progress to ProgressStateReplicate and
// restarts sending right after the last matched index.
func (pr *Progress) becomeReplicate() {
	// resetState does not touch Match or Next, so the order here is free.
	pr.Next = pr.Match + 1
	pr.resetState(ProgressStateReplicate)
}

// becomeSnapshot switches the progress to ProgressStateSnapshot and records
// snapshoti as the index of the pending snapshot.
func (pr *Progress) becomeSnapshot(snapshoti uint64) {
// resetState zeroes PendingSnapshot, so it has to run before the index is stored.
pr.resetState(ProgressStateSnapshot)
pr.PendingSnapshot = snapshoti
}

// maybeUpdate records that index n has been reported for this progress.
// It reports whether Match advanced; a false result means n did not exceed
// the current Match (for example because the message was outdated).
// Advancing Match also resumes a paused progress. Independently of the
// result, Next is raised to at least n+1.
func (pr *Progress) maybeUpdate(n uint64) bool {
	updated := pr.Match < n
	if updated {
		pr.Match = n
		pr.resume()
	}
	if next := n + 1; pr.Next < next {
		pr.Next = next
	}
	return updated
}

// optimisticUpdate moves Next to just past n without waiting for the peer
// to acknowledge anything; Match is not changed.
func (pr *Progress) optimisticUpdate(n uint64) {
	pr.Next = n + 1
}

// maybeDecrTo returns false if the given to index comes from an out of order message.
// Otherwise it decreases the progress next index to min(rejected, last+1), never
// below 1, and returns true.
func (pr *Progress) maybeDecrTo(rejected, last uint64) bool {
pr.waitReset()

if pr.Match != 0 {
if pr.State == ProgressStateReplicate {
// the rejection must be stale if the progress has matched and "rejected"
// is smaller than "match".
if rejected <= pr.Match {
Expand All @@ -107,61 +157,28 @@ func (pr *Progress) maybeDecrTo(rejected, last uint64) bool {
if pr.Next = min(rejected, last+1); pr.Next < 1 {
pr.Next = 1
}
pr.resume()
return true
}

// waitDecr lowers the Wait counter by i, clamping the result at zero.
func (pr *Progress) waitDecr(i int) {
	remaining := pr.Wait - i
	if remaining < 0 {
		remaining = 0
	}
	pr.Wait = remaining
}
// waitSet overwrites the Wait counter with w.
func (pr *Progress) waitSet(w int) {
	pr.Wait = w
}
// waitReset clears the Wait counter.
func (pr *Progress) waitReset() {
	pr.Wait = 0
}
// isUnreachable reports whether the Unreachable flag is set.
func (pr *Progress) isUnreachable() bool {
	return pr.Unreachable
}
// reachable clears the Unreachable flag.
func (pr *Progress) reachable() {
	pr.Unreachable = false
}

// unreachable sets the Unreachable flag. If the peer has a non-zero Match,
// Next is also pulled back to Match+1: a peer that drops off during
// optimistic appending has most likely lost a MsgApp, so sending restarts
// from the last matched position.
func (pr *Progress) unreachable() {
	if matched := pr.Match; matched != 0 {
		pr.Next = matched + 1
	}
	pr.Unreachable = true
}

// shouldWait reports whether the Wait counter is still positive for a
// progress that is either flagged unreachable or has a Match of zero.
func (pr *Progress) shouldWait() bool {
	if pr.Wait <= 0 {
		return false
	}
	return pr.Unreachable || pr.Match == 0
}

// hasPendingSnapshot reports whether a pending snapshot index is recorded;
// zero means none.
func (pr *Progress) hasPendingSnapshot() bool {
	return pr.PendingSnapshot > 0
}
// setPendingSnapshot records i as the index of the pending snapshot.
func (pr *Progress) setPendingSnapshot(i uint64) {
	pr.PendingSnapshot = i
}
// pause sets the Paused flag.
func (pr *Progress) pause() {
	pr.Paused = true
}
// resume clears the Paused flag.
func (pr *Progress) resume() {
	pr.Paused = false
}

// finishSnapshot unsets the pending snapshot and optimistically increase Next to
// the index of pendingSnapshot + 1. The next replication message is expected
// to be msgApp.
func (pr *Progress) snapshotFinish() {
pr.Next = pr.PendingSnapshot + 1
pr.PendingSnapshot = 0
// isPaused returns whether progress stops sending message.
func (pr *Progress) isPaused() bool {
return pr.State == ProgressStateProbe && pr.Paused || pr.State == ProgressStateSnapshot
}

// snapshotFail unsets the pending snapshot. The next replication message is expected
// to be another msgSnap.
func (pr *Progress) snapshotFail() {
pr.PendingSnapshot = 0
}
func (pr *Progress) snapshotFailure() { pr.PendingSnapshot = 0 }

// maybeSnapshotAbort unsets pendingSnapshot if Match is equal or higher than
// the pendingSnapshot
func (pr *Progress) maybeSnapshotAbort() bool {
if pr.hasPendingSnapshot() && pr.Match >= pr.PendingSnapshot {
pr.PendingSnapshot = 0
return true
}
return false
return pr.State == ProgressStateSnapshot && pr.Match >= pr.PendingSnapshot
}

func (pr *Progress) String() string {
return fmt.Sprintf("next = %d, match = %d, wait = %v", pr.Next, pr.Match, pr.Wait)
return fmt.Sprintf("next = %d, match = %d, state = %s, waiting = %v, pendingSnapshot = %d", pr.Next, pr.Match, pr.State, pr.isPaused(), pr.PendingSnapshot)
}

type raft struct {
Expand Down Expand Up @@ -273,18 +290,12 @@ func (r *raft) send(m pb.Message) {
// sendAppend sends RPC, with entries to the given peer.
func (r *raft) sendAppend(to uint64) {
pr := r.prs[to]
if pr.shouldWait() || pr.hasPendingSnapshot() {
if pr.isPaused() {
return
}
m := pb.Message{}
m.To = to
if r.needSnapshot(pr.Next) {
if pr.isUnreachable() {
// do not try to send snapshot until the Progress is
// reachable
return
}

m.Type = pb.MsgSnap
snapshot, err := r.raftLog.snapshot()
if err != nil {
Expand All @@ -297,20 +308,24 @@ func (r *raft) sendAppend(to uint64) {
sindex, sterm := snapshot.Metadata.Index, snapshot.Metadata.Term
raftLogger.Infof("raft: %x [firstindex: %d, commit: %d] sent snapshot[index: %d, term: %d] to %x [%s]",
r.id, r.raftLog.firstIndex(), r.Commit, sindex, sterm, to, pr)
pr.setPendingSnapshot(sindex)
pr.becomeSnapshot(sindex)
raftLogger.Infof("raft: %x paused sending replication messages to %x [%s]", r.id, to, pr)
} else {
m.Type = pb.MsgApp
m.Index = pr.Next - 1
m.LogTerm = r.raftLog.term(pr.Next - 1)
m.Entries = r.raftLog.entries(pr.Next)
m.Commit = r.raftLog.committed
// optimistically increase the next if the follower
// has been matched.
if n := len(m.Entries); pr.Match != 0 && !pr.isUnreachable() && n != 0 {
pr.optimisticUpdate(m.Entries[n-1].Index)
} else if pr.Match == 0 || pr.isUnreachable() {
pr.waitSet(r.heartbeatTimeout)
if n := len(m.Entries); n != 0 {
switch pr.State {
// optimistically increase the next when in ProgressStateReplicate
case ProgressStateReplicate:
pr.optimisticUpdate(m.Entries[n-1].Index)
case ProgressStateProbe:
pr.pause()
default:
raftLogger.Panicf("raft: %x is sending append in unhandled state %s", r.id, pr.State)
}
}
}
r.send(m)
Expand Down Expand Up @@ -351,7 +366,7 @@ func (r *raft) bcastHeartbeat() {
continue
}
r.sendHeartbeat(i)
r.prs[i].waitDecr(r.heartbeatTimeout)
r.prs[i].resume()
}
}

Expand Down Expand Up @@ -390,7 +405,7 @@ func (r *raft) appendEntry(es ...pb.Entry) {
es[i].Index = li + 1 + uint64(i)
}
r.raftLog.append(es...)
r.prs[r.id].update(r.raftLog.lastIndex())
r.prs[r.id].maybeUpdate(r.raftLog.lastIndex())
r.maybeCommit()
}

Expand Down Expand Up @@ -547,36 +562,37 @@ func stepLeader(r *raft, m pb.Message) {
r.appendEntry(m.Entries...)
r.bcastAppend()
case pb.MsgAppResp:
if pr.isUnreachable() {
pr.reachable()
raftLogger.Infof("raft: %x received msgAppResp from %x and changed it to be reachable [%s]", r.id, m.From, pr)
}
if m.Reject {
raftLogger.Infof("raft: %x received msgApp rejection(lastindex: %d) from %x for index %d",
r.id, m.RejectHint, m.From, m.Index)
if pr.maybeDecrTo(m.Index, m.RejectHint) {
raftLogger.Infof("raft: %x decreased progress of %x to [%s]", r.id, m.From, pr)
if pr.State == ProgressStateReplicate {
pr.becomeProbe()
}
r.sendAppend(m.From)
}
} else {
oldWait := pr.shouldWait()
pr.update(m.Index)
if r.prs[m.From].maybeSnapshotAbort() {
raftLogger.Infof("raft: %x snapshot aborted, resumed sending replication messages to %x [%s]", r.id, m.From, pr)
}
if r.maybeCommit() {
r.bcastAppend()
} else if oldWait {
// update() reset the wait state on this node. If we had delayed sending
// an update before, send it now.
r.sendAppend(m.From)
oldPaused := pr.isPaused()
if pr.maybeUpdate(m.Index) {
switch {
case pr.State == ProgressStateProbe:
pr.becomeReplicate()
case pr.State == ProgressStateSnapshot && pr.maybeSnapshotAbort():
raftLogger.Infof("raft: %x snapshot aborted, resumed sending replication messages to %x [%s]", r.id, m.From, pr)
pr.becomeProbe()
}

if r.maybeCommit() {
r.bcastAppend()
} else if oldPaused {
// update() reset the wait state on this node. If we had delayed sending
// an update before, send it now.
r.sendAppend(m.From)
}
}
}
case pb.MsgHeartbeatResp:
if pr.isUnreachable() {
pr.reachable()
raftLogger.Infof("raft: %x received msgHeartbeatResp from %x and changed it to be reachable [%s]", r.id, m.From, pr)
}
if pr.Match < r.raftLog.lastIndex() {
r.sendAppend(m.From)
}
Expand All @@ -585,24 +601,28 @@ func stepLeader(r *raft, m pb.Message) {
r.id, r.raftLog.lastTerm(), r.raftLog.lastIndex(), r.Vote, m.From, m.LogTerm, m.Index, r.Term)
r.send(pb.Message{To: m.From, Type: pb.MsgVoteResp, Reject: true})
case pb.MsgSnapStatus:
if !pr.hasPendingSnapshot() {
if pr.State != ProgressStateSnapshot {
return
}
if m.Reject {
pr.snapshotFail()
raftLogger.Infof("raft: %x snapshot failed, resumed sending replication messages to %x [%s]", r.id, m.From, pr)
} else {
pr.snapshotFinish()
if !m.Reject {
pr.becomeProbe()
raftLogger.Infof("raft: %x snapshot succeeded, resumed sending replication messages to %x [%s]", r.id, m.From, pr)
// wait for the msgAppResp from the remote node before sending
// out the next msgApp
pr.waitSet(r.electionTimeout)
} else {
pr.snapshotFailure()
pr.becomeProbe()
raftLogger.Infof("raft: %x snapshot failed, resumed sending replication messages to %x [%s]", r.id, m.From, pr)
}
// If snapshot finish, wait for the msgAppResp from the remote node before sending
// out the next msgApp.
// If snapshot failure, wait for a heartbeat interval before next try
pr.pause()
case pb.MsgUnreachable:
if !pr.isUnreachable() {
pr.unreachable()
raftLogger.Infof("raft: %x failed to send message to %x and changed it to be unreachable [%s]", r.id, m.From, pr)
// During optimistic replication, if the remote becomes unreachable,
// there is huge probability that a MsgApp is lost.
if pr.State == ProgressStateReplicate {
pr.becomeProbe()
}
raftLogger.Infof("raft: %x failed to send message to %x because it is unreachable [%s]", r.id, m.From, pr)
}
}

Expand Down
Loading

0 comments on commit 67194c0

Please sign in to comment.