Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
36360: raftentry: truncate entries from old terms when adding to cache r=nvanbenschoten a=nvanbenschoten

Fixes cockroachdb#36353.

See cockroachdb#36353 (comment) for the reasoning behind how this could lead to the inconsistency observed in that issue.

This change adds an option to `raftentry.Cache`'s `Add` method to truncate entries from old terms when inserting new entries into the cache. This ensures that old entries at lower terms are purged from the cache and will never be provided to `etcd/raft`. The approach here is analogous to the truncation in [`Replica.append`](https://github.com/cockroachdb/cockroach/blob/a6b3c540b696002b2ed07036a657612995d6d1ab/pkg/storage/replica_raftstorage.go#L621), but for the Raft entry cache instead of for persistent Raft entries.

Co-authored-by: Nathan VanBenschoten <[email protected]>
  • Loading branch information
craig[bot] and nvanbenschoten committed Apr 1, 2019
2 parents ac1ec6a + 162490c commit 422635c
Show file tree
Hide file tree
Showing 6 changed files with 151 additions and 69 deletions.
72 changes: 50 additions & 22 deletions pkg/storage/raftentry/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,8 @@ var partitionSize = int32(unsafe.Sizeof(partition{}))
// implement the below interface.
type rangeCache interface {
add(ent []raftpb.Entry) (bytesAdded, entriesAdded int32)
clear(hi uint64) (bytesRemoved, entriesRemoved int32)
truncateFrom(lo uint64) (bytesRemoved, entriesRemoved int32)
clearTo(hi uint64) (bytesRemoved, entriesRemoved int32)
get(index uint64) (raftpb.Entry, bool)
scan(ents []raftpb.Entry, lo, hi, maxBytes uint64) (
_ []raftpb.Entry, bytes uint64, nextIdx uint64, exceededMaxBytes bool)
Expand All @@ -130,34 +131,61 @@ func (c *Cache) Metrics() Metrics {
return c.metrics
}

// Add inserts ents into the cache.
func (c *Cache) Add(id roachpb.RangeID, ents []raftpb.Entry) {
bytesGuessed := analyzeEntries(ents)
if bytesGuessed == 0 || bytesGuessed > c.maxBytes {
// Add inserts ents into the cache. If truncate is true, the method also removes
// all entries with indices equal to or greater than the indices of the entries
// provided. ents is expected to consist of entries with a contiguous sequence
// of indices.
func (c *Cache) Add(id roachpb.RangeID, ents []raftpb.Entry, truncate bool) {
if len(ents) == 0 {
return
}
bytesGuessed := analyzeEntries(ents)
add := bytesGuessed <= c.maxBytes
if !add {
bytesGuessed = 0
}

c.mu.Lock()
// Get p and move the partition to the front of the LRU.
p := c.getPartLocked(id, true /* create */, true /* recordUse */)
c.evictLocked(bytesGuessed)
if len(c.parts) == 0 { // Get p again if we evicted everything.
p = c.getPartLocked(id, true /* create */, false /* recordUse */)
}
// Use the atomic (load|set)Size partition methods to avoid a race condition
// on p.size and to ensure that p.size.bytes() reflects the number of bytes
// in c.bytes associated with p in the face of concurrent updates due to calls
// to c.recordUpdate.
for {
prev := p.loadSize()
if p.setSize(prev, prev.add(bytesGuessed, 0)) {
break
p := c.getPartLocked(id, add /* create */, true /* recordUse */)
if bytesGuessed > 0 {
c.evictLocked(bytesGuessed)
if len(c.parts) == 0 { // Get p again if we evicted everything.
p = c.getPartLocked(id, true /* create */, false /* recordUse */)
}
// Use the atomic (load|set)Size partition methods to avoid a race condition
// on p.size and to ensure that p.size.bytes() reflects the number of bytes
// in c.bytes associated with p in the face of concurrent updates due to calls
// to c.recordUpdate.
for {
prev := p.loadSize()
if p.setSize(prev, prev.add(bytesGuessed, 0)) {
break
}
}
}
c.mu.Unlock()
if p == nil {
// The partition did not exist and we did not create it.
// Only possible if !add.
return
}

p.mu.Lock()
defer p.mu.Unlock()
bytesAdded, entriesAdded := p.add(ents)
c.recordUpdate(p, bytesAdded, bytesGuessed, entriesAdded)
var bytesAdded, entriesAdded, bytesRemoved, entriesRemoved int32
if add {
bytesAdded, entriesAdded = p.add(ents)
}
if truncate {
truncIdx := ents[0].Index
if add {
// Some entries were already overwritten.
truncIdx = ents[len(ents)-1].Index + 1
}
bytesRemoved, entriesRemoved = p.truncateFrom(truncIdx)
}
c.recordUpdate(p, bytesAdded-bytesRemoved, bytesGuessed, entriesAdded-entriesRemoved)
}

// Clear removes all entries on the given range with index less than hi.
Expand All @@ -171,7 +199,7 @@ func (c *Cache) Clear(id roachpb.RangeID, hi uint64) {
c.mu.Unlock()
p.mu.Lock()
defer p.mu.Unlock()
bytesRemoved, entriesRemoved := p.clear(hi)
bytesRemoved, entriesRemoved := p.clearTo(hi)
c.recordUpdate(p, -1*bytesRemoved, 0, -1*entriesRemoved)
}

Expand All @@ -195,7 +223,7 @@ func (c *Cache) Get(id roachpb.RangeID, idx uint64) (e raftpb.Entry, ok bool) {
}

// Scan returns entries between [lo, hi) for specified range. If any entries are
// returned for the specified indexes, they will start with index lo and proceed
// returned for the specified indices, they will start with index lo and proceed
// sequentially without gaps until 1) all entries exclusive of hi are fetched,
// 2) fetching another entry would add up to more than maxBytes of data, or 3) a
// cache miss occurs. The returned size reflects the size of the returned
Expand Down
Loading

0 comments on commit 422635c

Please sign in to comment.