Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
127424: raft: use last accepted term for log writes r=nvanbenschoten a=pav-kv

This PR modifies the log write protocol to take advantage of the last accepted term tracked in `raftLog`/`unstable`.

Log entry writes are ordered by the last accepted term (`unstable.term`). Entries in the log can be overwritten only if this term is bumped (when we accept a log write from a higher term leader). If `unstable.term` is unchanged when we receive an acknowledgement from log storage, then the acknowledged entries don't have any overwrites in the write queue, and it is safe to remove them from `unstable`.

Part of cockroachdb#124440

Co-authored-by: Pavel Kalinnikov <[email protected]>
  • Loading branch information
craig[bot] and pav-kv committed Jul 30, 2024
2 parents 948013f + 4beb96b commit 8cbc870
Show file tree
Hide file tree
Showing 10 changed files with 192 additions and 259 deletions.
9 changes: 1 addition & 8 deletions pkg/raft/log.go
Original file line number Diff line number Diff line change
Expand Up @@ -247,13 +247,6 @@ func (l *raftLog) hasNextUnstableEnts() bool {
return len(l.nextUnstableEnts()) > 0
}

// hasNextOrInProgressUnstableEnts returns if there are any entries that are
// available to be written to the local stable log or in the process of being
// written to the local stable log.
func (l *raftLog) hasNextOrInProgressUnstableEnts() bool {
return len(l.unstable.entries) > 0
}

// nextCommittedEnts returns all the available entries for execution.
// Entries can be committed even when the local raft instance has not durably
// appended them to the local raft log yet. If allowUnstable is true, committed
Expand Down Expand Up @@ -407,7 +400,7 @@ func (l *raftLog) acceptApplying(i uint64, size entryEncodingSize, allowUnstable
i < l.maxAppliableIndex(allowUnstable)
}

func (l *raftLog) stableTo(id entryID) { l.unstable.stableTo(id) }
func (l *raftLog) stableTo(mark logMark) { l.unstable.stableTo(mark) }

func (l *raftLog) stableSnapTo(i uint64) { l.unstable.stableSnapTo(i) }

