From 30257783211def46a72bbb960f905983a520d130 Mon Sep 17 00:00:00 2001 From: Maurice van Veen Date: Tue, 15 Oct 2024 16:02:11 +0200 Subject: [PATCH] NRG (2.11): Start catchup from `n.commit` & fix AppendEntry is stored at `seq=ae.pindex+1` (#5987) This PR makes three complementary fixes to how catchup and truncation are handled. Specifically: - when doing `n.loadEntry(index)` we need to pass where the AppendEntry is in terms of stream sequence; this is equal to `ae.pindex+1`, since `ae.pindex` is the value before it's stored in the stream. - start catchup from `n.commit`, since we could have messages past our commit that have been invalidated and need to be truncated because there was a switch between leaders. - because we catch up from `n.commit`, we check whether our local AppendEntry matches terms with the incoming AppendEntry; we only need to truncate if the terms don't match. Signed-off-by: Maurice van Veen --------- Signed-off-by: Maurice van Veen --- server/jetstream_cluster_4_test.go | 222 ++++++++++++----------- server/raft.go | 34 ++-- server/raft_test.go | 275 ++++++++++++++++++++++------- 3 files changed, 349 insertions(+), 182 deletions(-) diff --git a/server/jetstream_cluster_4_test.go b/server/jetstream_cluster_4_test.go index 3482ec30228..0021b78ce61 100644 --- a/server/jetstream_cluster_4_test.go +++ b/server/jetstream_cluster_4_test.go @@ -18,7 +18,6 @@ package server import ( "context" - "encoding/binary" "encoding/json" "errors" "fmt" @@ -3729,9 +3728,7 @@ func TestJetStreamClusterDesyncAfterErrorDuringCatchup(t *testing.T) { for _, n := range server.raftNodes { rn := n.(*raft) if rn.accName == "$G" { - rn.Lock() rn.updateLeader(noLeader) - rn.Unlock() } } @@ -3996,129 +3993,138 @@ func TestJetStreamClusterMetaSnapshotMustNotIncludePendingConsumers(t *testing.T } } -func TestJetStreamClusterConsumerDontSendSnapshotOnLeaderChange(t *testing.T) { +func TestJetStreamClusterDesyncAfterPublishToLeaderWithoutQuorum(t *testing.T) { c := 
createJetStreamClusterExplicit(t, "R3S", 3) defer c.shutdown() - nc, js := jsClientConnect(t, c.randomServer()) defer nc.Close() - - _, err := js.AddStream(&nats.StreamConfig{ + si, err := js.AddStream(&nats.StreamConfig{ Name: "TEST", Subjects: []string{"foo"}, Replicas: 3, }) require_NoError(t, err) - - _, err = js.AddConsumer("TEST", &nats.ConsumerConfig{ - Durable: "CONSUMER", - Replicas: 3, - AckPolicy: nats.AckExplicitPolicy, + streamLeader := si.Cluster.Leader + streamLeaderServer := c.serverByName(streamLeader) + nc.Close() + nc, js = jsClientConnect(t, streamLeaderServer) + defer nc.Close() + servers := slices.DeleteFunc([]string{"S-1", "S-2", "S-3"}, func(s string) bool { + return s == streamLeader }) - require_NoError(t, err) - - // Add a message and let the consumer ack it, this moves the consumer's RAFT applied up to 1. - _, err = js.Publish("foo", nil) - require_NoError(t, err) - sub, err := js.PullSubscribe("foo", "CONSUMER") - require_NoError(t, err) - msgs, err := sub.Fetch(1) - require_NoError(t, err) - require_Len(t, len(msgs), 1) - err = msgs[0].AckSync() - require_NoError(t, err) - - // We don't need the client anymore. + // Stop followers so further publishes will not have quorum. + followerName1 := servers[0] + followerName2 := servers[1] + followerServer1 := c.serverByName(followerName1) + followerServer2 := c.serverByName(followerName2) + followerServer1.Shutdown() + followerServer2.Shutdown() + followerServer1.WaitForShutdown() + followerServer2.WaitForShutdown() + // Although this request will time out, it will be added to the stream leader's WAL. + _, err = js.Publish("foo", []byte("first"), nats.AckWait(time.Second)) + require_NotNil(t, err) + require_Equal(t, err, nats.ErrTimeout) + // Now shut down the leader as well. 
nc.Close() - - lookupConsumer := func(s *Server) *consumer { - t.Helper() - mset, err := s.lookupAccount(globalAccountName) + streamLeaderServer.Shutdown() + streamLeaderServer.WaitForShutdown() + // Only restart the (previous) followers. + followerServer1 = c.restartServer(followerServer1) + c.restartServer(followerServer2) + c.waitOnStreamLeader(globalAccountName, "TEST") + nc, js = jsClientConnect(t, followerServer1) + defer nc.Close() + // Publishing a message will now have quorum. + pubAck, err := js.Publish("foo", []byte("first, this is a retry")) + require_NoError(t, err) + require_Equal(t, pubAck.Sequence, 1) + // Bring up the previous stream leader. + c.restartServer(streamLeaderServer) + c.waitOnAllCurrent() + c.waitOnStreamLeader(globalAccountName, "TEST") + // Check all servers ended up with the last published message, which had quorum. + for _, s := range c.servers { + c.waitOnStreamCurrent(s, globalAccountName, "TEST") + acc, err := s.lookupAccount(globalAccountName) require_NoError(t, err) - acc, err := mset.lookupStream("TEST") + mset, err := acc.lookupStream("TEST") require_NoError(t, err) - o := acc.lookupConsumer("CONSUMER") - require_NotNil(t, o) - return o + state := mset.state() + require_Equal(t, state.Msgs, 1) + require_Equal(t, state.Bytes, 55) } - - // Grab current consumer leader before moving all into observer mode. - cl := c.consumerLeader(globalAccountName, "TEST", "CONSUMER") +} +func TestJetStreamClusterPreserveWALDuringCatchupWithMatchingTerm(t *testing.T) { + c := createJetStreamClusterExplicit(t, "R3S", 3) + defer c.shutdown() + nc, js := jsClientConnect(t, c.randomServer()) + defer nc.Close() + _, err := js.AddStream(&nats.StreamConfig{ + Name: "TEST", + Subjects: []string{"foo.>"}, + Replicas: 3, + }) + nc.Close() + require_NoError(t, err) + // Pick one server that will only store a part of the messages in its WAL. 
+ rs := c.randomNonStreamLeader(globalAccountName, "TEST") + ts := time.Now().UnixNano() + // Manually add 3 append entries to each node's WAL, except for one node who is one behind. + var scratch [1024]byte for _, s := range c.servers { - // Put all consumer's RAFT into observer mode, this will prevent all servers from trying to become leader. - o := lookupConsumer(s) - o.node.SetObserver(true) - if s != cl { - // For all followers, pause apply so they only store messages in WAL but not apply and possibly snapshot. - err = o.node.PauseApply() - require_NoError(t, err) + for _, n := range s.raftNodes { + rn := n.(*raft) + if rn.accName == globalAccountName { + for i := uint64(0); i < 3; i++ { + // One server will be one behind and need to catchup. + if s.Name() == rs.Name() && i >= 2 { + break + } + esm := encodeStreamMsgAllowCompress("foo", "_INBOX.foo", nil, nil, i, ts, true) + entries := []*Entry{newEntry(EntryNormal, esm)} + rn.Lock() + ae := rn.buildAppendEntry(entries) + ae.buf, err = ae.encode(scratch[:]) + require_NoError(t, err) + err = rn.storeToWAL(ae) + rn.Unlock() + require_NoError(t, err) + } + } } } - - updateDeliveredBuffer := func() []byte { - var b [4*binary.MaxVarintLen64 + 1]byte - b[0] = byte(updateDeliveredOp) - n := 1 - n += binary.PutUvarint(b[n:], 100) - n += binary.PutUvarint(b[n:], 100) - n += binary.PutUvarint(b[n:], 1) - n += binary.PutVarint(b[n:], time.Now().UnixNano()) - return b[:n] - } - - updateAcksBuffer := func() []byte { - var b [2*binary.MaxVarintLen64 + 1]byte - b[0] = byte(updateAcksOp) - n := 1 - n += binary.PutUvarint(b[n:], 100) - n += binary.PutUvarint(b[n:], 100) - return b[:n] - } - - // Store an uncommitted entry into our WAL, which will be committed and applied later. 
- co := lookupConsumer(cl) - rn := co.node.(*raft) - rn.Lock() - entries := []*Entry{{EntryNormal, updateDeliveredBuffer()}, {EntryNormal, updateAcksBuffer()}} - ae := encode(t, rn.buildAppendEntry(entries)) - err = rn.storeToWAL(ae) - minPindex := rn.pindex - rn.Unlock() - require_NoError(t, err) - - // Simulate leader change, we do this so we can check what happens in the upper layer logic. - rn.leadc <- true - rn.SetObserver(false) - - // Since upper layer is async, we don't know whether it will or will not act on the leader change. - // Wait for some time to check if it does. - time.Sleep(2 * time.Second) - rn.RLock() - maxPindex := rn.pindex - rn.RUnlock() - - r := c.randomNonConsumerLeader(globalAccountName, "TEST", "CONSUMER") - ro := lookupConsumer(r) - rn = ro.node.(*raft) - - checkFor(t, 5*time.Second, time.Second, func() error { - rn.RLock() - defer rn.RUnlock() - if rn.pindex < maxPindex { - return fmt.Errorf("rn.pindex too low, expected %d, got %d", maxPindex, rn.pindex) - } - return nil - }) - - // We should only have 'Normal' entries. - // If we'd get a 'Snapshot' entry, that would mean it had incomplete state and would be reverting committed state. - var state StreamState - rn.wal.FastState(&state) - for seq := minPindex; seq <= maxPindex; seq++ { - ae, err = rn.loadEntry(seq) + // Restart all. + c.stopAll() + c.restartAll() + c.waitOnAllCurrent() + c.waitOnStreamLeader(globalAccountName, "TEST") + rs = c.serverByName(rs.Name()) + // Check all servers ended up with all published messages, which had quorum. 
+ for _, s := range c.servers { + c.waitOnStreamCurrent(s, globalAccountName, "TEST") + acc, err := s.lookupAccount(globalAccountName) + require_NoError(t, err) + mset, err := acc.lookupStream("TEST") require_NoError(t, err) - for _, entry := range ae.entries { - require_Equal(t, entry.Type, EntryNormal) + state := mset.state() + require_Equal(t, state.Msgs, 3) + require_Equal(t, state.Bytes, 99) + } + // Check that the first two published messages came from our WAL, and + // the last came from a catchup by another leader. + for _, n := range rs.raftNodes { + rn := n.(*raft) + if rn.accName == globalAccountName { + ae, err := rn.loadEntry(2) + require_NoError(t, err) + require_True(t, ae.leader == rn.ID()) + ae, err = rn.loadEntry(3) + require_NoError(t, err) + require_True(t, ae.leader == rn.ID()) + ae, err = rn.loadEntry(4) + require_NoError(t, err) + require_True(t, ae.leader != rn.ID()) } } } diff --git a/server/raft.go b/server/raft.go index 8ea88a80904..c43ec65a178 100644 --- a/server/raft.go +++ b/server/raft.go @@ -3082,10 +3082,10 @@ func (n *raft) catchupStalled() bool { if n.catchup == nil { return false } - if n.catchup.pindex == n.pindex { + if n.catchup.pindex == n.commit { return time.Since(n.catchup.active) > 2*time.Second } - n.catchup.pindex = n.pindex + n.catchup.pindex = n.commit n.catchup.active = time.Now() return false } @@ -3104,7 +3104,7 @@ func (n *raft) createCatchup(ae *appendEntry) string { cterm: ae.pterm, cindex: ae.pindex, pterm: n.pterm, - pindex: n.pindex, + pindex: n.commit, active: time.Now(), } inbox := n.newCatchupInbox() @@ -3274,7 +3274,7 @@ func (n *raft) processAppendEntry(ae *appendEntry, sub *subscription) { if n.catchupStalled() { n.debug("Catchup may be stalled, will request again") inbox = n.createCatchup(ae) - ar = newAppendEntryResponse(n.pterm, n.pindex, n.id, false) + ar = newAppendEntryResponse(n.pterm, n.commit, n.id, false) } n.Unlock() if ar != nil { @@ -3315,13 +3315,15 @@ func (n *raft) processAppendEntry(ae 
*appendEntry, sub *subscription) { } if (isNew && ae.pterm != n.pterm) || ae.pindex != n.pindex { - // Check if this is a lower or equal index than what we were expecting. - if ae.pindex <= n.pindex { + // Check if this is a lower index than what we were expecting. + if ae.pindex < n.pindex { n.debug("AppendEntry detected pindex less than ours: %d:%d vs %d:%d", ae.pterm, ae.pindex, n.pterm, n.pindex) var ar *appendEntryResponse + // An AppendEntry is stored at seq=ae.pindex+1. This can be checked when eae != nil, eae.pindex==ae.pindex. + seq := ae.pindex + 1 var success bool - if eae, _ := n.loadEntry(ae.pindex); eae == nil { + if eae, _ := n.loadEntry(seq); eae == nil { // If terms are equal, and we are not catching up, we have simply already processed this message. // So we will ACK back to the leader. This can happen on server restarts based on timings of snapshots. if ae.pterm == n.pterm && !catchingUp { @@ -3329,14 +3331,18 @@ func (n *raft) processAppendEntry(ae *appendEntry, sub *subscription) { } else { n.resetWAL() } - } else { - // If terms mismatched, or we got an error loading, delete that entry and all others past it. + } else if eae.term != ae.term { + // If terms mismatched, delete that entry and all others past it. // Make sure to cancel any catchups in progress. // Truncate will reset our pterm and pindex. Only do so if we have an entry. n.truncateWAL(eae.pterm, eae.pindex) + } else { + success = true + } + // Cancel regardless if truncated/unsuccessful. + if !success { + n.cancelCatchup() } - // Cancel regardless. - n.cancelCatchup() // Create response. 
ar = newAppendEntryResponse(ae.pterm, ae.pindex, n.id, success) @@ -3410,11 +3416,11 @@ func (n *raft) processAppendEntry(ae *appendEntry, sub *subscription) { return } else { - n.debug("AppendEntry did not match %d %d with %d %d", ae.pterm, ae.pindex, n.pterm, n.pindex) - if ae.pindex > n.pindex { + n.debug("AppendEntry did not match %d %d with %d %d (commit %d)", ae.pterm, ae.pindex, n.pterm, n.pindex, n.commit) + if ae.pindex > n.commit { // Setup our state for catching up. inbox := n.createCatchup(ae) - ar := newAppendEntryResponse(n.pterm, n.pindex, n.id, false) + ar := newAppendEntryResponse(n.pterm, n.commit, n.id, false) n.Unlock() n.sendRPC(ae.reply, inbox, ar.encode(arbuf)) arPool.Put(ar) diff --git a/server/raft_test.go b/server/raft_test.go index e63f5b6ed22..d6270555201 100644 --- a/server/raft_test.go +++ b/server/raft_test.go @@ -823,8 +823,8 @@ func TestNRGTermDoesntRollBackToPtermOnCatchup(t *testing.T) { require_Equal(t, rn.term, 2) if !rn.Leader() { - rn.truncateWAL(1, 6) // This will overwrite rn.term, so... - rn.term = 2 // ... we'll set it back manually. + rn.truncateWAL(1, 6) + require_Equal(t, rn.term, 2) // rn.term must stay the same require_Equal(t, rn.pterm, 1) require_Equal(t, rn.pindex, 6) } @@ -984,68 +984,94 @@ func TestNRGRemoveLeaderPeerDeadlockBug(t *testing.T) { } func TestNRGWALEntryWithoutQuorumMustTruncate(t *testing.T) { - c := createJetStreamClusterExplicit(t, "R3S", 3) - defer c.shutdown() - - rg := c.createRaftGroup("TEST", 3, newStateAdder) - rg.waitOnLeader() - - var err error - var scratch [1024]byte - - // Simulate leader storing an AppendEntry in WAL but being hard killed before it can propose to its peers. 
- n := rg.leader().node().(*raft) - esm := encodeStreamMsgAllowCompress("foo", "_INBOX.foo", nil, nil, 0, 0, true) - entries := []*Entry{newEntry(EntryNormal, esm)} - n.Lock() - ae := n.buildAppendEntry(entries) - ae.buf, err = ae.encode(scratch[:]) - require_NoError(t, err) - err = n.storeToWAL(ae) - n.Unlock() - require_NoError(t, err) - - // Stop the leader so it moves to another one. - n.shutdown(false) - - // Wait for another leader to be picked - rg.waitOnLeader() - - // Restart the previous leader that contains the stored AppendEntry without quorum. - for _, a := range rg { - if a.node().ID() == n.ID() { - sa := a.(*stateAdder) - sa.restart() - break - } + tests := []struct { + title string + modify func(rg smGroup) + }{ + { + // state equals, only need to remove the entry + title: "equal", + modify: func(rg smGroup) {}, + }, + { + // state diverged, need to replace the entry + title: "diverged", + modify: func(rg smGroup) { + rg.leader().(*stateAdder).proposeDelta(11) + }, + }, } - // The previous leader's WAL should truncate to remove the AppendEntry only it has. - // Eventually all WALs for all peers must match. 
- checkFor(t, 5*time.Second, 200*time.Millisecond, func() error { - var expected [][]byte - for _, a := range rg { - an := a.node().(*raft) - var state StreamState - an.wal.FastState(&state) - if len(expected) > 0 && int(state.LastSeq-state.FirstSeq+1) != len(expected) { - return fmt.Errorf("WAL is different: too many entries") - } - for index := state.FirstSeq; index <= state.LastSeq; index++ { - ae, err := an.loadEntry(index) - if err != nil { - return err - } - seq := int(index) - if len(expected) < seq { - expected = append(expected, ae.buf) - } else if !bytes.Equal(expected[seq-1], ae.buf) { - return fmt.Errorf("WAL is different: stored bytes differ") + for _, test := range tests { + t.Run(test.title, func(t *testing.T) { + c := createJetStreamClusterExplicit(t, "R3S", 3) + defer c.shutdown() + + rg := c.createRaftGroup("TEST", 3, newStateAdder) + rg.waitOnLeader() + + var err error + var scratch [1024]byte + + // Simulate leader storing an AppendEntry in WAL but being hard killed before it can propose to its peers. + n := rg.leader().node().(*raft) + esm := encodeStreamMsgAllowCompress("foo", "_INBOX.foo", nil, nil, 0, 0, true) + entries := []*Entry{newEntry(EntryNormal, esm)} + n.Lock() + ae := n.buildAppendEntry(entries) + ae.buf, err = ae.encode(scratch[:]) + require_NoError(t, err) + err = n.storeToWAL(ae) + n.Unlock() + require_NoError(t, err) + + // Stop the leader so it moves to another one. + n.shutdown(false) + + // Wait for another leader to be picked + rg.waitOnLeader() + + // Make a modification, specific to this test. + test.modify(rg) + + // Restart the previous leader that contains the stored AppendEntry without quorum. + for _, a := range rg { + if a.node().ID() == n.ID() { + sa := a.(*stateAdder) + sa.restart() + break } } - } - return nil - }) + + // The previous leader's WAL should truncate to remove the AppendEntry only it has. + // Eventually all WALs for all peers must match. 
+ checkFor(t, 5*time.Second, 200*time.Millisecond, func() error { + var expected [][]byte + for _, a := range rg { + an := a.node().(*raft) + var state StreamState + an.wal.FastState(&state) + if len(expected) > 0 && int(state.LastSeq-state.FirstSeq+1) != len(expected) { + return fmt.Errorf("WAL is different: too many entries") + } + // Loop over all entries in the WAL, checking if the contents for all RAFT nodes are equal. + for index := state.FirstSeq; index <= state.LastSeq; index++ { + ae, err := an.loadEntry(index) + if err != nil { + return err + } + seq := int(index) + if len(expected) < seq { + expected = append(expected, ae.buf) + } else if !bytes.Equal(expected[seq-1], ae.buf) { + return fmt.Errorf("WAL is different: stored bytes differ") + } + } + } + return nil + }) + }) + } } func TestNRGTermNoDecreaseAfterWALReset(t *testing.T) { @@ -1088,3 +1114,132 @@ func TestNRGTermNoDecreaseAfterWALReset(t *testing.T) { } } } + +func TestNRGCatchupDoesNotTruncateUncommittedEntriesWithQuorum(t *testing.T) { + c := createJetStreamClusterExplicit(t, "R3S", 3) + defer c.shutdown() + s := c.servers[0] // RunBasicJetStreamServer not available + + ms, err := newMemStore(&StreamConfig{Name: "TEST", Storage: MemoryStorage}) + require_NoError(t, err) + cfg := &RaftConfig{Name: "TEST", Store: t.TempDir(), Log: ms} + + err = s.bootstrapRaftNode(cfg, nil, false) + require_NoError(t, err) + n, err := s.initRaftNode(globalAccountName, cfg, pprofLabels{}) + require_NoError(t, err) + + // An AppendEntry is encoded into a buffer and that's stored into the WAL. + // This is a helper function to generate that buffer. + encode := func(ae *appendEntry) *appendEntry { + buf, err := ae.encode(nil) + require_NoError(t, err) + ae.buf = buf + return ae + } + + // Create a sample entry, the content doesn't matter, just that it's stored. 
+ esm := encodeStreamMsgAllowCompress("foo", "_INBOX.foo", nil, nil, 0, 0, true) + entries := []*Entry{newEntry(EntryNormal, esm)} + + nats0 := "S1Nunr6R" // "nats-0" + nats1 := "yrzKKRBu" // "nats-1" + + // Timeline, for first leader + aeInitial := encode(&appendEntry{leader: nats0, term: 1, commit: 0, pterm: 0, pindex: 0, entries: entries}) + aeHeartbeat := encode(&appendEntry{leader: nats0, term: 1, commit: 1, pterm: 1, pindex: 1, entries: nil}) + aeUncommitted := encode(&appendEntry{leader: nats0, term: 1, commit: 1, pterm: 1, pindex: 1, entries: entries}) + aeNoQuorum := encode(&appendEntry{leader: nats0, term: 1, commit: 1, pterm: 1, pindex: 2, entries: entries}) + + // Timeline, after leader change + aeMissed := encode(&appendEntry{leader: nats1, term: 2, commit: 1, pterm: 1, pindex: 2, entries: entries}) + aeCatchupTrigger := encode(&appendEntry{leader: nats1, term: 2, commit: 1, pterm: 2, pindex: 3, entries: entries}) + aeHeartbeat2 := encode(&appendEntry{leader: nats1, term: 2, commit: 2, pterm: 2, pindex: 4, entries: nil}) + aeHeartbeat3 := encode(&appendEntry{leader: nats1, term: 2, commit: 4, pterm: 2, pindex: 4, entries: nil}) + + // Initial case is simple, just store the entry. + n.processAppendEntry(aeInitial, n.aesub) + require_Equal(t, n.wal.State().Msgs, 1) + entry, err := n.loadEntry(1) + require_NoError(t, err) + require_Equal(t, entry.leader, nats0) + + // Heartbeat, makes sure commit moves up. + n.processAppendEntry(aeHeartbeat, n.aesub) + require_Equal(t, n.commit, 1) + + // We get one entry that has quorum (but we don't know that yet), so it stays uncommitted for a bit. + n.processAppendEntry(aeUncommitted, n.aesub) + require_Equal(t, n.wal.State().Msgs, 2) + entry, err = n.loadEntry(2) + require_NoError(t, err) + require_Equal(t, entry.leader, nats0) + + // We get one entry that has NO quorum (but we don't know that yet). 
+ n.processAppendEntry(aeNoQuorum, n.aesub) + require_Equal(t, n.wal.State().Msgs, 3) + entry, err = n.loadEntry(3) + require_NoError(t, err) + require_Equal(t, entry.leader, nats0) + + // We've just had a leader election, and we missed one message from the previous leader, we should catchup. + n.processAppendEntry(aeCatchupTrigger, n.aesub) + require_Equal(t, n.wal.State().Msgs, 3) + require_True(t, n.catchup != nil) + require_Equal(t, n.catchup.pterm, 1) // n.pterm + require_Equal(t, n.catchup.pindex, 1) // n.commit + + // Make sure our WAL was not truncated, and we're still catching up. + aeUncommitted.leader = nats1 + aeUncommitted = encode(aeUncommitted) + n.processAppendEntry(aeUncommitted, n.catchup.sub) + require_Equal(t, n.wal.State().Msgs, 3) + require_True(t, n.catchup != nil) + // Our entry should not be touched, so the 'leader' should've stayed the same. + entry, err = n.loadEntry(2) + require_NoError(t, err) + require_Equal(t, entry.leader, nats0) + + // We now notice the leader indicated a different entry at the (no quorum) index, should truncate. + n.processAppendEntry(aeMissed, n.catchup.sub) + require_Equal(t, n.wal.State().Msgs, 2) + require_True(t, n.catchup == nil) + + // We get a heartbeat that prompts us to catchup again. + n.processAppendEntry(aeHeartbeat2, n.aesub) + require_Equal(t, n.wal.State().Msgs, 2) + require_Equal(t, n.commit, 1) // Commit should not change, as we missed an item. + require_True(t, n.catchup != nil) + require_Equal(t, n.catchup.pterm, 1) // n.pterm + require_Equal(t, n.catchup.pindex, 1) // n.commit + + // We get the uncommitted entry again, it should stay the same. + n.processAppendEntry(aeUncommitted, n.catchup.sub) + require_Equal(t, n.wal.State().Msgs, 2) + require_True(t, n.catchup != nil) + // Our entry should still stay the same. + entry, err = n.loadEntry(2) + require_NoError(t, err) + require_Equal(t, entry.leader, nats0) + + // We now get the missed append entry, store it. 
+ n.processAppendEntry(aeMissed, n.catchup.sub) + require_Equal(t, n.wal.State().Msgs, 3) + require_True(t, n.catchup != nil) + entry, err = n.loadEntry(3) + require_NoError(t, err) + require_Equal(t, entry.leader, nats1) + + // We now get the entry that initially triggered us to catchup again, it should be added. + n.processAppendEntry(aeCatchupTrigger, n.catchup.sub) + require_Equal(t, n.wal.State().Msgs, 4) + require_True(t, n.catchup != nil) + entry, err = n.loadEntry(4) + require_NoError(t, err) + require_Equal(t, entry.leader, nats1) + + // Heartbeat, makes sure we commit (and reset catchup, as we're now up-to-date). + n.processAppendEntry(aeHeartbeat3, n.aesub) + require_Equal(t, n.commit, 4) + require_True(t, n.catchup == nil) +}