Skip to content

Commit

Permalink
Merge pull request #18327 from nvanbenschoten/nvanbenschoten/lastTerm
Browse files Browse the repository at this point in the history
storage: invalidate cached lastTerm on snapshots
  • Loading branch information
nvanbenschoten authored Sep 7, 2017
2 parents 3137b38 + e39e856 commit 839cf5e
Show file tree
Hide file tree
Showing 2 changed files with 29 additions and 13 deletions.
17 changes: 12 additions & 5 deletions pkg/storage/replica.go
Original file line number Diff line number Diff line change
Expand Up @@ -300,7 +300,8 @@ type Replica struct {
// Last index/term persisted to the raft log (not necessarily
// committed). Note that lastTerm may be 0 (and thus invalid) even when
// lastIndex is known, in which case the term will have to be retrieved
// from the Raft log entry.
// from the Raft log entry. Use the invalidLastTerm constant for this
// case.
lastIndex, lastTerm uint64
// The most recent commit index seen in a message from the leader. Used by
// the follower to estimate the number of Raft log entries it is
Expand Down Expand Up @@ -672,7 +673,7 @@ func (r *Replica) initRaftMuLockedReplicaMuLocked(
if err != nil {
return err
}
r.mu.lastTerm = 0
r.mu.lastTerm = invalidLastTerm

pErr, err := r.mu.stateLoader.loadReplicaDestroyedError(ctx, r.store.Engine())
if err != nil {
Expand Down Expand Up @@ -3249,9 +3250,15 @@ func (r *Replica) handleRaftReadyRaftMuLocked(
return stats, err
}

// r.mu.lastIndex and r.mu.lastTerm were updated in applySnapshot, but
// we also want to make sure we reflect these changes in the local
// variables we're tracking here. We could pull these values from
// r.mu itself, but that would require us to grab a lock.
if lastIndex, err = r.raftMu.stateLoader.loadLastIndex(ctx, r.store.Engine()); err != nil {
return stats, err
}
lastTerm = invalidLastTerm

// We refresh pending commands after applying a snapshot because this
// replica may have been temporarily partitioned from the Raft group and
// missed leadership changes that occurred. Suppose node A is the leader,
Expand Down Expand Up @@ -3330,9 +3337,9 @@ func (r *Replica) handleRaftReadyRaftMuLocked(
}
}

// Update protected state (last index, raft log size and raft leader ID) and
// set raft log entry cache. We clear any older, uncommitted log entries and
// cache the latest ones.
// Update protected state (last index, last term, raft log size and raft
// leader ID) and set raft log entry cache. We clear any older, uncommitted
// log entries and cache the latest ones.
//
// Note also that we're likely to send messages related to the Entries we
// just appended, and these entries need to be inlined when sending them to
Expand Down
25 changes: 17 additions & 8 deletions pkg/storage/replica_raftstorage.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,8 +90,8 @@ func (r *Replica) raftEntriesLocked(lo, hi, maxBytes uint64) ([]raftpb.Entry, er

// entries retrieves entries from the engine. To accommodate loading the term,
// `sideloaded` can be supplied as nil, in which case sideloaded entries will
// not be inlined, the raft entry cache not populated with *any* of the
// loaded entries, and maxBytes not applied to the payloads.
// not be inlined, the raft entry cache will not be populated with *any* of the
// loaded entries, and maxBytes will not be applied to the payloads.
func entries(
ctx context.Context,
e engine.Reader,
Expand Down Expand Up @@ -232,9 +232,16 @@ func iterateEntries(
return err
}

// invalidLastTerm is an out-of-band value for r.mu.lastTerm that
// invalidates lastTerm caching and forces retrieval of Term(lastTerm)
// from the raftEntryCache/RocksDB.
const invalidLastTerm = 0

// Term implements the raft.Storage interface.
func (r *replicaRaftStorage) Term(i uint64) (uint64, error) {
if r.mu.lastIndex == i && r.mu.lastTerm != 0 {
// TODO(nvanbenschoten): should we set r.mu.lastTerm when
// r.mu.lastIndex == i && r.mu.lastTerm == invalidLastTerm?
if r.mu.lastIndex == i && r.mu.lastTerm != invalidLastTerm {
return r.mu.lastTerm, nil
}
// Try to retrieve the term for the desired entry from the entry cache.
Expand Down Expand Up @@ -526,10 +533,10 @@ func snapshot(
}

// append the given entries to the raft log. Takes the previous values of
// r.mu.lastIndex and r.mu.raftLogSize, and returns new values. We do this
// rather than modifying them directly because these modifications need to be
// atomic with the commit of the batch. This method requires that r.raftMu is
// held.
// r.mu.lastIndex, r.mu.lastTerm, and r.mu.raftLogSize, and returns new values.
// We do this rather than modifying them directly because these modifications
// need to be atomic with the commit of the batch. This method requires that
// r.raftMu is held.
//
// append is intentionally oblivious to the existence of sideloaded proposals.
// They are managed by the caller, including cleaning up obsolete on-disk
Expand Down Expand Up @@ -822,7 +829,9 @@ func (r *Replica) applySnapshot(
// feelings about this ever change, we can add a LastIndex field to
// raftpb.SnapshotMetadata.
r.mu.lastIndex = s.RaftAppliedIndex
r.mu.lastTerm = 0
// We could recompute and return the lastTerm in the snapshot, but instead we
// just set an invalid term and force a recomputation later.
r.mu.lastTerm = invalidLastTerm
r.mu.raftLogSize = raftLogSize
// Update the range and store stats.
r.store.metrics.subtractMVCCStats(r.mu.state.Stats)
Expand Down

0 comments on commit 839cf5e

Please sign in to comment.