Skip to content

Commit

Permalink
NRG: Revert implementation from #5987
Browse files Browse the repository at this point in the history
Signed-off-by: Maurice van Veen <[email protected]>
  • Loading branch information
MauriceVanVeen authored and neilalexander committed Nov 22, 2024
1 parent e6bbdbd commit c1cbbb5
Showing 1 changed file with 13 additions and 19 deletions.
32 changes: 13 additions & 19 deletions server/raft.go
Original file line number Diff line number Diff line change
Expand Up @@ -3082,10 +3082,10 @@ func (n *raft) catchupStalled() bool {
if n.catchup == nil {
return false
}
if n.catchup.pindex == n.commit {
if n.catchup.pindex == n.pindex {
return time.Since(n.catchup.active) > 2*time.Second
}
n.catchup.pindex = n.commit
n.catchup.pindex = n.pindex
n.catchup.active = time.Now()
return false
}
Expand All @@ -3104,7 +3104,7 @@ func (n *raft) createCatchup(ae *appendEntry) string {
cterm: ae.pterm,
cindex: ae.pindex,
pterm: n.pterm,
pindex: n.commit,
pindex: n.pindex,
active: time.Now(),
}
inbox := n.newCatchupInbox()
Expand Down Expand Up @@ -3274,7 +3274,7 @@ func (n *raft) processAppendEntry(ae *appendEntry, sub *subscription) {
if n.catchupStalled() {
n.debug("Catchup may be stalled, will request again")
inbox = n.createCatchup(ae)
ar = newAppendEntryResponse(n.pterm, n.commit, n.id, false)
ar = newAppendEntryResponse(n.pterm, n.pindex, n.id, false)
}
n.Unlock()
if ar != nil {
Expand Down Expand Up @@ -3315,34 +3315,28 @@ func (n *raft) processAppendEntry(ae *appendEntry, sub *subscription) {
}

if (isNew && ae.pterm != n.pterm) || ae.pindex != n.pindex {
// Check if this is a lower index than what we were expecting.
if ae.pindex < n.pindex {
// Check if this is a lower or equal index than what we were expecting.
if ae.pindex <= n.pindex {
n.debug("AppendEntry detected pindex less than ours: %d:%d vs %d:%d", ae.pterm, ae.pindex, n.pterm, n.pindex)
var ar *appendEntryResponse

// An AppendEntry is stored at seq=ae.pindex+1. This can be checked when eae != nil, eae.pindex==ae.pindex.
seq := ae.pindex + 1
var success bool
if eae, _ := n.loadEntry(seq); eae == nil {
if eae, _ := n.loadEntry(ae.pindex); eae == nil {
// If terms are equal, and we are not catching up, we have simply already processed this message.
// So we will ACK back to the leader. This can happen on server restarts based on timings of snapshots.
if ae.pterm == n.pterm && !catchingUp {
success = true
} else {
n.resetWAL()
}
} else if eae.term != ae.term {
} else {
// If terms mismatched, delete that entry and all others past it.
// Make sure to cancel any catchups in progress.
// Truncate will reset our pterm and pindex. Only do so if we have an entry.
n.truncateWAL(eae.pterm, eae.pindex)
} else {
success = true
}
// Cancel regardless if truncated/unsuccessful.
if !success {
n.cancelCatchup()
}
// Cancel regardless.
n.cancelCatchup()

// Create response.
ar = newAppendEntryResponse(ae.pterm, ae.pindex, n.id, success)
Expand Down Expand Up @@ -3416,11 +3410,11 @@ func (n *raft) processAppendEntry(ae *appendEntry, sub *subscription) {
return

} else {
n.debug("AppendEntry did not match %d %d with %d %d (commit %d)", ae.pterm, ae.pindex, n.pterm, n.pindex, n.commit)
if ae.pindex > n.commit {
n.debug("AppendEntry did not match %d %d with %d %d", ae.pterm, ae.pindex, n.pterm, n.pindex)
if ae.pindex > n.pindex {
// Setup our state for catching up.
inbox := n.createCatchup(ae)
ar := newAppendEntryResponse(n.pterm, n.commit, n.id, false)
ar := newAppendEntryResponse(n.pterm, n.pindex, n.id, false)
n.Unlock()
n.sendRPC(ae.reply, inbox, ar.encode(arbuf))
arPool.Put(ar)
Expand Down

0 comments on commit c1cbbb5

Please sign in to comment.