From e39e8560a567125c5d664cf03d2c663ad1b91d81 Mon Sep 17 00:00:00 2001 From: Nathan VanBenschoten Date: Thu, 7 Sep 2017 14:56:25 -0400 Subject: [PATCH] storage: invalidate cached lastTerm on snapshots Addresses the current issue in #17524. I'll open an issue to properly test this, but that might take some time. For now this seems like a clear fix that we should get in sooner rather than later. --- pkg/storage/replica.go | 17 ++++++++++++----- pkg/storage/replica_raftstorage.go | 25 +++++++++++++++++-------- 2 files changed, 29 insertions(+), 13 deletions(-) diff --git a/pkg/storage/replica.go b/pkg/storage/replica.go index fdc5df5534c3..62694a0c8dce 100644 --- a/pkg/storage/replica.go +++ b/pkg/storage/replica.go @@ -300,7 +300,8 @@ type Replica struct { // Last index/term persisted to the raft log (not necessarily // committed). Note that lastTerm may be 0 (and thus invalid) even when // lastIndex is known, in which case the term will have to be retrieved - // from the Raft log entry. + // from the Raft log entry. Use the invalidLastTerm constant for this + // case. lastIndex, lastTerm uint64 // The most recent commit index seen in a message from the leader. Used by // the follower to estimate the number of Raft log entries it is @@ -672,7 +673,7 @@ func (r *Replica) initRaftMuLockedReplicaMuLocked( if err != nil { return err } - r.mu.lastTerm = 0 + r.mu.lastTerm = invalidLastTerm pErr, err := r.mu.stateLoader.loadReplicaDestroyedError(ctx, r.store.Engine()) if err != nil { @@ -3249,9 +3250,15 @@ func (r *Replica) handleRaftReadyRaftMuLocked( return stats, err } + // r.mu.lastIndex and r.mu.lastTerm were updated in applySnapshot, but + // we also want to make sure we reflect these changes in the local + // variables we're tracking here. We could pull these values from + // r.mu itself, but that would require us to grab a lock. 
if lastIndex, err = r.raftMu.stateLoader.loadLastIndex(ctx, r.store.Engine()); err != nil { return stats, err } + lastTerm = invalidLastTerm + // We refresh pending commands after applying a snapshot because this // replica may have been temporarily partitioned from the Raft group and // missed leadership changes that occurred. Suppose node A is the leader, @@ -3330,9 +3337,9 @@ func (r *Replica) handleRaftReadyRaftMuLocked( } } - // Update protected state (last index, raft log size and raft leader ID) and - // set raft log entry cache. We clear any older, uncommitted log entries and - // cache the latest ones. + // Update protected state (last index, last term, raft log size and raft + // leader ID) and set raft log entry cache. We clear any older, uncommitted + // log entries and cache the latest ones. // // Note also that we're likely to send messages related to the Entries we // just appended, and these entries need to be inlined when sending them to diff --git a/pkg/storage/replica_raftstorage.go b/pkg/storage/replica_raftstorage.go index 0ff1fec42df3..9ec4f160dffb 100644 --- a/pkg/storage/replica_raftstorage.go +++ b/pkg/storage/replica_raftstorage.go @@ -90,8 +90,8 @@ func (r *Replica) raftEntriesLocked(lo, hi, maxBytes uint64) ([]raftpb.Entry, er // entries retrieves entries from the engine. To accommodate loading the term, // `sideloaded` can be supplied as nil, in which case sideloaded entries will -// not be inlined, the raft entry cache not populated with *any* of the -// loaded entries, and maxBytes not applied to the payloads. +// not be inlined, the raft entry cache will not be populated with *any* of the +// loaded entries, and maxBytes will not be applied to the payloads. 
func entries( ctx context.Context, e engine.Reader, @@ -232,9 +232,16 @@ func iterateEntries( return err } +// invalidLastTerm is an out-of-band value for r.mu.lastTerm that +// invalidates lastTerm caching and forces retrieval of Term(lastIndex) +// from the raftEntryCache/RocksDB. +const invalidLastTerm = 0 + // Term implements the raft.Storage interface. func (r *replicaRaftStorage) Term(i uint64) (uint64, error) { - if r.mu.lastIndex == i && r.mu.lastTerm != 0 { + // TODO(nvanbenschoten): should we set r.mu.lastTerm when + // r.mu.lastIndex == i && r.mu.lastTerm == invalidLastTerm? + if r.mu.lastIndex == i && r.mu.lastTerm != invalidLastTerm { return r.mu.lastTerm, nil } // Try to retrieve the term for the desired entry from the entry cache. @@ -526,10 +533,10 @@ func snapshot( } // append the given entries to the raft log. Takes the previous values of -// r.mu.lastIndex and r.mu.raftLogSize, and returns new values. We do this -// rather than modifying them directly because these modifications need to be -// atomic with the commit of the batch. This method requires that r.raftMu is -// held. +// r.mu.lastIndex, r.mu.lastTerm, and r.mu.raftLogSize, and returns new values. +// We do this rather than modifying them directly because these modifications +// need to be atomic with the commit of the batch. This method requires that +// r.raftMu is held. // // append is intentionally oblivious to the existence of sideloaded proposals. // They are managed by the caller, including cleaning up obsolete on-disk @@ -822,7 +829,9 @@ func (r *Replica) applySnapshot( // feelings about this ever change, we can add a LastIndex field to // raftpb.SnapshotMetadata. r.mu.lastIndex = s.RaftAppliedIndex - r.mu.lastTerm = 0 + // We could recompute and return the lastTerm in the snapshot, but instead we + // just set an invalid term and force a recomputation later. + r.mu.lastTerm = invalidLastTerm r.mu.raftLogSize = raftLogSize // Update the range and store stats. 
r.store.metrics.subtractMVCCStats(r.mu.state.Stats)