diff --git a/server/raft.go b/server/raft.go index c43ec65a178..c12b5e4e339 100644 --- a/server/raft.go +++ b/server/raft.go @@ -3082,10 +3082,10 @@ func (n *raft) catchupStalled() bool { if n.catchup == nil { return false } - if n.catchup.pindex == n.commit { + if n.catchup.pindex == n.pindex { return time.Since(n.catchup.active) > 2*time.Second } - n.catchup.pindex = n.commit + n.catchup.pindex = n.pindex n.catchup.active = time.Now() return false } @@ -3104,7 +3104,7 @@ func (n *raft) createCatchup(ae *appendEntry) string { cterm: ae.pterm, cindex: ae.pindex, pterm: n.pterm, - pindex: n.commit, + pindex: n.pindex, active: time.Now(), } inbox := n.newCatchupInbox() @@ -3274,7 +3274,7 @@ func (n *raft) processAppendEntry(ae *appendEntry, sub *subscription) { if n.catchupStalled() { n.debug("Catchup may be stalled, will request again") inbox = n.createCatchup(ae) - ar = newAppendEntryResponse(n.pterm, n.commit, n.id, false) + ar = newAppendEntryResponse(n.pterm, n.pindex, n.id, false) } n.Unlock() if ar != nil { @@ -3315,15 +3315,13 @@ func (n *raft) processAppendEntry(ae *appendEntry, sub *subscription) { } if (isNew && ae.pterm != n.pterm) || ae.pindex != n.pindex { - // Check if this is a lower index than what we were expecting. - if ae.pindex < n.pindex { + // Check if this is a lower or equal index than what we were expecting. + if ae.pindex <= n.pindex { n.debug("AppendEntry detected pindex less than ours: %d:%d vs %d:%d", ae.pterm, ae.pindex, n.pterm, n.pindex) var ar *appendEntryResponse - // An AppendEntry is stored at seq=ae.pindex+1. This can be checked when eae != nil, eae.pindex==ae.pindex. - seq := ae.pindex + 1 var success bool - if eae, _ := n.loadEntry(seq); eae == nil { + if eae, _ := n.loadEntry(ae.pindex); eae == nil { // If terms are equal, and we are not catching up, we have simply already processed this message. // So we will ACK back to the leader. This can happen on server restarts based on timings of snapshots. if ae.pterm == n.pterm && !catchingUp { @@ -3331,18 +3329,14 @@ func (n *raft) processAppendEntry(ae *appendEntry, sub *subscription) { } else { n.resetWAL() } - } else if eae.term != ae.term { + } else { // If terms mismatched, delete that entry and all others past it. // Make sure to cancel any catchups in progress. // Truncate will reset our pterm and pindex. Only do so if we have an entry. n.truncateWAL(eae.pterm, eae.pindex) - } else { - success = true - } - // Cancel regardless if truncated/unsuccessful. - if !success { - n.cancelCatchup() } + // Cancel regardless. + n.cancelCatchup() // Create response. ar = newAppendEntryResponse(ae.pterm, ae.pindex, n.id, success) @@ -3416,11 +3410,11 @@ func (n *raft) processAppendEntry(ae *appendEntry, sub *subscription) { return } else { - n.debug("AppendEntry did not match %d %d with %d %d (commit %d)", ae.pterm, ae.pindex, n.pterm, n.pindex, n.commit) - if ae.pindex > n.commit { + n.debug("AppendEntry did not match %d %d with %d %d", ae.pterm, ae.pindex, n.pterm, n.pindex) + if ae.pindex > n.pindex { // Setup our state for catching up. inbox := n.createCatchup(ae) - ar := newAppendEntryResponse(n.pterm, n.commit, n.id, false) + ar := newAppendEntryResponse(n.pterm, n.pindex, n.id, false) n.Unlock() n.sendRPC(ae.reply, inbox, ar.encode(arbuf)) arPool.Put(ar)