Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
126783: raft: make unstable a logSlice r=nvanbenschoten a=pav-kv

This PR refactors `raft.unstable` data structure to be a type-safe `logSlice`, and equips it with safety checks protecting the log append stack from appends that do not comply with the expected raft behaviours.

One immediate benefit of doing so is that `unstable` now knows the `lastEntryID` at all times, whereas previously it would fall back to fetching it from `Storage` interface when empty.

The second benefit is introducing the `logSlice.term` field, which carries strong semantics: the log is consistent with the leader log at this term. This allows implementing safety checks in append methods, preventing log truncations when not expected. Log truncations in raft can only happen when accepting a higher-term log append.

The `logSlice.term` field replaces `raft.accTerm`.

Part of cockroachdb#124440

Co-authored-by: Pavel Kalinnikov <[email protected]>
  • Loading branch information
craig[bot] and pav-kv committed Jul 10, 2024
2 parents d54f924 + 6a948f4 commit 0179c39
Show file tree
Hide file tree
Showing 7 changed files with 533 additions and 514 deletions.
4 changes: 1 addition & 3 deletions pkg/raft/bootstrap.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,9 +37,7 @@ func (rn *RawNode) Bootstrap(peers []Peer) error {
lastIndex, err := rn.raft.raftLog.storage.LastIndex()
if err != nil {
return err
}

if lastIndex != 0 {
} else if lastIndex != 0 {
return errors.New("can't bootstrap a nonempty Storage")
}

Expand Down
83 changes: 52 additions & 31 deletions pkg/raft/log.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,9 +84,14 @@ func newLogWithSize(
if err != nil {
panic(err) // TODO(bdarnell)
}
lastTerm, err := storage.Term(lastIndex)
if err != nil {
panic(err) // TODO(pav-kv)
}
last := entryID{term: lastTerm, index: lastIndex}
return &raftLog{
storage: storage,
unstable: newUnstable(lastIndex+1, logger),
unstable: newUnstable(last, logger),
maxApplyingEntsSize: maxApplyingEntsSize,

// Initialize our committed and applied pointers to the time of the last compaction.
Expand All @@ -99,8 +104,27 @@ func newLogWithSize(
}

func (l *raftLog) String() string {
// TODO(pav-kv): clean-up this message. It will change all the datadriven
// tests, so do it in a contained PR.
return fmt.Sprintf("committed=%d, applied=%d, applying=%d, unstable.offset=%d, unstable.offsetInProgress=%d, len(unstable.Entries)=%d",
l.committed, l.applied, l.applying, l.unstable.offset, l.unstable.offsetInProgress, len(l.unstable.entries))
l.committed, l.applied, l.applying, l.unstable.prev.index+1, l.unstable.entryInProgress+1, len(l.unstable.entries))
}

// accTerm returns the term of the leader whose append was accepted into the log
// last. Note that a rejected append does not update accTerm, by definition.
//
// Invariant: the log is a prefix of the accTerm's leader log
// Invariant: lastEntryID().term <= accTerm <= raft.Term
//
// In steady state, accTerm == raft.Term. When someone campaigns, raft.Term
// briefly overtakes the accTerm. However, accTerm catches up as soon as we
// accept an append from the new leader.
//
// NB: the log can be partially or fully compacted. When we say "log" above, we
// logically include all the entries that were the pre-image of a snapshot, as
// well as the entries that are still physically in the log.
func (l *raftLog) accTerm() uint64 {
return l.unstable.term
}

// maybeAppend conditionally appends the given log slice to the log, making it
Expand All @@ -113,34 +137,34 @@ func (l *raftLog) String() string {
//
// Returns false if the operation can not be done: entry a.prev does not match
// the log (so this log slice is insufficient to make our log consistent with
// the leader log), or is out of bounds (appending it would introduce a gap).
// the leader log), the slice is out of bounds (appending it would introduce a
// gap), or a.term is outdated.
func (l *raftLog) maybeAppend(a logSlice) bool {
match, ok := l.match(a)
if !ok {
return false
}
// Fast-forward the appended log slice to the last matching entry.
// NB: a.prev.index <= match <= a.lastIndex(), so the call is safe.
return l.append(a.forward(match))
}
a = a.forward(match)

// append conditionally appends the given log slice to the log. Same as
// maybeAppend, but does not skip the already present entries.
//
// TODO(pav-kv): do a clearer distinction between maybeAppend and append. The
// append method should only append at the end of the log (and verify that it's
// the case), and maybeAppend can truncate the log.
func (l *raftLog) append(a logSlice) bool {
if len(a.entries) == 0 {
// TODO(pav-kv): remove this clause and handle it in unstable. The log slice
// can carry a newer a.term, which should update our accTerm.
return true
}
if first := a.entries[0].Index; first <= l.committed {
l.logger.Panicf("entry %d is already committed [committed(%d)]", first, l.committed)
}
// TODO(pav-kv): pass the logSlice down the stack, for safety checks and
// bookkeeping in the unstable structure.
l.unstable.truncateAndAppend(a.entries)
return true
return l.unstable.truncateAndAppend(a)
}

// append adds the given log slice to the end of the log.
//
// Returns false if the operation can not be done: entry a.prev does not match
// the lastEntryID of this log, or a.term is outdated.
func (l *raftLog) append(a logSlice) bool {
return l.unstable.append(a)
}

// match finds the longest prefix of the given log slice that matches the log.
Expand Down Expand Up @@ -285,7 +309,7 @@ func (l *raftLog) hasNextCommittedEnts(allowUnstable bool) bool {
func (l *raftLog) maxAppliableIndex(allowUnstable bool) uint64 {
hi := l.committed
if !allowUnstable {
hi = min(hi, l.unstable.offset-1)
hi = min(hi, l.unstable.prev.index)
}
return hi
}
Expand Down Expand Up @@ -327,14 +351,7 @@ func (l *raftLog) firstIndex() uint64 {
}

func (l *raftLog) lastIndex() uint64 {
if i, ok := l.unstable.maybeLastIndex(); ok {
return i
}
i, err := l.storage.LastIndex()
if err != nil {
panic(err) // TODO(bdarnell)
}
return i
return l.unstable.lastIndex()
}

// commitTo bumps the commit index to the given value if it is higher than the
Expand Down Expand Up @@ -478,11 +495,14 @@ func (l *raftLog) matchTerm(id entryID) bool {
return t == id.term
}

func (l *raftLog) restore(s snapshot) {
func (l *raftLog) restore(s snapshot) bool {
id := s.lastEntryID()
l.logger.Infof("log [%s] starts to restore snapshot [index: %d, term: %d]", l, id.index, id.term)
l.unstable.restore(s)
if !l.unstable.restore(s) {
return false
}
l.committed = id.index
return true
}

// scan visits all log entries in the [lo, hi) range, returning them via the
Expand Down Expand Up @@ -513,20 +533,21 @@ func (l *raftLog) scan(lo, hi uint64, pageSize entryEncodingSize, v func([]pb.En

// slice returns a slice of log entries from lo through hi-1, inclusive.
func (l *raftLog) slice(lo, hi uint64, maxSize entryEncodingSize) ([]pb.Entry, error) {
// TODO(pav-kv): simplify a bunch of arithmetics below.
if err := l.mustCheckOutOfBounds(lo, hi); err != nil {
return nil, err
}
if lo == hi {
return nil, nil
}
if lo >= l.unstable.offset {
if lo > l.unstable.prev.index {
ents := limitSize(l.unstable.slice(lo, hi), maxSize)
// NB: use the full slice expression to protect the unstable slice from
// appends to the returned ents slice.
return ents[:len(ents):len(ents)], nil
}

cut := min(hi, l.unstable.offset)
cut := min(hi, l.unstable.prev.index+1)
ents, err := l.storage.Entries(lo, cut, uint64(maxSize))
if err == ErrCompacted {
return nil, err
Expand All @@ -535,7 +556,7 @@ func (l *raftLog) slice(lo, hi uint64, maxSize entryEncodingSize) ([]pb.Entry, e
} else if err != nil {
panic(err) // TODO(pavelkalinnikov): handle errors uniformly
}
if hi <= l.unstable.offset {
if hi <= l.unstable.prev.index+1 {
return ents, nil
}

Expand All @@ -552,7 +573,7 @@ func (l *raftLog) slice(lo, hi uint64, maxSize entryEncodingSize) ([]pb.Entry, e
return ents, nil
}

unstable := limitSize(l.unstable.slice(l.unstable.offset, hi), maxSize-size)
unstable := limitSize(l.unstable.slice(l.unstable.prev.index+1, hi), maxSize-size)
// Total size of unstable may exceed maxSize-size only if len(unstable) == 1.
// If this happens, ignore this extra entry.
if len(unstable) == 1 && size+entsSize(unstable) > maxSize {
Expand Down
73 changes: 33 additions & 40 deletions pkg/raft/log_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -153,47 +153,40 @@ func TestIsUpToDate(t *testing.T) {

func TestAppend(t *testing.T) {
init := entryID{}.append(1, 2, 2)
commit := logMark{term: init.term, index: 1}
for _, tt := range []struct {
app logSlice
want logSlice
panic bool
notOk bool
}{
{app: logSlice{}, want: init},
// appends not at the end of the log
{app: logSlice{}, notOk: true},
{app: entryID{term: 1, index: 1}.append(3), notOk: true},
{app: entryID{term: 2, index: 4}.append(3), notOk: true},
// appends at the end of the log
{
app: entryID{term: 2, index: 3}.append(2),
want: entryID{}.append(1, 2, 2, 2),
}, {
app: entryID{term: 1, index: 1}.append(3),
want: entryID{}.append(1, 3), // overwrite from index 2
}, {
app: entryID{term: 2, index: 2}.append(3, 4, 5),
want: entryID{}.append(1, 2, 3, 4, 5), // overwrite from index 3
}, {
app: entryID{}.append(3, 4),
panic: true, // entry 1 is already committed
app: entryID{term: 2, index: 3}.append(),
want: entryID{}.append(1, 2, 2),
},
} {
t.Run("", func(t *testing.T) {
defer func() {
if r := recover(); r != nil {
require.True(t, tt.panic)
}
}()

storage := NewMemoryStorage()
require.NoError(t, storage.SetHardState(pb.HardState{Term: init.term}))
require.NoError(t, storage.Append(init.entries))
raftLog := newLog(storage, discardLogger)
raftLog.commitTo(commit)

require.True(t, raftLog.append(tt.app))
require.False(t, tt.panic)
require.Equal(t, !tt.notOk, raftLog.append(tt.app))
if tt.notOk {
require.Equal(t, init.entries, raftLog.allEntries())
return
}
// TODO(pav-kv): check the term and prev too.
require.Equal(t, tt.want.entries, raftLog.allEntries())
if len(tt.app.entries) != 0 {
require.Equal(t, tt.app.lastIndex(), raftLog.lastIndex())
require.Equal(t, tt.app.prev.index+1, raftLog.unstable.offset)
require.Equal(t, tt.app.prev, raftLog.unstable.prev)
}
})
}
Expand Down Expand Up @@ -339,19 +332,19 @@ func TestHasNextCommittedEnts(t *testing.T) {
t.Run("", func(t *testing.T) {
storage := NewMemoryStorage()
require.NoError(t, storage.ApplySnapshot(snap.snap))
require.NoError(t, storage.Append(init.entries[:1]))

raftLog := newLog(storage, discardLogger)
require.True(t, raftLog.append(init))
require.NoError(t, storage.Append(init.entries[:1]))

raftLog.stableTo(entryID{term: 1, index: 4})
raftLog.commitTo(logMark{term: init.term, index: 5})
raftLog.appliedTo(tt.applied, 0 /* size */)
raftLog.acceptApplying(tt.applying, 0 /* size */, tt.allowUnstable)
raftLog.applyingEntsPaused = tt.paused
if tt.snap {
newSnap := snap
newSnap.snap.Metadata.Index++
raftLog.restore(newSnap)
newSnap.snap.Metadata.Index = init.lastIndex() + 1
require.True(t, raftLog.restore(newSnap))
}
require.Equal(t, tt.whasNext, raftLog.hasNextCommittedEnts(tt.allowUnstable))
})
Expand Down Expand Up @@ -393,19 +386,19 @@ func TestNextCommittedEnts(t *testing.T) {
t.Run("", func(t *testing.T) {
storage := NewMemoryStorage()
require.NoError(t, storage.ApplySnapshot(snap.snap))
require.NoError(t, storage.Append(init.entries[:1]))

raftLog := newLog(storage, discardLogger)
require.True(t, raftLog.append(init))
require.NoError(t, storage.Append(init.entries[:1]))

raftLog.stableTo(entryID{term: 1, index: 4})
raftLog.commitTo(logMark{term: init.term, index: 5})
raftLog.appliedTo(tt.applied, 0 /* size */)
raftLog.acceptApplying(tt.applying, 0 /* size */, tt.allowUnstable)
raftLog.applyingEntsPaused = tt.paused
if tt.snap {
newSnap := snap
newSnap.snap.Metadata.Index++
raftLog.restore(newSnap)
newSnap.snap.Metadata.Index = init.lastIndex() + 1
require.True(t, raftLog.restore(newSnap))
}
require.Equal(t, tt.wents, raftLog.nextCommittedEnts(tt.allowUnstable))
})
Expand Down Expand Up @@ -447,10 +440,10 @@ func TestAcceptApplying(t *testing.T) {
t.Run("", func(t *testing.T) {
storage := NewMemoryStorage()
require.NoError(t, storage.ApplySnapshot(snap))
require.NoError(t, storage.Append(init.entries[:1]))

raftLog := newLogWithSize(storage, discardLogger, maxSize)
require.True(t, raftLog.append(init))
require.NoError(t, storage.Append(init.entries[:1]))

raftLog.stableTo(entryID{term: 1, index: 4})
raftLog.commitTo(logMark{term: init.term, index: 5})
raftLog.appliedTo(3, 0 /* size */)
Expand Down Expand Up @@ -491,10 +484,10 @@ func TestAppliedTo(t *testing.T) {
t.Run("", func(t *testing.T) {
storage := NewMemoryStorage()
require.NoError(t, storage.ApplySnapshot(snap))
require.NoError(t, storage.Append(init.entries[:1]))

raftLog := newLogWithSize(storage, discardLogger, maxSize)
require.True(t, raftLog.append(init))
require.NoError(t, storage.Append(init.entries[:1]))

raftLog.stableTo(entryID{term: 1, index: 4})
raftLog.commitTo(logMark{term: init.term, index: 5})
raftLog.appliedTo(3, 0 /* size */)
Expand Down Expand Up @@ -527,14 +520,14 @@ func TestNextUnstableEnts(t *testing.T) {
// append unstable entries to raftlog
raftLog := newLog(storage, discardLogger)
require.True(t, raftLog.append(tt))
require.Equal(t, tt.prev.index+1, raftLog.unstable.offset)
require.Equal(t, tt.prev, raftLog.unstable.prev)

require.Equal(t, len(tt.entries) != 0, raftLog.hasNextUnstableEnts())
require.Equal(t, tt.entries, raftLog.nextUnstableEnts())
if len(tt.entries) != 0 {
raftLog.stableTo(tt.lastEntryID())
}
require.Equal(t, tt.lastIndex()+1, raftLog.unstable.offset)
require.Equal(t, tt.lastEntryID(), raftLog.unstable.prev)
})
}
}
Expand Down Expand Up @@ -583,7 +576,7 @@ func TestStableTo(t *testing.T) {
raftLog := newLog(NewMemoryStorage(), discardLogger)
require.True(t, raftLog.append(init))
raftLog.stableTo(entryID{term: tt.stablet, index: tt.stablei})
require.Equal(t, tt.wunstable, raftLog.unstable.offset)
require.Equal(t, tt.wunstable, raftLog.unstable.prev.index+1)
})
}
}
Expand Down Expand Up @@ -619,7 +612,7 @@ func TestStableToWithSnap(t *testing.T) {
raftLog := newLog(s, discardLogger)
require.True(t, raftLog.append(tt.sl))
raftLog.stableTo(tt.to)
require.Equal(t, tt.want, raftLog.unstable.offset)
require.Equal(t, tt.want, raftLog.unstable.prev.index+1)
})
}
}
Expand Down Expand Up @@ -674,7 +667,7 @@ func TestLogRestore(t *testing.T) {
require.Zero(t, len(raftLog.allEntries()))
require.Equal(t, index+1, raftLog.firstIndex())
require.Equal(t, index, raftLog.committed)
require.Equal(t, index+1, raftLog.unstable.offset)
require.Equal(t, index, raftLog.unstable.prev.index)
require.Equal(t, term, mustTerm(raftLog.term(index)))
}

Expand Down Expand Up @@ -783,10 +776,10 @@ func TestTermWithUnstableSnapshot(t *testing.T) {
storage := NewMemoryStorage()
storage.ApplySnapshot(pb.Snapshot{Metadata: pb.SnapshotMetadata{Index: storagesnapi, Term: 1}})
l := newLog(storage, discardLogger)
l.restore(snapshot{
require.True(t, l.restore(snapshot{
term: 1,
snap: pb.Snapshot{Metadata: pb.SnapshotMetadata{Index: unstablesnapi, Term: 1}},
})
}))

for _, tt := range []struct {
idx uint64
Expand Down
Loading

0 comments on commit 0179c39

Please sign in to comment.