Skip to content

Commit

Permalink
[FIXED] Don't remove snapshot if truncate to applied (#6055)
Browse files Browse the repository at this point in the history
After installing a snapshot for `n.applied=5`, and then truncating back
to its `pterm` and the same `pindex` as `applied` it would remove the
snapshot as well, even though it should be kept.

Signed-off-by: Maurice van Veen <[email protected]>
  • Loading branch information
derekcollison authored Oct 29, 2024
2 parents 86944bc + ebffb7f commit e339076
Show file tree
Hide file tree
Showing 2 changed files with 47 additions and 1 deletion.
2 changes: 1 addition & 1 deletion server/raft.go
Original file line number Diff line number Diff line change
Expand Up @@ -3191,7 +3191,7 @@ func (n *raft) truncateWAL(term, index uint64) {
defer func() {
// Check to see if we invalidated any snapshots that might have held state
// from the entries we are truncating.
if snap, _ := n.loadLastSnapshot(); snap != nil && snap.lastIndex >= index {
if snap, _ := n.loadLastSnapshot(); snap != nil && snap.lastIndex > index {
os.Remove(n.snapfile)
n.snapfile = _EMPTY_
}
Expand Down
46 changes: 46 additions & 0 deletions server/raft_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"math"
"math/rand"
"os"
"path"
"path/filepath"
"testing"
"time"
Expand Down Expand Up @@ -1451,5 +1452,50 @@ func TestNRGCatchupFromNewLeaderWithIncorrectPtermDoesNotTruncateIfCommitted(t *
n.processAppendEntry(aeHeartbeat2, n.aesub)
require_Equal(t, n.commit, 2)
require_True(t, n.catchup == nil)
}

func TestNRGDontRemoveSnapshotIfTruncateToApplied(t *testing.T) {
n, cleanup := initSingleMemRaftNode(t)
defer cleanup()

// Create a sample entry, the content doesn't matter, just that it's stored.
esm := encodeStreamMsgAllowCompress("foo", "_INBOX.foo", nil, nil, 0, 0, true)
entries := []*Entry{newEntry(EntryNormal, esm)}

nats0 := "S1Nunr6R" // "nats-0"

// Timeline.
aeMsg := encode(t, &appendEntry{leader: nats0, term: 1, commit: 0, pterm: 0, pindex: 0, entries: entries})
aeHeartbeat := encode(t, &appendEntry{leader: nats0, term: 1, commit: 1, pterm: 1, pindex: 1, entries: nil})

// Initial case is simple, just store the entry.
n.processAppendEntry(aeMsg, n.aesub)
require_Equal(t, n.wal.State().Msgs, 1)
entry, err := n.loadEntry(1)
require_NoError(t, err)
require_Equal(t, entry.leader, nats0)

// Heartbeat, makes sure commit moves up.
n.processAppendEntry(aeHeartbeat, n.aesub)
require_Equal(t, n.commit, 1)
require_Equal(t, n.pterm, 1)

// Simulate upper layer calling down to apply.
n.Applied(1)

// Install snapshot and check it exists.
err = n.InstallSnapshot(nil)
require_NoError(t, err)

snapshots := path.Join(n.sd, snapshotsDir)
files, err := os.ReadDir(snapshots)
require_NoError(t, err)
require_Equal(t, len(files), 1)

// Truncate and check snapshot is kept.
n.truncateWAL(n.pterm, n.applied)

files, err = os.ReadDir(snapshots)
require_NoError(t, err)
require_Equal(t, len(files), 1)
}

0 comments on commit e339076

Please sign in to comment.