diff --git a/server/raft.go b/server/raft.go index 15fff1f826e..76597d8d660 100644 --- a/server/raft.go +++ b/server/raft.go @@ -3191,7 +3191,7 @@ func (n *raft) truncateWAL(term, index uint64) { defer func() { // Check to see if we invalidated any snapshots that might have held state // from the entries we are truncating. - if snap, _ := n.loadLastSnapshot(); snap != nil && snap.lastIndex >= index { + if snap, _ := n.loadLastSnapshot(); snap != nil && snap.lastIndex > index { os.Remove(n.snapfile) n.snapfile = _EMPTY_ } diff --git a/server/raft_test.go b/server/raft_test.go index c0e209c3ad4..1a6c5e83878 100644 --- a/server/raft_test.go +++ b/server/raft_test.go @@ -20,6 +20,7 @@ import ( "math" "math/rand" "os" + "path" "path/filepath" "testing" "time" @@ -1451,5 +1452,50 @@ func TestNRGCatchupFromNewLeaderWithIncorrectPtermDoesNotTruncateIfCommitted(t * n.processAppendEntry(aeHeartbeat2, n.aesub) require_Equal(t, n.commit, 2) require_True(t, n.catchup == nil) +} + +func TestNRGDontRemoveSnapshotIfTruncateToApplied(t *testing.T) { + n, cleanup := initSingleMemRaftNode(t) + defer cleanup() + + // Create a sample entry, the content doesn't matter, just that it's stored. + esm := encodeStreamMsgAllowCompress("foo", "_INBOX.foo", nil, nil, 0, 0, true) + entries := []*Entry{newEntry(EntryNormal, esm)} + nats0 := "S1Nunr6R" // "nats-0" + + // Timeline. + aeMsg := encode(t, &appendEntry{leader: nats0, term: 1, commit: 0, pterm: 0, pindex: 0, entries: entries}) + aeHeartbeat := encode(t, &appendEntry{leader: nats0, term: 1, commit: 1, pterm: 1, pindex: 1, entries: nil}) + + // Initial case is simple, just store the entry. + n.processAppendEntry(aeMsg, n.aesub) + require_Equal(t, n.wal.State().Msgs, 1) + entry, err := n.loadEntry(1) + require_NoError(t, err) + require_Equal(t, entry.leader, nats0) + + // Heartbeat, makes sure commit moves up. + n.processAppendEntry(aeHeartbeat, n.aesub) + require_Equal(t, n.commit, 1) + require_Equal(t, n.pterm, 1) + + // Simulate upper layer calling down to apply. + n.Applied(1) + + // Install snapshot and check it exists. + err = n.InstallSnapshot(nil) + require_NoError(t, err) + + snapshots := path.Join(n.sd, snapshotsDir) + files, err := os.ReadDir(snapshots) + require_NoError(t, err) + require_Equal(t, len(files), 1) + + // Truncate and check snapshot is kept. + n.truncateWAL(n.pterm, n.applied) + + files, err = os.ReadDir(snapshots) + require_NoError(t, err) + require_Equal(t, len(files), 1) }