Skip to content

Commit

Permalink
raft: add allowUnstable param to raftLog.nextCommittedEnts
Browse files Browse the repository at this point in the history
This allows callers to configure whether they want to allow entries that
are not already in stable storage to be returned from the method. This
will be used in a future commit.

Signed-off-by: Nathan VanBenschoten <[email protected]>
  • Loading branch information
nvanbenschoten committed Oct 25, 2022
1 parent e581ef5 commit 49c2a62
Show file tree
Hide file tree
Showing 5 changed files with 61 additions and 34 deletions.
17 changes: 12 additions & 5 deletions raft/log.go
Original file line number Diff line number Diff line change
Expand Up @@ -192,13 +192,16 @@ func (l *raftLog) hasNextUnstableEnts() bool {
// nextCommittedEnts returns all the available entries for execution.
// If applied is smaller than the index of snapshot, it returns all committed
// entries after the index of snapshot.
func (l *raftLog) nextCommittedEnts() (ents []pb.Entry) {
func (l *raftLog) nextCommittedEnts(allowUnstable bool) (ents []pb.Entry) {
if l.hasNextOrInProgressSnapshot() {
// See comment in hasNextCommittedEnts.
return nil
}
if l.committed > l.applying {
lo, hi := l.applying+1, l.committed+1 // [lo, hi)
lo, hi := l.applying+1, l.committed+1 // [lo, hi)
if !allowUnstable {
hi = min(hi, l.unstable.offset)
}
if lo < hi {
// TODO: handle pagination correctly.
ents, err := l.slice(lo, hi, l.maxNextEntsSize)
if err != nil {
Expand All @@ -211,14 +214,18 @@ func (l *raftLog) nextCommittedEnts() (ents []pb.Entry) {

// hasNextCommittedEnts returns if there is any available entries for execution.
// This is a fast check without heavy raftLog.slice() in nextCommittedEnts().
func (l *raftLog) hasNextCommittedEnts() bool {
func (l *raftLog) hasNextCommittedEnts(allowUnstable bool) bool {
if l.hasNextOrInProgressSnapshot() {
// If we have a snapshot to apply, don't also return any committed
// entries. Doing so raises questions about what should be applied
// first.
return false
}
return l.committed > l.applying
lo, hi := l.applying+1, l.committed+1 // [lo, hi)
if !allowUnstable {
hi = min(hi, l.unstable.offset)
}
return lo < hi
}

// nextUnstableSnapshot returns the snapshot, if present, that is available to
Expand Down
68 changes: 44 additions & 24 deletions raft/log_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -349,25 +349,35 @@ func TestHasNextCommittedEnts(t *testing.T) {
{Term: 1, Index: 6},
}
tests := []struct {
applied uint64
applying uint64
snap bool
whasNext bool
applied uint64
applying uint64
allowUnstable bool
snap bool
whasNext bool
}{
{applied: 3, applying: 3, snap: false, whasNext: true},
{applied: 3, applying: 4, snap: false, whasNext: true},
{applied: 3, applying: 5, snap: false, whasNext: false},
{applied: 4, applying: 4, snap: false, whasNext: true},
{applied: 4, applying: 5, snap: false, whasNext: false},
{applied: 5, applying: 5, snap: false, whasNext: false},
{applied: 3, applying: 3, allowUnstable: true, snap: false, whasNext: true},
{applied: 3, applying: 4, allowUnstable: true, snap: false, whasNext: true},
{applied: 3, applying: 5, allowUnstable: true, snap: false, whasNext: false},
{applied: 4, applying: 4, allowUnstable: true, snap: false, whasNext: true},
{applied: 4, applying: 5, allowUnstable: true, snap: false, whasNext: false},
{applied: 5, applying: 5, allowUnstable: true, snap: false, whasNext: false},
// Don't allow unstable entries.
{applied: 3, applying: 3, allowUnstable: false, snap: false, whasNext: true},
{applied: 3, applying: 4, allowUnstable: false, snap: false, whasNext: false},
{applied: 3, applying: 5, allowUnstable: false, snap: false, whasNext: false},
{applied: 4, applying: 4, allowUnstable: false, snap: false, whasNext: false},
{applied: 4, applying: 5, allowUnstable: false, snap: false, whasNext: false},
{applied: 5, applying: 5, allowUnstable: false, snap: false, whasNext: false},
// With snapshot.
{applied: 3, applying: 3, snap: true, whasNext: false},
{applied: 3, applying: 3, allowUnstable: true, snap: true, whasNext: false},
}
for i, tt := range tests {
storage := NewMemoryStorage()
storage.ApplySnapshot(snap)
storage.Append(ents[:1])
raftLog := newLog(storage, raftLogger)
raftLog.append(ents...)
raftLog.stableTo(4, 1)
raftLog.maybeCommit(5, 1)
raftLog.appliedTo(tt.applied)
raftLog.acceptApplying(tt.applying)
Expand All @@ -377,7 +387,7 @@ func TestHasNextCommittedEnts(t *testing.T) {
raftLog.restore(newSnap)
}

hasNext := raftLog.hasNextCommittedEnts()
hasNext := raftLog.hasNextCommittedEnts(tt.allowUnstable)
if hasNext != tt.whasNext {
t.Errorf("#%d: hasNext = %v, want %v", i, hasNext, tt.whasNext)
}
Expand All @@ -394,25 +404,35 @@ func TestNextCommittedEnts(t *testing.T) {
{Term: 1, Index: 6},
}
tests := []struct {
applied uint64
applying uint64
snap bool
wents []pb.Entry
applied uint64
applying uint64
allowUnstable bool
snap bool
wents []pb.Entry
}{
{applied: 3, applying: 3, snap: false, wents: ents[:2]},
{applied: 3, applying: 4, snap: false, wents: ents[1:2]},
{applied: 3, applying: 5, snap: false, wents: nil},
{applied: 4, applying: 4, snap: false, wents: ents[1:2]},
{applied: 4, applying: 5, snap: false, wents: nil},
{applied: 5, applying: 5, snap: false, wents: nil},
{applied: 3, applying: 3, allowUnstable: true, snap: false, wents: ents[:2]},
{applied: 3, applying: 4, allowUnstable: true, snap: false, wents: ents[1:2]},
{applied: 3, applying: 5, allowUnstable: true, snap: false, wents: nil},
{applied: 4, applying: 4, allowUnstable: true, snap: false, wents: ents[1:2]},
{applied: 4, applying: 5, allowUnstable: true, snap: false, wents: nil},
{applied: 5, applying: 5, allowUnstable: true, snap: false, wents: nil},
// Don't allow unstable entries.
{applied: 3, applying: 3, allowUnstable: false, snap: false, wents: ents[:1]},
{applied: 3, applying: 4, allowUnstable: false, snap: false, wents: nil},
{applied: 3, applying: 5, allowUnstable: false, snap: false, wents: nil},
{applied: 4, applying: 4, allowUnstable: false, snap: false, wents: nil},
{applied: 4, applying: 5, allowUnstable: false, snap: false, wents: nil},
{applied: 5, applying: 5, allowUnstable: false, snap: false, wents: nil},
// With snapshot.
{applied: 3, applying: 3, snap: true, wents: nil},
{applied: 3, applying: 3, allowUnstable: true, snap: true, wents: nil},
}
for i, tt := range tests {
storage := NewMemoryStorage()
storage.ApplySnapshot(snap)
storage.Append(ents[:1])
raftLog := newLog(storage, raftLogger)
raftLog.append(ents...)
raftLog.stableTo(4, 1)
raftLog.maybeCommit(5, 1)
raftLog.appliedTo(tt.applied)
raftLog.acceptApplying(tt.applying)
Expand All @@ -422,7 +442,7 @@ func TestNextCommittedEnts(t *testing.T) {
raftLog.restore(newSnap)
}

nents := raftLog.nextCommittedEnts()
nents := raftLog.nextCommittedEnts(tt.allowUnstable)
if !reflect.DeepEqual(nents, tt.wents) {
t.Errorf("#%d: nents = %+v, want %+v", i, nents, tt.wents)
}
Expand Down
2 changes: 1 addition & 1 deletion raft/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -568,7 +568,7 @@ func (n *node) ReadIndex(ctx context.Context, rctx []byte) error {
func newReady(r *raft, prevSoftSt *SoftState, prevHardSt pb.HardState) Ready {
rd := Ready{
Entries: r.raftLog.nextUnstableEnts(),
CommittedEntries: r.raftLog.nextCommittedEnts(),
CommittedEntries: r.raftLog.nextCommittedEnts(true /* allowUnstable */),
Messages: r.msgs,
}
if softSt := r.softState(); !softSt.equal(prevSoftSt) {
Expand Down
6 changes: 3 additions & 3 deletions raft/raft_paper_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -452,7 +452,7 @@ func TestLeaderCommitEntry(t *testing.T) {
t.Errorf("committed = %d, want %d", g, li+1)
}
wents := []pb.Entry{{Index: li + 1, Term: 1, Data: []byte("some data")}}
if g := r.raftLog.nextCommittedEnts(); !reflect.DeepEqual(g, wents) {
if g := r.raftLog.nextCommittedEnts(true); !reflect.DeepEqual(g, wents) {
t.Errorf("nextCommittedEnts = %+v, want %+v", g, wents)
}
msgs := r.readMessages()
Expand Down Expand Up @@ -540,7 +540,7 @@ func TestLeaderCommitPrecedingEntries(t *testing.T) {

li := uint64(len(tt))
wents := append(tt, pb.Entry{Term: 3, Index: li + 1}, pb.Entry{Term: 3, Index: li + 2, Data: []byte("some data")})
if g := r.raftLog.nextCommittedEnts(); !reflect.DeepEqual(g, wents) {
if g := r.raftLog.nextCommittedEnts(true); !reflect.DeepEqual(g, wents) {
t.Errorf("#%d: ents = %+v, want %+v", i, g, wents)
}
}
Expand Down Expand Up @@ -592,7 +592,7 @@ func TestFollowerCommitEntry(t *testing.T) {
t.Errorf("#%d: committed = %d, want %d", i, g, tt.commit)
}
wents := tt.ents[:int(tt.commit)]
if g := r.raftLog.nextCommittedEnts(); !reflect.DeepEqual(g, wents) {
if g := r.raftLog.nextCommittedEnts(true); !reflect.DeepEqual(g, wents) {
t.Errorf("#%d: nextCommittedEnts = %v, want %v", i, g, wents)
}
}
Expand Down
2 changes: 1 addition & 1 deletion raft/rawnode.go
Original file line number Diff line number Diff line change
Expand Up @@ -171,7 +171,7 @@ func (rn *RawNode) HasReady() bool {
if r.raftLog.hasNextUnstableSnapshot() {
return true
}
if len(r.msgs) > 0 || r.raftLog.hasNextUnstableEnts() || r.raftLog.hasNextCommittedEnts() {
if len(r.msgs) > 0 || r.raftLog.hasNextUnstableEnts() || r.raftLog.hasNextCommittedEnts(true /* allowUnstable */) {
return true
}
if len(r.readStates) != 0 {
Expand Down

0 comments on commit 49c2a62

Please sign in to comment.