Skip to content

Commit

Permalink
raftstore: gc log cache eagerly (tikv#2962)
Browse files Browse the repository at this point in the history
  • Loading branch information
BusyJay authored and hicqu committed Apr 25, 2018
1 parent 9b1718e commit f5f655c
Show file tree
Hide file tree
Showing 2 changed files with 47 additions and 18 deletions.
40 changes: 31 additions & 9 deletions src/raftstore/store/peer_storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -124,8 +124,8 @@ struct EntryCache {
}

impl EntryCache {
fn first_index(&self) -> u64 {
self.cache.front().map_or(u64::MAX, |e| e.get_index())
/// Returns the raft log index of the first cached entry, or `None`
/// when the cache holds no entries.
fn first_index(&self) -> Option<u64> {
    let front = self.cache.front()?;
    Some(front.get_index())
}

fn fetch_entries_to(
Expand Down Expand Up @@ -212,17 +212,15 @@ impl EntryCache {
}

pub fn compact_to(&mut self, idx: u64) {
let cache_first_idx = match self.cache.front() {
None => return,
Some(e) => e.get_index(),
};

let cache_first_idx = self.first_index().unwrap_or(u64::MAX);
if cache_first_idx > idx {
return;
}
let cache_last_idx = self.cache.back().unwrap().get_index();
// Use `cache_last_idx + 1` to make sure cache can be cleared completely
// if necessary.
self.cache
.drain(..(cmp::min(cache_last_idx, idx) - cache_first_idx) as usize);
.drain(..(cmp::min(cache_last_idx + 1, idx) - cache_first_idx) as usize);
if self.cache.len() < SHRINK_CACHE_CAPACITY && self.cache.capacity() > SHRINK_CACHE_CAPACITY
{
// So the peer storage doesn't have many writes since the proposal of compaction,
Expand Down Expand Up @@ -545,7 +543,7 @@ impl PeerStorage {
if low == high {
return Ok(ents);
}
let cache_low = self.cache.first_index();
let cache_low = self.cache.first_index().unwrap_or(u64::MAX);
let region_id = self.get_region_id();
if high <= cache_low {
// not overlap
Expand Down Expand Up @@ -791,6 +789,24 @@ impl PeerStorage {
self.cache.compact_to(idx);
}

/// Eagerly garbage-collects the entry cache based on replication progress.
///
/// `replicated_idx` is the smallest matched index among (healthy) peers and
/// `apply_idx` is this peer's applied index. Entries already replicated and
/// applied everywhere are no longer needed in memory.
pub fn maybe_gc_cache(&mut self, replicated_idx: u64, apply_idx: u64) {
    // Every replica has caught up to the applied state: the region is
    // inactive, so drop all cached entries immediately.
    if replicated_idx == apply_idx {
        self.cache.compact_to(apply_idx + 1);
        return;
    }
    // An empty cache needs no collection.
    let cache_first_idx = match self.cache.first_index() {
        Some(idx) => idx,
        None => return,
    };
    if cache_first_idx > replicated_idx + 1 {
        // The slowest peer already needs entries older than anything the
        // cache holds, so catching it up requires accessing the fs anyway;
        // optimize for the common case and reclaim the memory now.
        // Maybe gc to the second least replicated_idx is better.
        self.cache.compact_to(apply_idx + 1);
    }
}

// Apply the peer with given snapshot.
pub fn apply_snapshot(
&mut self,
Expand Down Expand Up @@ -1948,6 +1964,12 @@ mod test {
exp_res = (1..7).map(|i| new_entry(i, 8)).collect();
validate_cache(&store, &exp_res);
assert!(store.cache.cache.capacity() < cap as usize);

// compact all
store.compact_to(cap + 2);
validate_cache(&store, &[]);
// invalid compaction should be ignored.
store.compact_to(cap);
}

#[test]
Expand Down
25 changes: 16 additions & 9 deletions src/raftstore/store/store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2166,7 +2166,9 @@ impl<T: Transport, C: PdClient> Store<T, C> {
let mut total_gc_logs = 0;

for (&region_id, peer) in &mut self.region_peers {
let applied_idx = peer.get_store().applied_index();
if !peer.is_leader() {
peer.mut_store().compact_to(applied_idx + 1);
continue;
}

Expand All @@ -2182,16 +2184,20 @@ impl<T: Transport, C: PdClient> Store<T, C> {
// ^ ^
// |-----------------threshold------------ |
// first_index replicated_index
let replicated_idx = peer.raft_group
.raft
.prs()
.iter()
.map(|(_, p)| p.matched)
.min()
.unwrap();
// `healthy_replicated_index` is the smallest `replicated_index` of healthy nodes.
let truncated_idx = peer.get_store().truncated_index();
let last_idx = peer.get_store().last_index();
let (mut replicated_idx, mut healthy_replicated_idx) = (last_idx, last_idx);
for (_, p) in peer.raft_group.raft.prs().iter() {
if replicated_idx > p.matched {
replicated_idx = p.matched;
}
if healthy_replicated_idx > p.matched && p.matched >= truncated_idx {
healthy_replicated_idx = p.matched;
}
}
// When an election happened or a new peer is added, replicated_idx can be 0.
if replicated_idx > 0 {
let last_idx = peer.raft_group.raft.raft_log.last_index();
assert!(
last_idx >= replicated_idx,
"expect last index {} >= replicated index {}",
Expand All @@ -2200,7 +2206,8 @@ impl<T: Transport, C: PdClient> Store<T, C> {
);
REGION_MAX_LOG_LAG.observe((last_idx - replicated_idx) as f64);
}
let applied_idx = peer.get_store().applied_index();
peer.mut_store()
.maybe_gc_cache(healthy_replicated_idx, applied_idx);
let first_idx = peer.get_store().first_index();
let mut compact_idx;
if applied_idx > first_idx
Expand Down

0 comments on commit f5f655c

Please sign in to comment.