diff --git a/pkg/raft/bootstrap.go b/pkg/raft/bootstrap.go index bad2a0373d63..7bca1a480f5e 100644 --- a/pkg/raft/bootstrap.go +++ b/pkg/raft/bootstrap.go @@ -37,9 +37,7 @@ func (rn *RawNode) Bootstrap(peers []Peer) error { lastIndex, err := rn.raft.raftLog.storage.LastIndex() if err != nil { return err - } - - if lastIndex != 0 { + } else if lastIndex != 0 { return errors.New("can't bootstrap a nonempty Storage") } diff --git a/pkg/raft/log.go b/pkg/raft/log.go index aa0c5695bab0..654e2db6df88 100644 --- a/pkg/raft/log.go +++ b/pkg/raft/log.go @@ -84,9 +84,14 @@ func newLogWithSize( if err != nil { panic(err) // TODO(bdarnell) } + lastTerm, err := storage.Term(lastIndex) + if err != nil { + panic(err) // TODO(pav-kv) + } + last := entryID{term: lastTerm, index: lastIndex} return &raftLog{ storage: storage, - unstable: newUnstable(lastIndex+1, logger), + unstable: newUnstable(last, logger), maxApplyingEntsSize: maxApplyingEntsSize, // Initialize our committed and applied pointers to the time of the last compaction. @@ -99,8 +104,27 @@ func newLogWithSize( } func (l *raftLog) String() string { + // TODO(pav-kv): clean-up this message. It will change all the datadriven + // tests, so do it in a contained PR. return fmt.Sprintf("committed=%d, applied=%d, applying=%d, unstable.offset=%d, unstable.offsetInProgress=%d, len(unstable.Entries)=%d", - l.committed, l.applied, l.applying, l.unstable.offset, l.unstable.offsetInProgress, len(l.unstable.entries)) + l.committed, l.applied, l.applying, l.unstable.prev.index+1, l.unstable.entryInProgress+1, len(l.unstable.entries)) +} + +// accTerm returns the term of the leader whose append was accepted into the log +// last. Note that a rejected append does not update accTerm, by definition. +// +// Invariant: the log is a prefix of the accTerm's leader log +// Invariant: lastEntryID().term <= accTerm <= raft.Term +// +// In steady state, accTerm == raft.Term. When someone campaigns, raft.Term +// briefly overtakes the accTerm. However, accTerm catches up as soon as we +// accept an append from the new leader. +// +// NB: the log can be partially or fully compacted. When we say "log" above, we +// logically include all the entries that were the pre-image of a snapshot, as +// well as the entries that are still physically in the log. +func (l *raftLog) accTerm() uint64 { + return l.unstable.term } // maybeAppend conditionally appends the given log slice to the log, making it @@ -113,7 +137,8 @@ func (l *raftLog) String() string { // // Returns false if the operation can not be done: entry a.prev does not match // the log (so this log slice is insufficient to make our log consistent with -// the leader log), or is out of bounds (appending it would introduce a gap). +// the leader log), the slice is out of bounds (appending it would introduce a +// gap), or a.term is outdated. func (l *raftLog) maybeAppend(a logSlice) bool { match, ok := l.match(a) if !ok { @@ -121,26 +146,25 @@ func (l *raftLog) maybeAppend(a logSlice) bool { } // Fast-forward the appended log slice to the last matching entry. // NB: a.prev.index <= match <= a.lastIndex(), so the call is safe. - return l.append(a.forward(match)) -} + a = a.forward(match) -// append conditionally appends the given log slice to the log. Same as -// maybeAppend, but does not skip the already present entries. -// -// TODO(pav-kv): do a clearer distinction between maybeAppend and append. The -// append method should only append at the end of the log (and verify that it's -// the case), and maybeAppend can truncate the log. -func (l *raftLog) append(a logSlice) bool { if len(a.entries) == 0 { + // TODO(pav-kv): remove this clause and handle it in unstable. The log slice + // can carry a newer a.term, which should update our accTerm. return true } if first := a.entries[0].Index; first <= l.committed { l.logger.Panicf("entry %d is already committed [committed(%d)]", first, l.committed) } - // TODO(pav-kv): pass the logSlice down the stack, for safety checks and - // bookkeeping in the unstable structure. - l.unstable.truncateAndAppend(a.entries) - return true + return l.unstable.truncateAndAppend(a) +} + +// append adds the given log slice to the end of the log. +// +// Returns false if the operation can not be done: entry a.prev does not match +// the lastEntryID of this log, or a.term is outdated. +func (l *raftLog) append(a logSlice) bool { + return l.unstable.append(a) } // match finds the longest prefix of the given log slice that matches the log. @@ -285,7 +309,7 @@ func (l *raftLog) hasNextCommittedEnts(allowUnstable bool) bool { func (l *raftLog) maxAppliableIndex(allowUnstable bool) uint64 { hi := l.committed if !allowUnstable { - hi = min(hi, l.unstable.offset-1) + hi = min(hi, l.unstable.prev.index) } return hi } @@ -327,14 +351,7 @@ func (l *raftLog) firstIndex() uint64 { } func (l *raftLog) lastIndex() uint64 { - if i, ok := l.unstable.maybeLastIndex(); ok { - return i - } - i, err := l.storage.LastIndex() - if err != nil { - panic(err) // TODO(bdarnell) - } - return i + return l.unstable.lastIndex() } // commitTo bumps the commit index to the given value if it is higher than the @@ -478,11 +495,14 @@ func (l *raftLog) matchTerm(id entryID) bool { return t == id.term } -func (l *raftLog) restore(s snapshot) { +func (l *raftLog) restore(s snapshot) bool { id := s.lastEntryID() l.logger.Infof("log [%s] starts to restore snapshot [index: %d, term: %d]", l, id.index, id.term) - l.unstable.restore(s) + if !l.unstable.restore(s) { + return false + } l.committed = id.index + return true } // scan visits all log entries in the [lo, hi) range, returning them via the @@ -513,20 +533,21 @@ func (l *raftLog) scan(lo, hi uint64, pageSize entryEncodingSize, v func([]pb.En // slice returns a slice of log entries from lo through hi-1, inclusive. func (l *raftLog) slice(lo, hi uint64, maxSize entryEncodingSize) ([]pb.Entry, error) { + // TODO(pav-kv): simplify a bunch of arithmetics below. if err := l.mustCheckOutOfBounds(lo, hi); err != nil { return nil, err } if lo == hi { return nil, nil } - if lo >= l.unstable.offset { + if lo > l.unstable.prev.index { ents := limitSize(l.unstable.slice(lo, hi), maxSize) // NB: use the full slice expression to protect the unstable slice from // appends to the returned ents slice. return ents[:len(ents):len(ents)], nil } - cut := min(hi, l.unstable.offset) + cut := min(hi, l.unstable.prev.index+1) ents, err := l.storage.Entries(lo, cut, uint64(maxSize)) if err == ErrCompacted { return nil, err @@ -535,7 +556,7 @@ func (l *raftLog) slice(lo, hi uint64, maxSize entryEncodingSize) ([]pb.Entry, e } else if err != nil { panic(err) // TODO(pavelkalinnikov): handle errors uniformly } - if hi <= l.unstable.offset { + if hi <= l.unstable.prev.index+1 { return ents, nil } @@ -552,7 +573,7 @@ func (l *raftLog) slice(lo, hi uint64, maxSize entryEncodingSize) ([]pb.Entry, e return ents, nil } - unstable := limitSize(l.unstable.slice(l.unstable.offset, hi), maxSize-size) + unstable := limitSize(l.unstable.slice(l.unstable.prev.index+1, hi), maxSize-size) // Total size of unstable may exceed maxSize-size only if len(unstable) == 1. // If this happens, ignore this extra entry. if len(unstable) == 1 && size+entsSize(unstable) > maxSize { diff --git a/pkg/raft/log_test.go b/pkg/raft/log_test.go index 7e2800bddce2..616687bf22dd 100644 --- a/pkg/raft/log_test.go +++ b/pkg/raft/log_test.go @@ -153,47 +153,40 @@ func TestIsUpToDate(t *testing.T) { func TestAppend(t *testing.T) { init := entryID{}.append(1, 2, 2) - commit := logMark{term: init.term, index: 1} for _, tt := range []struct { app logSlice want logSlice - panic bool + notOk bool }{ - {app: logSlice{}, want: init}, + // appends not at the end of the log + {app: logSlice{}, notOk: true}, + {app: entryID{term: 1, index: 1}.append(3), notOk: true}, + {app: entryID{term: 2, index: 4}.append(3), notOk: true}, + // appends at the end of the log { app: entryID{term: 2, index: 3}.append(2), want: entryID{}.append(1, 2, 2, 2), }, { - app: entryID{term: 1, index: 1}.append(3), - want: entryID{}.append(1, 3), // overwrite from index 2 - }, { - app: entryID{term: 2, index: 2}.append(3, 4, 5), - want: entryID{}.append(1, 2, 3, 4, 5), // overwrite from index 3 - }, { - app: entryID{}.append(3, 4), - panic: true, // entry 1 is already committed + app: entryID{term: 2, index: 3}.append(), + want: entryID{}.append(1, 2, 2), }, } { t.Run("", func(t *testing.T) { - defer func() { - if r := recover(); r != nil { - require.True(t, tt.panic) - } - }() - storage := NewMemoryStorage() require.NoError(t, storage.SetHardState(pb.HardState{Term: init.term})) require.NoError(t, storage.Append(init.entries)) raftLog := newLog(storage, discardLogger) - raftLog.commitTo(commit) - require.True(t, raftLog.append(tt.app)) - require.False(t, tt.panic) + require.Equal(t, !tt.notOk, raftLog.append(tt.app)) + if tt.notOk { + require.Equal(t, init.entries, raftLog.allEntries()) + return + } // TODO(pav-kv): check the term and prev too. require.Equal(t, tt.want.entries, raftLog.allEntries()) if len(tt.app.entries) != 0 { require.Equal(t, tt.app.lastIndex(), raftLog.lastIndex()) - require.Equal(t, tt.app.prev.index+1, raftLog.unstable.offset) + require.Equal(t, tt.app.prev, raftLog.unstable.prev) } }) } @@ -339,10 +332,10 @@ func TestHasNextCommittedEnts(t *testing.T) { t.Run("", func(t *testing.T) { storage := NewMemoryStorage() require.NoError(t, storage.ApplySnapshot(snap.snap)) - require.NoError(t, storage.Append(init.entries[:1])) - raftLog := newLog(storage, discardLogger) require.True(t, raftLog.append(init)) + require.NoError(t, storage.Append(init.entries[:1])) + raftLog.stableTo(entryID{term: 1, index: 4}) raftLog.commitTo(logMark{term: init.term, index: 5}) raftLog.appliedTo(tt.applied, 0 /* size */) @@ -350,8 +343,8 @@ func TestHasNextCommittedEnts(t *testing.T) { raftLog.applyingEntsPaused = tt.paused if tt.snap { newSnap := snap - newSnap.snap.Metadata.Index++ - raftLog.restore(newSnap) + newSnap.snap.Metadata.Index = init.lastIndex() + 1 + require.True(t, raftLog.restore(newSnap)) } require.Equal(t, tt.whasNext, raftLog.hasNextCommittedEnts(tt.allowUnstable)) }) @@ -393,10 +386,10 @@ func TestNextCommittedEnts(t *testing.T) { t.Run("", func(t *testing.T) { storage := NewMemoryStorage() require.NoError(t, storage.ApplySnapshot(snap.snap)) - require.NoError(t, storage.Append(init.entries[:1])) - raftLog := newLog(storage, discardLogger) require.True(t, raftLog.append(init)) + require.NoError(t, storage.Append(init.entries[:1])) + raftLog.stableTo(entryID{term: 1, index: 4}) raftLog.commitTo(logMark{term: init.term, index: 5}) raftLog.appliedTo(tt.applied, 0 /* size */) @@ -404,8 +397,8 @@ func TestNextCommittedEnts(t *testing.T) { raftLog.applyingEntsPaused = tt.paused if tt.snap { newSnap := snap - newSnap.snap.Metadata.Index++ - raftLog.restore(newSnap) + newSnap.snap.Metadata.Index = init.lastIndex() + 1 + require.True(t, raftLog.restore(newSnap)) } require.Equal(t, tt.wents, raftLog.nextCommittedEnts(tt.allowUnstable)) }) @@ -447,10 +440,10 @@ func TestAcceptApplying(t *testing.T) { t.Run("", func(t *testing.T) { storage := NewMemoryStorage() require.NoError(t, storage.ApplySnapshot(snap)) - require.NoError(t, storage.Append(init.entries[:1])) - raftLog := newLogWithSize(storage, discardLogger, maxSize) require.True(t, raftLog.append(init)) + require.NoError(t, storage.Append(init.entries[:1])) + raftLog.stableTo(entryID{term: 1, index: 4}) raftLog.commitTo(logMark{term: init.term, index: 5}) raftLog.appliedTo(3, 0 /* size */) @@ -491,10 +484,10 @@ func TestAppliedTo(t *testing.T) { t.Run("", func(t *testing.T) { storage := NewMemoryStorage() require.NoError(t, storage.ApplySnapshot(snap)) - require.NoError(t, storage.Append(init.entries[:1])) - raftLog := newLogWithSize(storage, discardLogger, maxSize) require.True(t, raftLog.append(init)) + require.NoError(t, storage.Append(init.entries[:1])) + raftLog.stableTo(entryID{term: 1, index: 4}) raftLog.commitTo(logMark{term: init.term, index: 5}) raftLog.appliedTo(3, 0 /* size */) @@ -527,14 +520,14 @@ func TestNextUnstableEnts(t *testing.T) { // append unstable entries to raftlog raftLog := newLog(storage, discardLogger) require.True(t, raftLog.append(tt)) - require.Equal(t, tt.prev.index+1, raftLog.unstable.offset) + require.Equal(t, tt.prev, raftLog.unstable.prev) require.Equal(t, len(tt.entries) != 0, raftLog.hasNextUnstableEnts()) require.Equal(t, tt.entries, raftLog.nextUnstableEnts()) if len(tt.entries) != 0 { raftLog.stableTo(tt.lastEntryID()) } - require.Equal(t, tt.lastIndex()+1, raftLog.unstable.offset) + require.Equal(t, tt.lastEntryID(), raftLog.unstable.prev) }) } } @@ -583,7 +576,7 @@ func TestStableTo(t *testing.T) { raftLog := newLog(NewMemoryStorage(), discardLogger) require.True(t, raftLog.append(init)) raftLog.stableTo(entryID{term: tt.stablet, index: tt.stablei}) - require.Equal(t, tt.wunstable, raftLog.unstable.offset) + require.Equal(t, tt.wunstable, raftLog.unstable.prev.index+1) }) } } @@ -619,7 +612,7 @@ func TestStableToWithSnap(t *testing.T) { raftLog := newLog(s, discardLogger) require.True(t, raftLog.append(tt.sl)) raftLog.stableTo(tt.to) - require.Equal(t, tt.want, raftLog.unstable.offset) + require.Equal(t, tt.want, raftLog.unstable.prev.index+1) }) } } @@ -674,7 +667,7 @@ func TestLogRestore(t *testing.T) { require.Zero(t, len(raftLog.allEntries())) require.Equal(t, index+1, raftLog.firstIndex()) require.Equal(t, index, raftLog.committed) - require.Equal(t, index+1, raftLog.unstable.offset) + require.Equal(t, index, raftLog.unstable.prev.index) require.Equal(t, term, mustTerm(raftLog.term(index))) } @@ -783,10 +776,10 @@ func TestTermWithUnstableSnapshot(t *testing.T) { storage := NewMemoryStorage() storage.ApplySnapshot(pb.Snapshot{Metadata: pb.SnapshotMetadata{Index: storagesnapi, Term: 1}}) l := newLog(storage, discardLogger) - l.restore(snapshot{ + require.True(t, l.restore(snapshot{ term: 1, snap: pb.Snapshot{Metadata: pb.SnapshotMetadata{Index: unstablesnapi, Term: 1}}, - }) + })) for _, tt := range []struct { idx uint64 diff --git a/pkg/raft/log_unstable.go b/pkg/raft/log_unstable.go index ccf3e8ad7cfa..9e8d36f84505 100644 --- a/pkg/raft/log_unstable.go +++ b/pkg/raft/log_unstable.go @@ -19,60 +19,106 @@ package raft import pb "github.com/cockroachdb/cockroach/pkg/raft/raftpb" -// unstable contains "unstable" log entries and snapshot state that has -// not yet been written to Storage. The type serves two roles. First, it -// holds on to new log entries and an optional snapshot until they are -// handed to a Ready struct for persistence. Second, it continues to -// hold on to this state after it has been handed off to provide raftLog -// with a view of the in-progress log entries and snapshot until their -// writes have been stabilized and are guaranteed to be reflected in -// queries of Storage. After this point, the corresponding log entries -// and/or snapshot can be cleared from unstable. +// unstable is a suffix of the raft log pending to be written to Storage. The +// "log" can be represented by a snapshot, and/or a contiguous slice of entries. // -// unstable.entries[i] has raft log position i+unstable.offset. -// Note that unstable.offset may be less than the highest log -// position in storage; this means that the next write to storage -// might need to truncate the log before persisting unstable.entries. +// The possible states: +// 1. Both the snapshot and the entries logSlice are empty. This means the log +// is fully in Storage. The logSlice.prev is the lastEntryID of the log. +// 2. The snapshot is empty, and the logSlice is non-empty. The state up to +// (including) logSlice.prev is in Storage, and the logSlice is pending. +// 3. The snapshot is non-empty, and the logSlice is empty. The snapshot +// overrides the entire log in Storage. +// 4. Both the snapshot and logSlice are non-empty. The snapshot immediately +// precedes the entries, i.e. logSlice.prev == snapshot.lastEntryID. This +// state overrides the entire log in Storage. +// +// The type serves two roles. First, it holds on to the latest snapshot / log +// entries until they are handed to storage for persistence (via Ready API) and +// subsequently acknowledged. This provides the RawNode with a consistent view +// on the latest state of the log: the raftLog struct combines a log prefix from +// Storage with the suffix held by unstable. +// +// Second, it supports the asynchronous log writes protocol. The snapshot and +// the entries are handed to storage in the order that guarantees consistency of +// the raftLog. Writes on the storage happen in the same order, and issue +// acknowledgements delivered back to unstable. On acknowledgement, the snapshot +// and the entries are released from memory when it is safe to do so. There is +// no strict requirement on the order of acknowledgement delivery. +// +// TODO(pav-kv): describe the order requirements in more detail when accTerm +// (logSlice.term) is integrated into the protocol. +// +// Note that the in-memory prefix of the log can contain entries at indices less +// than Storage.LastIndex(). This means that the next write to storage might +// need to truncate the log before persisting the new suffix. Such a situation +// happens when there is a leader change, and the new leader overwrites entries +// that haven't been committed yet. +// +// TODO(pav-kv): decouple the "in progress" part into a separate struct which +// drives the storage write protocol. type unstable struct { - // the incoming unstable snapshot, if any. + // snapshot is the pending unstable snapshot, if any. // - // Invariants: - // - snapshot == nil ==> !snapshotInProgress - // - snapshot != nil ==> offset == snapshot.Metadata.Index + 1 + // Invariant: snapshot == nil ==> !snapshotInProgress + // Invariant: snapshot != nil ==> snapshot.lastEntryID == logSlice.prev // // The last invariant enforces the order of handling a situation when there is - // both a snapshot and entries. The snapshot must be acknowledged first, - // before entries are acknowledged and offset moves forward. + // both a snapshot and entries. The snapshot write must be acknowledged first, + // before entries are acknowledged and the logSlice moves forward. snapshot *pb.Snapshot - // all entries that have not yet been written to storage. - entries []pb.Entry - // entries[i] has raft log position i+offset. - offset uint64 - // if true, snapshot is being written to storage. + // logSlice is the suffix of the raft log that is not yet written to storage. + // If all the entries are written, or covered by the pending snapshot, then + // logSlice.entries is empty. + // + // Invariant: snapshot != nil ==> logSlice.prev == snapshot.lastEntryID + // Invariant: snapshot == nil ==> logSlice.prev is in Storage + // Invariant: logSlice.lastEntryID() is the end of the log at all times + // + // Invariant: logSlice.term, a.k.a. the "last accepted term", is the term of + // the leader whose append (either entries or snapshot) we accepted last. Our + // state is consistent with the leader log at this term. + logSlice + + // snapshotInProgress is true if the snapshot is being written to storage. // // Invariant: snapshotInProgress ==> snapshot != nil snapshotInProgress bool - // entries[:offsetInProgress-offset] are being written to storage. - // Like offset, offsetInProgress is exclusive, meaning that it - // contains the index following the largest in-progress entry. + // entryInProgress is the index of the last entry in logSlice already present + // in, or being written to storage. // - // Invariant: offset <= offsetInProgress - // Invariant: offsetInProgress - offset <= len(entries) - // Invariant: offsetInProgress > offset ==> snapshot == nil || snapshotInProgress + // Invariant: prev.index <= entryInProgress <= lastIndex() + // Invariant: entryInProgress > prev.index && snapshot != nil ==> snapshotInProgress // // The last invariant enforces the order of handling a situation when there is - // both a snapshot and entries. The snapshot must be sent to storage first. - offsetInProgress uint64 + // both a snapshot and entries. The snapshot must be sent to storage first, or + // together with the entries. + entryInProgress uint64 logger Logger } -func newUnstable(offset uint64, logger Logger) unstable { +func newUnstable(last entryID, logger Logger) unstable { + // To initialize the last accepted term (logSlice.term) correctly, we make + // sure its invariant is true: the log is a prefix of the term's leader's log. + // This can be achieved by conservatively initializing to the term of the last + // log entry. + // + // We can't pick any lower term because the lower term's leader can't have + // last.term entries in it. We can't pick a higher term because we don't have + // any information about the higher-term leaders and their logs. So last.term + // is the only valid choice. + // + // TODO(pav-kv): persist the accepted term in HardState and load it. Our + // initialization is conservative. Before restart, the accepted term could + // have been higher. Setting a higher term (ideally, matching the current + // leader Term) gives us more information about the log, and then allows + // bumping its commit index sooner than when the next MsgApp arrives. return unstable{ - offset: offset, - offsetInProgress: offset, - logger: logger, + logSlice: logSlice{term: last.term, prev: last}, + entryInProgress: last.index, + logger: logger, } } @@ -85,47 +131,21 @@ func (u *unstable) maybeFirstIndex() (uint64, bool) { return 0, false } -// maybeLastIndex returns the last index if it has at least one -// unstable entry or snapshot. -func (u *unstable) maybeLastIndex() (uint64, bool) { - if l := len(u.entries); l != 0 { - return u.offset + uint64(l) - 1, true - } - if u.snapshot != nil { - return u.snapshot.Metadata.Index, true - } - return 0, false -} - -// maybeTerm returns the term of the entry at index i, if there -// is any. +// maybeTerm returns the term of the entry at index i, if there is any. func (u *unstable) maybeTerm(i uint64) (uint64, bool) { - if i < u.offset { - if u.snapshot != nil && u.snapshot.Metadata.Index == i { - return u.snapshot.Metadata.Term, true - } - return 0, false - } - - last, ok := u.maybeLastIndex() - if !ok { - return 0, false - } - if i > last { + if i < u.prev.index || i > u.lastIndex() { return 0, false } - - return u.entries[i-u.offset].Term, true + return u.termAt(i), true } // nextEntries returns the unstable entries that are not already in the process // of being written to storage. func (u *unstable) nextEntries() []pb.Entry { - inProgress := int(u.offsetInProgress - u.offset) - if len(u.entries) == inProgress { + if u.entryInProgress == u.lastIndex() { return nil } - return u.entries[inProgress:] + return u.entries[u.entryInProgress-u.prev.index:] } // nextSnapshot returns the unstable snapshot, if one exists that is not already @@ -143,13 +163,8 @@ func (u *unstable) nextSnapshot() *pb.Snapshot { // entries/snapshots added after a call to acceptInProgress will be returned // from those methods, until the next call to acceptInProgress. func (u *unstable) acceptInProgress() { - if u.snapshot != nil { - u.snapshotInProgress = true - } - if len(u.entries) > 0 { - // NOTE: +1 because offsetInProgress is exclusive, like offset. - u.offsetInProgress = u.entries[len(u.entries)-1].Index + 1 - } + u.snapshotInProgress = u.snapshot != nil + u.entryInProgress = u.lastIndex() } // stableTo marks entries up to the entry with the specified (index, term) as @@ -159,34 +174,34 @@ func (u *unstable) acceptInProgress() { // can not be overwritten by an in-progress log append. See the related comment // in newStorageAppendRespMsg. func (u *unstable) stableTo(id entryID) { - gt, ok := u.maybeTerm(id.index) - if !ok { - // Unstable entry missing. Ignore. - u.logger.Infof("entry at index %d missing from unstable log; ignoring", id.index) - return - } - if id.index < u.offset { + if u.snapshot != nil && id.index == u.snapshot.Metadata.Index { // Index matched unstable snapshot, not unstable entry. Ignore. u.logger.Infof("entry at index %d matched unstable snapshot; ignoring", id.index) return } - if gt != id.term { + if id.index <= u.prev.index || id.index > u.lastIndex() { + // Unstable entry missing. Ignore. + u.logger.Infof("entry at index %d missing from unstable log; ignoring", id.index) + return + } + if term := u.termAt(id.index); term != id.term { // Term mismatch between unstable entry and specified entry. Ignore. // This is possible if part or all of the unstable log was replaced // between that time that a set of entries started to be written to // stable storage and when they finished. u.logger.Infof("entry at (index,term)=(%d,%d) mismatched with "+ - "entry at (%d,%d) in unstable log; ignoring", id.index, id.term, id.index, gt) + "entry at (%d,%d) in unstable log; ignoring", id.index, id.term, id.index, term) return } if u.snapshot != nil { u.logger.Panicf("entry %+v acked earlier than the snapshot(in-progress=%t): %s", id, u.snapshotInProgress, DescribeSnapshot(*u.snapshot)) } - num := int(id.index + 1 - u.offset) - u.entries = u.entries[num:] - u.offset = id.index + 1 - u.offsetInProgress = max(u.offsetInProgress, u.offset) + u.logSlice = u.forward(id.index) + // TODO(pav-kv): why can id.index overtake u.entryInProgress? Probably bugs in + // tests using the log writes incorrectly, e.g. TestLeaderStartReplication + // takes nextUnstableEnts() without acceptInProgress(). + u.entryInProgress = max(u.entryInProgress, id.index) u.shrinkEntriesArray() } @@ -211,56 +226,109 @@ func (u *unstable) shrinkEntriesArray() { func (u *unstable) stableSnapTo(i uint64) { if u.snapshot != nil && u.snapshot.Metadata.Index == i { - u.snapshot = nil u.snapshotInProgress = false + u.snapshot = nil } } -func (u *unstable) restore(s snapshot) { - // TODO(pav-kv): add a safety check making sure the snapshot does not regress - // the logMark of unstable. The ("accepted term", lastIndex) is the logical - // clock that must not regress. - u.offset = s.lastIndex() + 1 - u.offsetInProgress = u.offset - u.entries = nil +// restore resets the state to the given snapshot. It effectively truncates the +// log after s.lastEntryID(), so the caller must ensure this is safe. +func (u *unstable) restore(s snapshot) bool { + // All logs >= s.term are consistent with the log covered by the snapshot. If + // our log is such, disallow restoring a snapshot at indices below our last + // index, to avoid truncating a meaningful suffix of the log and losing its + // durability which could have been used to commit entries in this suffix. + // + // In this case, the caller must have advanced the commit index instead. + // Alternatively, we could retain a suffix of the log instead of truncating + // it, but at the time of writing the whole stack (including storage) is + // written such that a snapshot always clears the log. + if s.term <= u.term && s.lastIndex() < u.lastIndex() { + return false + } + // If s.term <= u.term, our log is consistent with the snapshot. Set the new + // accepted term to the max of the two so that u.term does not regress. At the + // time of writing, s.term is always >= u.term, because s.term == raft.Term + // and u.term <= raft.Term. It could be relaxed in the future. + term := max(u.term, s.term) + u.snapshot = &s.snap + u.logSlice = logSlice{term: term, prev: s.lastEntryID()} u.snapshotInProgress = false + u.entryInProgress = u.prev.index + return true +} + +// append adds the given log slice to the end of the log. Returns false if this +// can not be done. +func (u *unstable) append(a logSlice) bool { + if a.term < u.term { + return false // append from an outdated log + } else if a.prev != u.lastEntryID() { + return false // not a valid append at the end of the log + } + u.term = a.term // update the last accepted term + u.entries = append(u.entries, a.entries...) + return true } -func (u *unstable) truncateAndAppend(ents []pb.Entry) { - fromIndex := ents[0].Index +func (u *unstable) truncateAndAppend(a logSlice) bool { + if a.term < u.term { + return false // append from an outdated log + } + // Fast path for appends at the end of the log. + last := u.lastEntryID() + if a.prev == last { + u.term = a.term // update the last accepted term + u.entries = append(u.entries, a.entries...) + return true + } + // If a.prev.index > last.index, we can not accept this write because it will + // introduce a gap in the log. + // + // If a.prev.index == last.index, then the last entry term did not match in + // the check above, so we must reject this case too. + if a.prev.index >= last.index { + return false + } + // Below, we handle the index regression case, a.prev.index < last.index. + // + // Within the same leader term, we enforce the log to be append-only, and only + // allow index regressions (which cause log truncations) when a.term > u.term. + if a.term == u.term { + return false + } - // We do not expect appends at or before the snapshot index. + // The caller checks that a.prev.index >= commit, i.e. we are not truncating + // committed entries. By extension, a.prev.index >= commit >= snapshot.index. + // So we do not expect the following check to fail. // - // The caller does stronger checks preventing appends at <= commit index, so - // the check here is redundant. But it's a defense-in-depth guarding the - // unstable struct invariant: entries begin at snapshot index + 1. The code - // below does not regress offset beyond the snapshot. - if u.snapshot != nil && fromIndex <= u.snapshot.Metadata.Index { - u.logger.Panicf("appending entry %+v before snapshot %s", - pbEntryID(&ents[0]), DescribeSnapshot(*u.snapshot)) + // It is a defense-in-depth guarding the invariant: if snapshot != nil then + // prev == snapshot.{term,index}. The code regresses prev, so we don't want + // the snapshot ID to get out of sync with it. + if u.snapshot != nil && a.prev.index < u.snapshot.Metadata.Index { + u.logger.Panicf("appending at %+v before snapshot %s", a.prev, DescribeSnapshot(*u.snapshot)) + return false } - switch { - case fromIndex == u.offset+uint64(len(u.entries)): - // fromIndex is the next index in the u.entries, so append directly. - u.entries = append(u.entries, ents...) - case fromIndex <= u.offset: - u.logger.Infof("replace the unstable entries from index %d", fromIndex) - // The log is being truncated to before our current offset - // portion, so set the offset and replace the entries. - u.entries = ents - u.offset = fromIndex - u.offsetInProgress = u.offset - default: - // Truncate to fromIndex (exclusive), and append the new entries. - u.logger.Infof("truncate the unstable entries before index %d", fromIndex) - keep := u.slice(u.offset, fromIndex) // NB: appending to this slice is safe, - u.entries = append(keep, ents...) // and will reallocate/copy it - // Only in-progress entries before fromIndex are still considered to be - // in-progress. - u.offsetInProgress = min(u.offsetInProgress, fromIndex) + // Truncate the log and append new entries. Regress the entryInProgress mark + // to reflect that the truncated entries are no longer considered in progress. + if a.prev.index <= u.prev.index { + u.logSlice = a // replace the entire logSlice with the latest append + // TODO(pav-kv): clean up the logging message. It will change all datadriven + // test outputs, so do it in a contained PR. + u.logger.Infof("replace the unstable entries from index %d", a.prev.index+1) + } else { + u.term = a.term // update the last accepted term + // Use the full slice expression to cause copy-on-write on this or a + // subsequent (if a.entries is empty) append to u.entries. The truncated + // part of the old slice can still be referenced elsewhere. + keep := u.entries[:a.prev.index-u.prev.index] + u.entries = append(keep[:len(keep):len(keep)], a.entries...) + u.logger.Infof("truncate the unstable entries before index %d", a.prev.index+1) } + u.entryInProgress = min(u.entryInProgress, a.prev.index) + return true } // slice returns the entries from the unstable log with indexes in the range @@ -276,16 +344,20 @@ func (u *unstable) slice(lo uint64, hi uint64) []pb.Entry { // NB: use the full slice expression to limit what the caller can do with the // returned slice. For example, an append will reallocate and copy this slice // instead of corrupting the neighbouring u.entries. - return u.entries[lo-u.offset : hi-u.offset : hi-u.offset] + offset := u.prev.index + 1 + return u.entries[lo-offset : hi-offset : hi-offset] } -// u.offset <= lo <= hi <= u.offset+len(u.entries) +// mustCheckOutOfBounds checks that [lo, hi) interval is included in +// (u.prev.index, u.lastIndex()]. +// Equivalently, u.prev.index + 1 <= lo <= hi <= u.lastIndex() + 1. +// +// TODO(pav-kv): the callers check this already. Remove. func (u *unstable) mustCheckOutOfBounds(lo, hi uint64) { if lo > hi { u.logger.Panicf("invalid unstable.slice %d > %d", lo, hi) } - upper := u.offset + uint64(len(u.entries)) - if lo < u.offset || hi > upper { - u.logger.Panicf("unstable.slice[%d,%d) out of bound [%d,%d]", lo, hi, u.offset, upper) + if last := u.lastIndex(); lo <= u.prev.index || hi > last+1 { + u.logger.Panicf("unstable.slice[%d,%d) out of bound (%d,%d]", lo, hi, u.prev.index, last) } } diff --git a/pkg/raft/log_unstable_test.go b/pkg/raft/log_unstable_test.go index fa14f10e9eb5..75784672e186 100644 --- a/pkg/raft/log_unstable_test.go +++ b/pkg/raft/log_unstable_test.go @@ -18,64 +18,68 @@ package raft import ( - "fmt" "testing" pb "github.com/cockroachdb/cockroach/pkg/raft/raftpb" "github.com/stretchr/testify/require" ) +func newUnstableForTesting(ls logSlice, snap *pb.Snapshot) unstable { + return unstable{ + snapshot: snap, + logSlice: ls, + entryInProgress: ls.prev.index, + logger: discardLogger, + } +} + func (u *unstable) checkInvariants(t testing.TB) { t.Helper() - require.GreaterOrEqual(t, u.offsetInProgress, u.offset) - require.LessOrEqual(t, u.offsetInProgress-u.offset, uint64(len(u.entries))) + require.NoError(t, u.logSlice.valid()) + require.GreaterOrEqual(t, u.entryInProgress, u.prev.index) + require.LessOrEqual(t, u.entryInProgress, u.lastIndex()) if u.snapshot != nil { - require.Equal(t, u.snapshot.Metadata.Index+1, u.offset) + require.Equal(t, u.snapshot.Metadata.Term, u.prev.term) + require.Equal(t, u.snapshot.Metadata.Index, u.prev.index) } else { require.False(t, u.snapshotInProgress) } - if len(u.entries) != 0 { - require.Equal(t, u.entries[0].Index, u.offset) - } - if u.offsetInProgress > u.offset && u.snapshot != nil { + if u.entryInProgress > u.prev.index && u.snapshot != nil { require.True(t, u.snapshotInProgress) } } func TestUnstableMaybeFirstIndex(t *testing.T) { - tests := []struct { - entries []pb.Entry - offset uint64 - snap *pb.Snapshot + prev4 := entryID{term: 1, index: 4} + snap4 := &pb.Snapshot{Metadata: pb.SnapshotMetadata{Index: 4, Term: 1}} + for _, tt := range []struct { + ls logSlice + snap *pb.Snapshot wok bool windex uint64 }{ // no snapshot { - index(5).terms(1), 5, nil, + prev4.append(1), nil, false, 0, }, { - []pb.Entry{}, 0, nil, + logSlice{}, nil, false, 0, }, // has snapshot { - index(5).terms(1), 5, &pb.Snapshot{Metadata: pb.SnapshotMetadata{Index: 4, Term: 1}}, + prev4.append(1), snap4, true, 5, }, { - []pb.Entry{}, 5, &pb.Snapshot{Metadata: pb.SnapshotMetadata{Index: 4, Term: 1}}, + prev4.append(), snap4, true, 5, }, - } - - for i, tt := range tests { - t.Run(fmt.Sprint(i), func(t *testing.T) { - u := newUnstable(tt.offset, raftLogger) - u.snapshot = tt.snap - u.entries = tt.entries + } { + t.Run("", func(t *testing.T) { + u := newUnstableForTesting(tt.ls, tt.snap) u.checkInvariants(t) index, ok := u.maybeFirstIndex() @@ -85,119 +89,93 @@ func TestUnstableMaybeFirstIndex(t *testing.T) { } } -func TestMaybeLastIndex(t *testing.T) { - tests := []struct { - entries []pb.Entry - offset uint64 - snap *pb.Snapshot - - wok bool +func TestLastIndex(t *testing.T) { + prev4 := entryID{term: 1, index: 4} + snap4 := &pb.Snapshot{Metadata: pb.SnapshotMetadata{Index: 4, Term: 1}} + for _, tt := range []struct { + ls logSlice + snap *pb.Snapshot windex uint64 }{ - // last in entries - { - index(5).terms(1), 5, nil, - true, 5, - }, - { - index(5).terms(1), 5, &pb.Snapshot{Metadata: pb.SnapshotMetadata{Index: 4, Term: 1}}, - true, 5, - }, - // last in snapshot - { - []pb.Entry{}, 5, &pb.Snapshot{Metadata: pb.SnapshotMetadata{Index: 4, Term: 1}}, - true, 4, - }, - // empty unstable - { - []pb.Entry{}, 0, nil, - false, 0, - }, - } - - for i, tt := range tests { - t.Run(fmt.Sprint(i), func(t *testing.T) { - u := newUnstable(tt.offset, raftLogger) - u.snapshot = tt.snap - u.entries = tt.entries + {prev4.append(1), nil, 5}, // last in entries + {prev4.append(1), snap4, 5}, + {prev4.append(), snap4, 4}, // last in snapshot + {logSlice{}, nil, 0}, // empty unstable + } { + t.Run("", func(t *testing.T) { + u := newUnstableForTesting(tt.ls, tt.snap) u.checkInvariants(t) - - index, ok := u.maybeLastIndex() - require.Equal(t, tt.wok, ok) - require.Equal(t, tt.windex, index) + require.Equal(t, tt.windex, u.lastIndex()) }) } } func TestUnstableMaybeTerm(t *testing.T) { - tests := []struct { - entries []pb.Entry - offset uint64 - snap *pb.Snapshot - index uint64 + prev4 := entryID{term: 1, index: 4} + snap4 := &pb.Snapshot{Metadata: pb.SnapshotMetadata{Index: 4, Term: 1}} + for _, tt := range []struct { + ls logSlice + snap *pb.Snapshot + index uint64 wok bool wterm uint64 }{ // term from entries { - index(5).terms(1), 5, nil, + prev4.append(1), nil, 5, true, 1, }, { - index(5).terms(1), 5, nil, + prev4.append(1), nil, 6, false, 0, }, { - index(5).terms(1), 5, nil, + prev4.append(1), nil, 4, - false, 0, + true, 1, }, { - index(5).terms(1), 5, &pb.Snapshot{Metadata: pb.SnapshotMetadata{Index: 4, Term: 1}}, + prev4.append(1), snap4, 5, true, 1, }, { - index(5).terms(1), 5, &pb.Snapshot{Metadata: pb.SnapshotMetadata{Index: 4, Term: 1}}, + prev4.append(1), snap4, 6, false, 0, }, // term from snapshot { - index(5).terms(1), 5, &pb.Snapshot{Metadata: pb.SnapshotMetadata{Index: 4, Term: 1}}, + prev4.append(1), snap4, 4, true, 1, }, { - index(5).terms(1), 5, &pb.Snapshot{Metadata: pb.SnapshotMetadata{Index: 4, Term: 1}}, + prev4.append(1), snap4, 3, false, 0, }, { - []pb.Entry{}, 5, &pb.Snapshot{Metadata: pb.SnapshotMetadata{Index: 4, Term: 1}}, + prev4.append(), snap4, 5, false, 0, }, { - []pb.Entry{}, 5, &pb.Snapshot{Metadata: pb.SnapshotMetadata{Index: 4, Term: 1}}, + prev4.append(), snap4, 4, true, 1, }, { - []pb.Entry{}, 0, nil, + prev4.append(), nil, 5, false, 0, }, - } - - for i, tt := range tests { - t.Run(fmt.Sprint(i), func(t *testing.T) { - u := newUnstable(tt.offset, raftLogger) - u.snapshot = tt.snap - u.entries = tt.entries + } { + t.Run("", func(t *testing.T) { + u := newUnstableForTesting(tt.ls, tt.snap) u.checkInvariants(t) term, ok := u.maybeTerm(tt.index) @@ -208,60 +186,54 @@ func TestUnstableMaybeTerm(t *testing.T) { } func TestUnstableRestore(t *testing.T) { - u := unstable{ - entries: index(5).terms(1), - offset: 5, - offsetInProgress: 6, - snapshot: &pb.Snapshot{Metadata: pb.SnapshotMetadata{Index: 4, Term: 1}}, - snapshotInProgress: true, - logger: raftLogger, - } + u := newUnstableForTesting( + entryID{term: 1, index: 4}.append(1), + &pb.Snapshot{Metadata: pb.SnapshotMetadata{Index: 4, Term: 1}}, + ) + u.snapshotInProgress = true + u.entryInProgress = 5 u.checkInvariants(t) s := snapshot{ term: 2, snap: pb.Snapshot{Metadata: pb.SnapshotMetadata{Index: 6, Term: 2}}, } - u.restore(s) + require.True(t, u.restore(s)) u.checkInvariants(t) - require.Equal(t, s.lastIndex()+1, u.offset) - require.Equal(t, s.lastIndex()+1, u.offsetInProgress) + require.Equal(t, uint64(6), u.entryInProgress) require.Zero(t, len(u.entries)) require.Equal(t, &s.snap, u.snapshot) require.False(t, u.snapshotInProgress) } func TestUnstableNextEntries(t *testing.T) { - tests := []struct { - entries []pb.Entry - offset uint64 - offsetInProgress uint64 + prev4 := entryID{term: 1, index: 4} + for _, tt := range []struct { + ls logSlice + entryInProgress uint64 wentries []pb.Entry }{ // nothing in progress { - index(5).terms(1, 1), 5, 5, + prev4.append(1, 1), 4, index(5).terms(1, 1), }, // partially in progress { - index(5).terms(1, 1), 5, 6, + prev4.append(1, 1), 5, index(6).terms(1), }, // everything in progress { - index(5).terms(1, 1), 5, 7, + prev4.append(1, 1), 6, nil, // nil, not empty slice }, - } - - for i, tt := range tests { - t.Run(fmt.Sprint(i), func(t *testing.T) { - u := newUnstable(tt.offset, raftLogger) - u.entries = tt.entries - u.offsetInProgress = tt.offsetInProgress + } { + t.Run("", func(t *testing.T) { + u := newUnstableForTesting(tt.ls, nil /* snap */) + u.entryInProgress = tt.entryInProgress u.checkInvariants(t) require.Equal(t, tt.wentries, u.nextEntries()) }) @@ -269,35 +241,33 @@ func TestUnstableNextEntries(t *testing.T) { } func TestUnstableNextSnapshot(t *testing.T) { - s := &pb.Snapshot{Metadata: pb.SnapshotMetadata{Index: 4, Term: 1}} - tests := []struct { - offset uint64 - snapshot *pb.Snapshot + prev4 := entryID{term: 1, index: 4} + snap4 := &pb.Snapshot{Metadata: pb.SnapshotMetadata{Index: 4, Term: 1}} + for _, tt := range []struct { + prev entryID + snap *pb.Snapshot snapshotInProgress bool wsnapshot *pb.Snapshot }{ // snapshot not unstable { - 0, nil, false, + entryID{}, nil, false, nil, }, // snapshot not in progress { - 5, s, false, - s, + prev4, snap4, false, + snap4, }, // snapshot in progress { - 5, s, true, + prev4, snap4, true, nil, }, - } - - for i, tt := range tests { - t.Run(fmt.Sprint(i), func(t *testing.T) { - u := newUnstable(tt.offset, raftLogger) - u.snapshot = tt.snapshot + } { + t.Run("", func(t *testing.T) { + u := newUnstableForTesting(tt.prev.append(), tt.snap) u.snapshotInProgress = tt.snapshotInProgress u.checkInvariants(t) require.Equal(t, tt.wsnapshot, u.nextSnapshot()) @@ -306,201 +276,196 @@ func TestUnstableNextSnapshot(t *testing.T) { } func TestUnstableAcceptInProgress(t *testing.T) { - tests := []struct { - entries []pb.Entry - snapshot *pb.Snapshot - offset uint64 - offsetInProgress uint64 + prev4 := entryID{term: 1, index: 4} + snap4 := &pb.Snapshot{Metadata: pb.SnapshotMetadata{Index: 4, Term: 1}} + for _, tt := range []struct { + ls logSlice + snap *pb.Snapshot + entryInProgress uint64 snapshotInProgress bool - woffsetInProgress uint64 + wentryInProgress uint64 wsnapshotInProgress bool }{ { - []pb.Entry{}, nil, - 5, 5, // no entries + prev4.append(), nil, // no entries + 4, + false, // snapshot not already in progress + 4, false, + }, + { + prev4.append(1), nil, + 4, // entries not in progress false, // snapshot not already in progress 5, false, }, { - index(5).terms(1), nil, - 5, 5, // entries not in progress + prev4.append(1, 1), nil, + 4, // entries not in progress false, // snapshot not already in progress 6, false, }, { - index(5).terms(1, 1), nil, - 5, 5, // entries not in progress + prev4.append(1, 1), nil, + 5, // in-progress to the first entry false, // snapshot not already in progress - 7, false, + 6, false, }, { - index(5).terms(1, 1), nil, - 5, 6, // in-progress to the first entry + prev4.append(1, 1), nil, + 6, // in-progress to the second entry false, // snapshot not already in progress - 7, false, + 6, false, }, + // with snapshot { - index(5).terms(1, 1), nil, - 5, 7, // in-progress to the second entry + prev4.append(), snap4, // no entries + 4, false, // snapshot not already in progress - 7, false, + 4, true, }, - // with snapshot { - []pb.Entry{}, &pb.Snapshot{Metadata: pb.SnapshotMetadata{Index: 4, Term: 1}}, - 5, 5, // no entries + prev4.append(1), snap4, + 4, // entries not in progress false, // snapshot not already in progress 5, true, }, { - index(5).terms(1), &pb.Snapshot{Metadata: pb.SnapshotMetadata{Index: 4, Term: 1}}, - 5, 5, // entries not in progress + prev4.append(1, 1), snap4, + 4, // entries not in progress false, // snapshot not already in progress 6, true, }, { - index(5).terms(1, 1), &pb.Snapshot{Metadata: pb.SnapshotMetadata{Index: 4, Term: 1}}, - 5, 5, // entries not in progress - false, // snapshot not already in progress - 7, true, + prev4.append(), snap4, + 4, // entries not in progress + true, // snapshot already in progress + 4, true, }, { - []pb.Entry{}, &pb.Snapshot{Metadata: pb.SnapshotMetadata{Index: 4, Term: 1}}, - 5, 5, // entries not in progress + prev4.append(1), snap4, + 4, // entries not in progress true, // snapshot already in progress 5, true, }, { - index(5).terms(1), &pb.Snapshot{Metadata: pb.SnapshotMetadata{Index: 4, Term: 1}}, - 5, 5, // entries not in progress + prev4.append(1, 1), snap4, + 4, // entries not in progress true, // snapshot already in progress 6, true, }, { - index(5).terms(1, 1), &pb.Snapshot{Metadata: pb.SnapshotMetadata{Index: 4, Term: 1}}, - 5, 5, // entries not in progress - true, // snapshot already in progress - 7, true, - }, - { - index(5).terms(1, 1), &pb.Snapshot{Metadata: pb.SnapshotMetadata{Index: 4, Term: 1}}, - 5, 6, // in-progress to the first entry + prev4.append(1, 1), snap4, + 5, // in-progress to the first entry true, // snapshot already in progress - 7, true, + 6, true, }, { - index(5).terms(1, 1), &pb.Snapshot{Metadata: pb.SnapshotMetadata{Index: 4, Term: 1}}, - 5, 7, // in-progress to the second entry + prev4.append(1, 1), snap4, + 6, // in-progress to the second entry true, // snapshot already in progress - 7, true, + 6, true, }, - } - - for i, tt := range tests { - t.Run(fmt.Sprint(i), func(t *testing.T) { - u := newUnstable(tt.offset, raftLogger) - u.snapshot = tt.snapshot - u.entries = tt.entries + } { + t.Run("", func(t *testing.T) { + u := newUnstableForTesting(tt.ls, tt.snap) u.snapshotInProgress = tt.snapshotInProgress - u.offsetInProgress = tt.offsetInProgress + u.entryInProgress = tt.entryInProgress u.checkInvariants(t) u.acceptInProgress() u.checkInvariants(t) - require.Equal(t, tt.woffsetInProgress, u.offsetInProgress) + require.Equal(t, tt.wentryInProgress, u.entryInProgress) require.Equal(t, tt.wsnapshotInProgress, u.snapshotInProgress) }) } } func TestUnstableStableTo(t *testing.T) { - tests := []struct { - entries []pb.Entry - offset uint64 - offsetInProgress uint64 - snap *pb.Snapshot - index, term uint64 - - woffset uint64 - woffsetInProgress uint64 - wlen int + prev4 := entryID{term: 1, index: 4} + snap4 := &pb.Snapshot{Metadata: pb.SnapshotMetadata{Index: 4, Term: 1}} + for _, tt := range []struct { + ls logSlice + entryInProgress uint64 + snap *pb.Snapshot + index, term uint64 + + wprev uint64 + wentryInProgress uint64 + wlen int }{ { - []pb.Entry{}, 0, 0, nil, + logSlice{}, 0, nil, 5, 1, 0, 0, 0, }, { - index(5).terms(1), 5, 6, nil, + prev4.append(1), 5, nil, 5, 1, // stable to the first entry - 6, 6, 0, + 5, 5, 0, }, { - index(5).terms(1, 1), 5, 6, nil, + prev4.append(1, 1), 5, nil, 5, 1, // stable to the first entry - 6, 6, 1, + 5, 5, 1, }, { - index(5).terms(1, 1), 5, 7, nil, + prev4.append(1, 1), 6, nil, 5, 1, // stable to the first entry and in-progress ahead - 6, 7, 1, + 5, 6, 1, }, { - index(6).terms(2), 6, 7, nil, + entryID{term: 1, index: 5}.append(2), 6, nil, 6, 1, // stable to the first entry and term mismatch - 6, 7, 1, + 5, 6, 1, }, { - index(5).terms(1), 5, 6, nil, + prev4.append(1), 5, nil, 4, 1, // stable to old entry - 5, 6, 1, + 4, 5, 1, }, { - index(5).terms(1), 5, 6, nil, + prev4.append(1), 5, nil, 4, 2, // stable to old entry - 5, 6, 1, + 4, 5, 1, }, // with snapshot { - index(5).terms(1), 5, 6, &pb.Snapshot{Metadata: pb.SnapshotMetadata{Index: 4, Term: 1}}, + prev4.append(1), 5, snap4, 5, 1, // stable to the first entry - 6, 6, 0, + 5, 5, 0, }, { - index(5).terms(1, 1), 5, 6, &pb.Snapshot{Metadata: pb.SnapshotMetadata{Index: 4, Term: 1}}, + prev4.append(1, 1), 5, snap4, 5, 1, // stable to the first entry - 6, 6, 1, + 5, 5, 1, }, { - index(5).terms(1, 1), 5, 7, &pb.Snapshot{Metadata: pb.SnapshotMetadata{Index: 4, Term: 1}}, + prev4.append(1, 1), 6, snap4, 5, 1, // stable to the first entry and in-progress ahead - 6, 7, 1, + 5, 6, 1, }, { - index(6).terms(2), 6, 7, &pb.Snapshot{Metadata: pb.SnapshotMetadata{Index: 5, Term: 1}}, + entryID{term: 1, index: 5}.append(2), 6, + &pb.Snapshot{Metadata: pb.SnapshotMetadata{Index: 5, Term: 1}}, 6, 1, // stable to the first entry and term mismatch - 6, 7, 1, + 5, 6, 1, }, { - index(5).terms(1), 5, 6, &pb.Snapshot{Metadata: pb.SnapshotMetadata{Index: 4, Term: 1}}, + prev4.append(1), 5, snap4, 4, 1, // stable to snapshot - 5, 6, 1, + 4, 5, 1, }, { - index(5).terms(2), 5, 6, &pb.Snapshot{Metadata: pb.SnapshotMetadata{Index: 4, Term: 2}}, + prev4.append(2), 5, snap4, 4, 1, // stable to old entry - 5, 6, 1, + 4, 5, 1, }, - } - - for i, tt := range tests { - t.Run(fmt.Sprint(i), func(t *testing.T) { - u := newUnstable(tt.offset, raftLogger) - u.snapshot = tt.snap - u.entries = tt.entries - u.offsetInProgress = tt.offsetInProgress - u.snapshotInProgress = u.snapshot != nil && u.offsetInProgress > u.offset + } { + t.Run("", func(t *testing.T) { + u := newUnstableForTesting(tt.ls, tt.snap) + u.entryInProgress = tt.entryInProgress + u.snapshotInProgress = u.snapshot != nil && u.entryInProgress > u.prev.index u.checkInvariants(t) if u.snapshotInProgress { @@ -509,89 +474,92 @@ func TestUnstableStableTo(t *testing.T) { u.checkInvariants(t) u.stableTo(entryID{term: tt.term, index: tt.index}) u.checkInvariants(t) - require.Equal(t, tt.woffset, u.offset) - require.Equal(t, tt.woffsetInProgress, u.offsetInProgress) + require.Equal(t, tt.wprev, u.prev.index) + require.Equal(t, tt.wentryInProgress, u.entryInProgress) require.Equal(t, tt.wlen, len(u.entries)) }) } } func TestUnstableTruncateAndAppend(t *testing.T) { - tests := []struct { - entries []pb.Entry - offset uint64 - offsetInProgress uint64 - snap *pb.Snapshot - toappend []pb.Entry - - woffset uint64 - woffsetInProgress uint64 - wentries []pb.Entry + prev4 := entryID{term: 1, index: 4} + for _, tt := range []struct { + ls logSlice + entryInProgress uint64 + snap *pb.Snapshot + app logSlice + + want logSlice + wentryInProgress uint64 }{ // append to the end { - index(5).terms(1), 5, 5, nil, - index(6).terms(1, 1), - 5, 5, index(5).terms(1, 1, 1), + prev4.append(1), 4, nil, + entryID{term: 1, index: 5}.append(1, 1), + prev4.append(1, 1, 1), + 4, }, { - index(5).terms(1), 5, 6, nil, - index(6).terms(1, 1), - 5, 6, index(5).terms(1, 1, 1), + prev4.append(1), 5, nil, + entryID{term: 1, index: 5}.append(1, 1), + prev4.append(1, 1, 1), + 5, }, // replace the unstable entries { - index(5).terms(1), 5, 5, nil, - index(5).terms(2, 2), - 5, 5, index(5).terms(2, 2), + prev4.append(1), 4, nil, + prev4.append(2, 2), + prev4.append(2, 2), + 4, }, { - index(5).terms(1), 5, 5, nil, - index(4).terms(2, 2, 2), - 4, 4, index(4).terms(2, 2, 2), + prev4.append(1), 4, nil, + entryID{term: 1, index: 3}.append(2, 2, 2), + entryID{term: 1, index: 3}.append(2, 2, 2), + 3, }, { - index(5).terms(1), 5, 6, nil, - index(5).terms(2, 2), - 5, 5, index(5).terms(2, 2), + prev4.append(1), 5, nil, + prev4.append(2, 2), + prev4.append(2, 2), + 4, }, // truncate the existing entries and append { - index(5).terms(1, 1, 1), 5, 5, nil, - index(6).terms(2), - 5, 5, index(5).terms(1, 2), + prev4.append(1, 1, 1), 4, nil, + entryID{term: 1, index: 5}.append(2), + prev4.append(1, 2), + 4, }, { - index(5).terms(1, 1, 1), 5, 5, nil, - index(7).terms(2, 2), - 5, 5, index(5).terms(1, 1, 2, 2), + prev4.append(1, 1, 1), 4, nil, + entryID{term: 1, index: 6}.append(2, 2), + prev4.append(1, 1, 2, 2), + 4, }, { - index(5).terms(1, 1, 1), 5, 6, nil, - index(6).terms(2), - 5, 6, index(5).terms(1, 2), + prev4.append(1, 1, 1), 5, nil, + entryID{term: 1, index: 5}.append(2), + prev4.append(1, 2), + 5, }, { - index(5).terms(1, 1, 1), 5, 7, nil, - index(6).terms(2), - 5, 6, index(5).terms(1, 2), + prev4.append(1, 1, 1), 6, nil, + entryID{term: 1, index: 5}.append(2), + prev4.append(1, 2), + 5, }, - } - - for i, tt := range tests { - t.Run(fmt.Sprint(i), func(t *testing.T) { - u := newUnstable(tt.offset, raftLogger) - u.snapshot = tt.snap - u.entries = tt.entries - u.offsetInProgress = tt.offsetInProgress - u.snapshotInProgress = u.snapshot != nil && u.offsetInProgress > u.offset + } { + t.Run("", func(t *testing.T) { + u := newUnstableForTesting(tt.ls, tt.snap) + u.entryInProgress = tt.entryInProgress + u.snapshotInProgress = u.snapshot != nil && u.entryInProgress > u.prev.index u.checkInvariants(t) - u.truncateAndAppend(tt.toappend) + u.truncateAndAppend(tt.app) u.checkInvariants(t) - require.Equal(t, tt.woffset, u.offset) - require.Equal(t, tt.woffsetInProgress, u.offsetInProgress) - require.Equal(t, tt.wentries, u.entries) + require.Equal(t, tt.want, u.logSlice) + require.Equal(t, tt.wentryInProgress, u.entryInProgress) }) } } diff --git a/pkg/raft/raft.go b/pkg/raft/raft.go index a8dcb88068e4..d63c770794e2 100644 --- a/pkg/raft/raft.go +++ b/pkg/raft/raft.go @@ -351,22 +351,6 @@ type raft struct { // the leader id lead pb.PeerID - // accTerm is the term of the leader whose append was accepted into the log - // last. Note that a rejected append does not update accTerm, by definition. - // - // Invariant: the log is a prefix of the accTerm's leader log - // Invariant: raftLog.lastEntryID().term <= accTerm <= Term - // - // In steady state, accTerm == Term. When someone campaigns, Term briefly - // overtakes the accTerm. However, accTerm catches up as soon as we accept a - // log append from the new leader. - // - // NB: the log can be partially or fully compacted. When we say "log" above, - // we logically include all the entries that were the pre-image of a snapshot, - // as well as the entries that are still physically in the log. - // - // TODO(pav-kv): move accTerm to raftLog. - accTerm uint64 // leadTransferee is id of the leader transfer target when its value is not zero. // Follow the procedure defined in raft thesis 3.10. leadTransferee pb.PeerID @@ -444,22 +428,6 @@ func newRaft(c *Config) *raft { } lastID := r.raftLog.lastEntryID() - // To initialize accTerm correctly, we make sure its invariant is true: the - // log is a prefix of the accTerm leader's log. This can be achieved by - // conservatively initializing accTerm to the term of the last log entry. - // - // We can't pick any lower term because the lower term's leader can't have - // lastID.term entries in it. We can't pick a higher term because we don't - // have any information about the higher-term leaders and their logs. So the - // lastID.term is the only valid choice. - // - // TODO(pav-kv): persist accTerm in HardState and load it. Our initialization - // is conservative. Before restart, accTerm could have been higher. Setting a - // higher accTerm (ideally, matching the current leader Term) gives us more - // information about the log, and then allows bumping its commit index sooner - // than when the next MsgApp arrives. - r.accTerm = lastID.term - cfg, trk, err := confchange.Restore(confchange.Changer{ Tracker: r.trk, LastIndex: lastID.index, @@ -943,8 +911,6 @@ func (r *raft) becomeLeader() { // This won't happen because we just called reset() above. r.logger.Panic("empty entry was dropped") } - // The leader's log is consistent with itself. - r.accTerm = r.Term // The payloadSize of an empty entry is 0 (see TestPayloadSizeOfEmptyEntry), // so the preceding log append does not count against the uncommitted log // quota of the new leader. In other words, after the call to appendEntry, @@ -1717,9 +1683,8 @@ func (r *raft) handleAppendEntries(m pb.Message) { return } if r.raftLog.maybeAppend(a) { - r.accTerm = m.Term // our log is now consistent with the m.Term leader // TODO(pav-kv): make it possible to commit even if the append did not - // succeed or is stale. If r.accTerm >= m.Term, then our log contains all + // succeed or is stale. If accTerm >= m.Term, then our log contains all // committed entries at m.Term (by raft invariants), so it is safe to bump // the commit index even if the MsgApp is stale. lastIndex := a.lastIndex() @@ -1798,7 +1763,7 @@ func (r *raft) handleHeartbeat(m pb.Message) { // TODO(pav-kv): move this logic to raftLog.commitTo, once the accTerm has // migrated to raftLog/unstable. mark := logMark{term: m.Term, index: min(m.Commit, r.raftLog.lastIndex())} - if mark.term == r.accTerm { + if mark.term == r.raftLog.accTerm() { r.raftLog.commitTo(mark) } r.send(pb.Message{To: m.From, Type: pb.MsgHeartbeatResp}) @@ -1822,7 +1787,6 @@ func (r *raft) handleSnapshot(m pb.Message) { if r.restore(s) { r.logger.Infof("%x [commit: %d] restored snapshot [index: %d, term: %d]", r.id, r.raftLog.committed, id.index, id.term) - r.accTerm = m.Term // our log is now consistent with the leader r.send(pb.Message{To: m.From, Type: pb.MsgAppResp, Index: r.raftLog.lastIndex()}) } else { r.logger.Infof("%x [commit: %d] ignored snapshot [index: %d, term: %d]", @@ -1892,7 +1856,10 @@ func (r *raft) restore(s snapshot) bool { return false } - r.raftLog.restore(s) + if !r.raftLog.restore(s) { + r.logger.Errorf("%x unable to restore snapshot [index: %d, term: %d]", r.id, id.index, id.term) + return false + } // Reset the configuration and add the (potentially updated) peers in anew. r.trk = tracker.MakeProgressTracker(r.trk.MaxInflight, r.trk.MaxInflightBytes) diff --git a/pkg/raft/raft_test.go b/pkg/raft/raft_test.go index fb5f7fe5487c..927ab2cf3eaa 100644 --- a/pkg/raft/raft_test.go +++ b/pkg/raft/raft_test.go @@ -1104,7 +1104,7 @@ func TestHandleMsgApp(t *testing.T) { // Ensure 2 {pb.Message{Type: pb.MsgApp, Term: 2, LogTerm: 1, Index: 1, Commit: 1}, 2, 1, false}, - {pb.Message{Type: pb.MsgApp, Term: 2, LogTerm: 0, Index: 0, Commit: 1, Entries: []pb.Entry{{Index: 1, Term: 2}}}, 1, 1, false}, + {pb.Message{Type: pb.MsgApp, Term: 3, LogTerm: 0, Index: 0, Commit: 1, Entries: []pb.Entry{{Index: 1, Term: 3}}}, 1, 1, false}, {pb.Message{Type: pb.MsgApp, Term: 2, LogTerm: 2, Index: 2, Commit: 3, Entries: []pb.Entry{{Index: 3, Term: 2}, {Index: 4, Term: 2}}}, 4, 3, false}, {pb.Message{Type: pb.MsgApp, Term: 2, LogTerm: 2, Index: 2, Commit: 4, Entries: []pb.Entry{{Index: 3, Term: 2}}}, 3, 3, false}, {pb.Message{Type: pb.MsgApp, Term: 2, LogTerm: 1, Index: 1, Commit: 4, Entries: []pb.Entry{{Index: 2, Term: 2}}}, 2, 2, false},