Skip to content

Commit

Permalink
Merge pull request #14721 from nvanbenschoten/nvanbenschoten/noCommit…
Browse files Browse the repository at this point in the history
…tedOnSnap

raft: don't apply entries when applying snapshot
  • Loading branch information
ahrtr authored Nov 14, 2022
2 parents edd4d51 + 539a841 commit 970ecfc
Show file tree
Hide file tree
Showing 2 changed files with 43 additions and 18 deletions.
21 changes: 15 additions & 6 deletions raft/log.go
Original file line number Diff line number Diff line change
Expand Up @@ -181,9 +181,13 @@ func (l *raftLog) unstableEntries() []pb.Entry {
// If applied is smaller than the index of snapshot, it returns all committed
// entries after the index of snapshot.
func (l *raftLog) nextCommittedEnts() (ents []pb.Entry) {
off := max(l.applied+1, l.firstIndex())
if l.committed+1 > off {
ents, err := l.slice(off, l.committed+1, l.maxNextCommittedEntsSize)
if l.hasPendingSnapshot() {
// See comment in hasNextCommittedEnts.
return nil
}
if l.committed > l.applied {
lo, hi := l.applied+1, l.committed+1 // [lo, hi)
ents, err := l.slice(lo, hi, l.maxNextCommittedEntsSize)
if err != nil {
l.logger.Panicf("unexpected error when getting unapplied entries (%v)", err)
}
Expand All @@ -195,13 +199,18 @@ func (l *raftLog) nextCommittedEnts() (ents []pb.Entry) {
// hasNextCommittedEnts returns if there is any available entries for execution.
// This is a fast check without heavy raftLog.slice() in nextCommittedEnts().
func (l *raftLog) hasNextCommittedEnts() bool {
off := max(l.applied+1, l.firstIndex())
return l.committed+1 > off
if l.hasPendingSnapshot() {
// If we have a snapshot to apply, don't also return any committed
// entries. Doing so raises questions about what should be applied
// first.
return false
}
return l.committed > l.applied
}

// hasPendingSnapshot returns if there is pending snapshot waiting for applying.
func (l *raftLog) hasPendingSnapshot() bool {
return l.unstable.snapshot != nil && !IsEmptySnap(*l.unstable.snapshot)
return l.unstable.snapshot != nil
}

func (l *raftLog) snapshot() (pb.Snapshot, error) {
Expand Down
40 changes: 28 additions & 12 deletions raft/log_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -349,13 +349,16 @@ func TestHasNextCommittedEnts(t *testing.T) {
{Term: 1, Index: 6},
}
tests := []struct {
applied uint64
hasNext bool
applied uint64
snap bool
whasNext bool
}{
{0, true},
{3, true},
{4, true},
{5, false},
{applied: 0, snap: false, whasNext: true},
{applied: 3, snap: false, whasNext: true},
{applied: 4, snap: false, whasNext: true},
{applied: 5, snap: false, whasNext: false},
// With snapshot.
{applied: 3, snap: true, whasNext: false},
}
for i, tt := range tests {
storage := NewMemoryStorage()
Expand All @@ -364,10 +367,15 @@ func TestHasNextCommittedEnts(t *testing.T) {
raftLog.append(ents...)
raftLog.maybeCommit(5, 1)
raftLog.appliedTo(tt.applied)
if tt.snap {
newSnap := snap
newSnap.Metadata.Index++
raftLog.restore(newSnap)
}

hasNext := raftLog.hasNextCommittedEnts()
if hasNext != tt.hasNext {
t.Errorf("#%d: hasNext = %v, want %v", i, hasNext, tt.hasNext)
if hasNext != tt.whasNext {
t.Errorf("#%d: hasNext = %v, want %v", i, hasNext, tt.whasNext)
}
}
}
Expand All @@ -383,12 +391,15 @@ func TestNextCommittedEnts(t *testing.T) {
}
tests := []struct {
applied uint64
snap bool
wents []pb.Entry
}{
{0, ents[:2]},
{3, ents[:2]},
{4, ents[1:2]},
{5, nil},
{applied: 0, snap: false, wents: ents[:2]},
{applied: 3, snap: false, wents: ents[:2]},
{applied: 4, snap: false, wents: ents[1:2]},
{applied: 5, snap: false, wents: nil},
// With snapshot.
{applied: 3, snap: true, wents: nil},
}
for i, tt := range tests {
storage := NewMemoryStorage()
Expand All @@ -397,6 +408,11 @@ func TestNextCommittedEnts(t *testing.T) {
raftLog.append(ents...)
raftLog.maybeCommit(5, 1)
raftLog.appliedTo(tt.applied)
if tt.snap {
newSnap := snap
newSnap.Metadata.Index++
raftLog.restore(newSnap)
}

nents := raftLog.nextCommittedEnts()
if !reflect.DeepEqual(nents, tt.wents) {
Expand Down

0 comments on commit 970ecfc

Please sign in to comment.