Expand Down
64 changes: 34 additions & 30 deletions pkg/raft/log_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -336,7 +336,7 @@ func TestHasNextCommittedEnts(t *testing.T) {
require.True(t, raftLog.append(init))
require.NoError(t, storage.Append(init.entries[:1]))

raftLog.stableTo(entryID{term: 1, index: 4})
raftLog.stableTo(logMark{term: init.term, index: 4})
raftLog.commitTo(logMark{term: init.term, index: 5})
raftLog.appliedTo(tt.applied, 0 /* size */)
raftLog.acceptApplying(tt.applying, 0 /* size */, tt.allowUnstable)
Expand Down Expand Up @@ -390,7 +390,7 @@ func TestNextCommittedEnts(t *testing.T) {
require.True(t, raftLog.append(init))
require.NoError(t, storage.Append(init.entries[:1]))

raftLog.stableTo(entryID{term: 1, index: 4})
raftLog.stableTo(logMark{term: init.term, index: 4})
raftLog.commitTo(logMark{term: init.term, index: 5})
raftLog.appliedTo(tt.applied, 0 /* size */)
raftLog.acceptApplying(tt.applying, 0 /* size */, tt.allowUnstable)
Expand Down Expand Up @@ -444,7 +444,7 @@ func TestAcceptApplying(t *testing.T) {
require.True(t, raftLog.append(init))
require.NoError(t, storage.Append(init.entries[:1]))

raftLog.stableTo(entryID{term: 1, index: 4})
raftLog.stableTo(logMark{term: init.term, index: 4})
raftLog.commitTo(logMark{term: init.term, index: 5})
raftLog.appliedTo(3, 0 /* size */)

Expand Down Expand Up @@ -488,7 +488,7 @@ func TestAppliedTo(t *testing.T) {
require.True(t, raftLog.append(init))
require.NoError(t, storage.Append(init.entries[:1]))

raftLog.stableTo(entryID{term: 1, index: 4})
raftLog.stableTo(logMark{term: init.term, index: 4})
raftLog.commitTo(logMark{term: init.term, index: 5})
raftLog.appliedTo(3, 0 /* size */)
raftLog.acceptApplying(5, maxSize+overshoot, false /* allowUnstable */)
Expand Down Expand Up @@ -525,7 +525,7 @@ func TestNextUnstableEnts(t *testing.T) {
require.Equal(t, len(tt.entries) != 0, raftLog.hasNextUnstableEnts())
require.Equal(t, tt.entries, raftLog.nextUnstableEnts())
if len(tt.entries) != 0 {
raftLog.stableTo(tt.lastEntryID())
raftLog.stableTo(tt.mark())
}
require.Equal(t, tt.lastEntryID(), raftLog.unstable.prev)
})
Expand Down Expand Up @@ -563,20 +563,24 @@ func TestCommitTo(t *testing.T) {
func TestStableTo(t *testing.T) {
init := entryID{}.append(1, 2)
for _, tt := range []struct {
stablei uint64
stablet uint64
wunstable uint64
mark logMark
want uint64 // prev.index
}{
{1, 1, 2},
{2, 2, 3},
{2, 1, 1}, // bad term
{3, 1, 1}, // bad index
// out of bounds
{mark: logMark{term: 2, index: 0}, want: 0},
{mark: logMark{term: 2, index: 3}, want: 0},
// outdated accepted term
{mark: logMark{term: 1, index: 1}, want: 0},
{mark: logMark{term: 1, index: 2}, want: 0},
// successful acknowledgements
{mark: logMark{term: 2, index: 1}, want: 1},
{mark: logMark{term: 2, index: 2}, want: 2},
} {
t.Run("", func(t *testing.T) {
raftLog := newLog(NewMemoryStorage(), discardLogger)
require.True(t, raftLog.append(init))
raftLog.stableTo(entryID{term: tt.stablet, index: tt.stablei})
require.Equal(t, tt.wunstable, raftLog.unstable.prev.index+1)
raftLog.stableTo(tt.mark)
require.Equal(t, tt.want, raftLog.unstable.prev.index)
})
}
}
Expand All @@ -586,24 +590,24 @@ func TestStableToWithSnap(t *testing.T) {
snap := pb.Snapshot{Metadata: pb.SnapshotMetadata{Term: snapID.term, Index: snapID.index}}
for _, tt := range []struct {
sl logSlice
to entryID
want uint64 // the unstable.offset
to logMark
want uint64 // prev.index
}{
// out of bounds
{sl: snapID.append(), to: entryID{term: 1, index: 2}, want: 6},
{sl: snapID.append(), to: entryID{term: 2, index: 6}, want: 6},
{sl: snapID.append(), to: entryID{term: 2, index: 7}, want: 6},
{sl: snapID.append(6, 6, 8), to: entryID{term: 2, index: 4}, want: 6},
{sl: snapID.append(6, 6, 8), to: entryID{term: 2, index: 10}, want: 6},
{sl: snapID.append(), to: logMark{term: 1, index: 2}, want: 5},
{sl: snapID.append(), to: logMark{term: 2, index: 6}, want: 5},
{sl: snapID.append(), to: logMark{term: 2, index: 7}, want: 5},
{sl: snapID.append(6, 6, 8), to: logMark{term: 2, index: 4}, want: 5},
{sl: snapID.append(6, 6, 8), to: logMark{term: 2, index: 10}, want: 5},
// successful acknowledgements
{sl: snapID.append(6, 6, 8), to: entryID{term: 2, index: 5}, want: 6},
{sl: snapID.append(6, 6, 8), to: entryID{term: 6, index: 6}, want: 7},
{sl: snapID.append(6, 6, 8), to: entryID{term: 6, index: 7}, want: 8},
{sl: snapID.append(6, 6, 8), to: entryID{term: 8, index: 8}, want: 9},
// mismatching entry terms
{sl: snapID.append(6, 6, 8), to: entryID{term: 3, index: 6}, want: 6},
{sl: snapID.append(6, 6, 8), to: entryID{term: 3, index: 7}, want: 6},
{sl: snapID.append(6, 6, 8), to: entryID{term: 3, index: 8}, want: 6},
{sl: snapID.append(6, 6, 8), to: logMark{term: 8, index: 5}, want: 5},
{sl: snapID.append(6, 6, 8), to: logMark{term: 8, index: 6}, want: 6},
{sl: snapID.append(6, 6, 8), to: logMark{term: 8, index: 7}, want: 7},
{sl: snapID.append(6, 6, 8), to: logMark{term: 8, index: 8}, want: 8},
// mismatching accepted term
{sl: snapID.append(6, 6, 8), to: logMark{term: 3, index: 6}, want: 5},
{sl: snapID.append(6, 6, 8), to: logMark{term: 3, index: 7}, want: 5},
{sl: snapID.append(6, 6, 8), to: logMark{term: 3, index: 8}, want: 5},
} {
t.Run("", func(t *testing.T) {
s := NewMemoryStorage()
Expand All @@ -612,7 +616,7 @@ func TestStableToWithSnap(t *testing.T) {
raftLog := newLog(s, discardLogger)
require.True(t, raftLog.append(tt.sl))
raftLog.stableTo(tt.to)
require.Equal(t, tt.want, raftLog.unstable.prev.index+1)
require.Equal(t, tt.want, raftLog.unstable.prev.index)
})
}
}
Expand Down
46 changes: 22 additions & 24 deletions pkg/raft/log_unstable.go
Original file line number Diff line number Diff line change
Expand Up @@ -167,41 +167,39 @@ func (u *unstable) acceptInProgress() {
u.entryInProgress = u.lastIndex()
}

// stableTo marks entries up to the entry with the specified (index, term) as
// stableTo marks entries up to the entry at the specified (term, index) mark as
// being successfully written to stable storage.
//
// The method should only be called when the caller can attest that the entries
// can not be overwritten by an in-progress log append. See the related comment
// in newStorageAppendRespMsg.
func (u *unstable) stableTo(id entryID) {
if u.snapshot != nil && id.index == u.snapshot.Metadata.Index {
// Index matched unstable snapshot, not unstable entry. Ignore.
u.logger.Infof("entry at index %d matched unstable snapshot; ignoring", id.index)
// The method makes sure the entries can not be overwritten by an in-progress
// log append. See the related comment in newStorageAppendRespMsg.
func (u *unstable) stableTo(mark logMark) {
if mark.term != u.term {
// The last accepted term has changed. Ignore. This is possible if part or
// all of the unstable log was replaced between that time that a set of
// entries started to be written to stable storage and when they finished.
u.logger.Infof("mark (term,index)=(%d,%d) mismatched the last accepted "+
"term %d in unstable log; ignoring ", mark.term, mark.index, u.term)
return
}
if id.index <= u.prev.index || id.index > u.lastIndex() {
// Unstable entry missing. Ignore.
u.logger.Infof("entry at index %d missing from unstable log; ignoring", id.index)
if u.snapshot != nil && mark.index == u.snapshot.Metadata.Index {
// Index matched unstable snapshot, not unstable entry. Ignore.
u.logger.Infof("entry at index %d matched unstable snapshot; ignoring", mark.index)
return
}
if term := u.termAt(id.index); term != id.term {
// Term mismatch between unstable entry and specified entry. Ignore.
// This is possible if part or all of the unstable log was replaced
// between that time that a set of entries started to be written to
// stable storage and when they finished.
u.logger.Infof("entry at (index,term)=(%d,%d) mismatched with "+
"entry at (%d,%d) in unstable log; ignoring", id.index, id.term, id.index, term)
if mark.index <= u.prev.index || mark.index > u.lastIndex() {
// Unstable entry missing. Ignore.
u.logger.Infof("entry at index %d missing from unstable log; ignoring", mark.index)
return
}
if u.snapshot != nil {
u.logger.Panicf("entry %+v acked earlier than the snapshot(in-progress=%t): %s",
id, u.snapshotInProgress, DescribeSnapshot(*u.snapshot))
u.logger.Panicf("mark %+v acked earlier than the snapshot(in-progress=%t): %s",
mark, u.snapshotInProgress, DescribeSnapshot(*u.snapshot))
}
u.logSlice = u.forward(id.index)
// TODO(pav-kv): why can id.index overtake u.entryInProgress? Probably bugs in
// tests using the log writes incorrectly, e.g. TestLeaderStartReplication
u.logSlice = u.forward(mark.index)
// TODO(pav-kv): why can mark.index overtake u.entryInProgress? Probably bugs
// in tests using the log writes incorrectly, e.g. TestLeaderStartReplication
// takes nextUnstableEnts() without acceptInProgress().
u.entryInProgress = max(u.entryInProgress, id.index)
u.entryInProgress = max(u.entryInProgress, mark.index)
u.shrinkEntriesArray()
}

Expand Down
2 changes: 1 addition & 1 deletion pkg/raft/log_unstable_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -472,7 +472,7 @@ func TestUnstableStableTo(t *testing.T) {
u.stableSnapTo(u.snapshot.Metadata.Index)
}
u.checkInvariants(t)
u.stableTo(entryID{term: tt.term, index: tt.index})
u.stableTo(logMark{term: tt.term, index: tt.index})
u.checkInvariants(t)
require.Equal(t, tt.wprev, u.prev.index)
require.Equal(t, tt.wentryInProgress, u.entryInProgress)
Expand Down
15 changes: 1 addition & 14 deletions pkg/raft/raft.go
Original file line number Diff line number Diff line change
Expand Up @@ -1133,19 +1133,6 @@ func (r *raft) Step(m pb.Message) error {
r.logger.Infof("%x [logterm: %d, index: %d, vote: %x] rejected %s from %x [logterm: %d, index: %d] at term %d",
r.id, last.term, last.index, r.Vote, m.Type, m.From, m.LogTerm, m.Index, r.Term)
r.send(pb.Message{To: m.From, Term: r.Term, Type: pb.MsgPreVoteResp, Reject: true})
} else if m.Type == pb.MsgStorageAppendResp {
if m.Snapshot != nil {
// Even if the snapshot applied under a different term, its application
// is still valid. Snapshots carry committed (term-independent) state.
r.appliedSnap(m.Snapshot)
}
if m.Index != 0 {
// Don't consider the appended log entries to be stable because they may
// have been overwritten in the unstable log during a later term. See
// the comment in newStorageAppendResp for more about this race.
r.logger.Infof("%x [term: %d] ignored entry appends from a %s message with lower term [term: %d]",
r.id, r.Term, m.Type, m.Term)
}
} else {
// ignore other cases
r.logger.Infof("%x [term: %d] ignored a %s message with lower term from %x [term: %d]",
Expand All @@ -1169,7 +1156,7 @@ func (r *raft) Step(m pb.Message) error {
r.appliedSnap(m.Snapshot)
}
if m.Index != 0 {
r.raftLog.stableTo(entryID{term: m.LogTerm, index: m.Index})
r.raftLog.stableTo(logMark{term: m.LogTerm, index: m.Index})
}

case pb.MsgStorageApplyResp:
Expand Down
2 changes: 1 addition & 1 deletion pkg/raft/raft_paper_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -799,7 +799,7 @@ func commitNoopEntry(r *raft, s *MemoryStorage) {
r.readMessages()
s.Append(r.raftLog.nextUnstableEnts())
r.raftLog.appliedTo(r.raftLog.committed, 0 /* size */)
r.raftLog.stableTo(r.raftLog.lastEntryID())
r.raftLog.stableTo(r.raftLog.unstable.mark())
}

func acceptAndReply(m pb.Message) pb.Message {
Expand Down
2 changes: 1 addition & 1 deletion pkg/raft/raft_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ import (
func nextEnts(r *raft, s *MemoryStorage) (ents []pb.Entry) {
// Append unstable entries.
s.Append(r.raftLog.nextUnstableEnts())
r.raftLog.stableTo(r.raftLog.lastEntryID())
r.raftLog.stableTo(r.raftLog.unstable.mark())

// Run post-append steps.
r.advanceMessagesAfterAppend()
Expand Down
Loading

0 comments on commit 8cbc870

Please sign in to comment.