raftstore: schedule log gc by size (#1362)
BusyJay authored Nov 28, 2016
1 parent c8774ee commit 530d769
Showing 9 changed files with 188 additions and 30 deletions.
10 changes: 10 additions & 0 deletions etc/config-template.toml
@@ -58,6 +58,16 @@ region-split-size = "64MB"
# whether the region should be split or not.
region-split-check-diff = "8MB"

# Interval to gc unnecessary raft log.
# raft-log-gc-tick-interval = "10s"
# A threshold to gc stale raft log, must be >= 1.
# raft-log-gc-threshold = 50
# When the entry count exceeds this value, gc will be forcibly triggered.
# raft-log-gc-count-limit = 48000
# When the approximate size of raft log entries exceeds this value, gc will be forcibly triggered.
# It's recommended to set it to 3/4 of region-split-size.
# raft-log-gc-size-limit = "48MB"

# When a peer hasn't been active for max-peer-down-duration,
# we will consider this peer to be down and report it to pd.
max-peer-down-duration = "5m"
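
The new settings are meant to track region-split-size: the template's guidance is to cap raft log size at roughly 3/4 of the split size, and the count limit is the same cap expressed in entries under an assumed ~1KB average entry size. A minimal sketch of that arithmetic (the constant names below are illustrative, not part of the commit):

const REGION_SPLIT_SIZE: u64 = 64 * 1024 * 1024;
// Size cap suggested by the comment above: 3/4 of the region split size.
const GC_SIZE_LIMIT: u64 = REGION_SPLIT_SIZE * 3 / 4; // 48MB
// Count cap assuming entries average about 1KB each.
const GC_COUNT_LIMIT: u64 = GC_SIZE_LIMIT / 1024; // 49152, close to the 48000 suggested in the template

fn main() {
    println!("size limit = {} bytes, count limit = {} entries",
             GC_SIZE_LIMIT, GC_COUNT_LIMIT);
}
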
17 changes: 17 additions & 0 deletions src/bin/tikv-server.rs
@@ -404,6 +404,23 @@ fn build_cfg(matches: &Matches, config: &toml::Value, cluster_id: u64, addr: &st
"raftstore.region-split-check-diff",
Some(8 * 1024 * 1024)) as u64;

cfg.raft_store.raft_log_gc_tick_interval =
get_toml_int(config, "raftstore.raft-log-gc-tick-interval", Some(10_000)) as u64;

cfg.raft_store.raft_log_gc_threshold =
get_toml_int(config, "raftstore.raft-log-gc-threshold", Some(50)) as u64;

let default_size_limit = cfg.raft_store.raft_log_gc_count_limit as i64;
cfg.raft_store.raft_log_gc_count_limit =
get_toml_int(config,
"raftstore.raft-log-gc-count-limit",
Some(default_size_limit)) as u64;

cfg.raft_store.raft_log_gc_size_limit =
get_toml_int(config,
"raftstore.raft-log-gc-size-limit",
Some(48 * 1024 * 1024)) as u64;

cfg.raft_store.region_compact_check_interval_secs =
get_toml_int(config,
"raftstore.region-compact-check-interval-secs",
18 changes: 14 additions & 4 deletions src/raftstore/store/config.rs
@@ -21,9 +21,11 @@ const RAFT_HEARTBEAT_TICKS: usize = 10;
const RAFT_ELECTION_TIMEOUT_TICKS: usize = 50;
const RAFT_MAX_SIZE_PER_MSG: u64 = 1024 * 1024;
const RAFT_MAX_INFLIGHT_MSGS: usize = 256;
const RAFT_LOG_GC_INTERVAL: u64 = 5000;
const RAFT_LOG_GC_INTERVAL: u64 = 10000;
const RAFT_LOG_GC_THRESHOLD: u64 = 50;
const RAFT_LOG_GC_LIMIT: u64 = 100000;
// Assume the average size of entries is 1k.
const RAFT_LOG_GC_COUNT_LIMIT: u64 = REGION_SPLIT_SIZE * 3 / 4 / 1024;
const RAFT_LOG_GC_SIZE_LIMIT: u64 = REGION_SPLIT_SIZE * 3 / 4;
const SPLIT_REGION_CHECK_TICK_INTERVAL: u64 = 10000;
const REGION_SPLIT_SIZE: u64 = 64 * 1024 * 1024;
const REGION_MAX_SIZE: u64 = 80 * 1024 * 1024;
@@ -63,7 +65,10 @@ pub struct Config {
// A threshold to gc stale raft log, must be >= 1.
pub raft_log_gc_threshold: u64,
// When the entry count exceeds this value, gc will be forcibly triggered.
pub raft_log_gc_limit: u64,
pub raft_log_gc_count_limit: u64,
// When the approximate size of raft log entries exceeds this value,
// gc will be forcibly triggered.
pub raft_log_gc_size_limit: u64,

// Interval (ms) to check region whether need to be split or not.
pub split_region_check_tick_interval: u64,
@@ -112,7 +117,8 @@ impl Default for Config {
raft_max_inflight_msgs: RAFT_MAX_INFLIGHT_MSGS,
raft_log_gc_tick_interval: RAFT_LOG_GC_INTERVAL,
raft_log_gc_threshold: RAFT_LOG_GC_THRESHOLD,
raft_log_gc_limit: RAFT_LOG_GC_LIMIT,
raft_log_gc_count_limit: RAFT_LOG_GC_COUNT_LIMIT,
raft_log_gc_size_limit: RAFT_LOG_GC_SIZE_LIMIT,
split_region_check_tick_interval: SPLIT_REGION_CHECK_TICK_INTERVAL,
region_max_size: REGION_MAX_SIZE,
region_split_size: REGION_SPLIT_SIZE,
@@ -144,6 +150,10 @@ impl Config {
self.raft_log_gc_threshold));
}

if self.raft_log_gc_size_limit == 0 {
return Err(box_err!("raft log gc size limit should large than 0."));
}

if self.region_max_size < self.region_split_size {
return Err(box_err!("region max size {} must >= split size {}",
self.region_max_size,
39 changes: 30 additions & 9 deletions src/raftstore/store/peer.rs
@@ -88,7 +88,10 @@ pub enum ExecResult {
peer: metapb::Peer,
region: metapb::Region,
},
CompactLog { state: RaftTruncatedState },
CompactLog {
state: RaftTruncatedState,
first_index: u64,
},
SplitRegion {
left: metapb::Region,
right: metapb::Region,
@@ -172,9 +175,6 @@ fn notify_region_removed(region_id: u64, peer_id: u64, mut cmd: PendingCmd) {
pub struct Peer {
engine: Arc<DB>,
peer_cache: Rc<RefCell<HashMap<u64, metapb::Peer>>>,
// if we remove ourself in ChangePeer remove, we should set this flag, then
// any following committed logs in same Ready should be applied failed.
pending_remove: bool,
pub peer: metapb::Peer,
region_id: u64,
pub raft_group: RawNode<PeerStorage>,
@@ -187,12 +187,16 @@ pub struct Peer {
/// delete keys' count since last reset.
pub delete_keys_hint: u64,

leader_missing_time: Option<Instant>,

pub tag: String,

pub last_compacted_idx: u64,
// Approximate size of logs that are applied but not yet compacted.
pub raft_log_size_hint: u64,

// if we remove ourself in ChangePeer remove, we should set this flag, then
// any following committed logs in same Ready should be applied failed.
pending_remove: bool,
leader_missing_time: Option<Instant>,
leader_lease_expired_time: Option<Timespec>,
election_timeout: TimeDuration,
}
@@ -281,6 +285,7 @@ impl Peer {
leader_missing_time: Some(Instant::now()),
tag: tag,
last_compacted_idx: 0,
raft_log_size_hint: 0,
leader_lease_expired_time: None,
election_timeout: TimeDuration::milliseconds(cfg.raft_base_tick_interval as i64) *
cfg.raft_election_timeout_ticks as i32,
@@ -598,6 +603,11 @@ impl Peer {
apply_result.is_some(),
ready.hs.is_some());

if apply_result.is_some() {
// When applying a snapshot, there are no applied-but-not-yet-compacted logs.
self.raft_log_size_hint = 0;
}

Ok(Some(ReadyResult {
ready: Some(ready),
apply_snap_result: apply_result,
@@ -1005,6 +1015,9 @@ impl Peer {
entry.get_index());
}

// raft meta is very small, can be ignored.
self.raft_log_size_hint += entry.get_data().len() as u64;

let res = try!(match entry.get_entry_type() {
eraftpb::EntryType::EntryNormal => self.handle_raft_entry_normal(entry),
eraftpb::EntryType::EntryConfChange => self.handle_raft_entry_conf_change(entry),
@@ -1193,7 +1206,7 @@ impl Peer {
.write(ctx.wb)
.unwrap_or_else(|e| panic!("{} failed to commit apply result: {:?}", self.tag, e));

let mut storage = self.mut_store();
let mut storage = self.raft_group.mut_store();
storage.apply_state = ctx.apply_state;
storage.applied_index_term = term;

@@ -1202,7 +1215,12 @@
ExecResult::ChangePeer { ref region, .. } => {
storage.region = region.clone();
}
ExecResult::CompactLog { .. } => {}
ExecResult::CompactLog { first_index, ref state } => {
let total_cnt = index - first_index;
// the size of current CompactLog command can be ignored.
let remain_cnt = index - state.get_index() - 1;
self.raft_log_size_hint = self.raft_log_size_hint * remain_cnt / total_cnt;
}
ExecResult::SplitRegion { ref left, .. } => {
storage.region = left.clone();
}
@@ -1533,7 +1551,10 @@ impl Peer {
PEER_ADMIN_CMD_COUNTER_VEC.with_label_values(&["compact", "success"]).inc();

Ok((resp,
Some(ExecResult::CompactLog { state: ctx.apply_state.get_truncated_state().clone() })))
Some(ExecResult::CompactLog {
state: ctx.apply_state.get_truncated_state().clone(),
first_index: first_index,
})))
}

fn exec_write_cmd(&mut self, ctx: &ExecContext) -> Result<RaftCmdResponse> {
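
The bookkeeping added in peer.rs keeps raft_log_size_hint meaningful: every applied entry adds its data length to the hint (raft metadata is ignored as negligible), applying a snapshot resets it to zero, and executing a CompactLog scales it down by the fraction of entries that survive the truncation instead of recomputing it from disk. A rough, self-contained sketch of that logic (the struct and the numbers below are illustrative, not the actual Peer code):

struct LogSizeHint {
    bytes: u64,
}

impl LogSizeHint {
    // Mirror of handle_raft_commit_entries: count only entry data, raft meta is tiny.
    fn on_apply(&mut self, entry_data_len: usize) {
        self.bytes += entry_data_len as u64;
    }

    // Mirror of the snapshot case: nothing is applied-but-uncompacted after a snapshot.
    fn on_snapshot(&mut self) {
        self.bytes = 0;
    }

    // Mirror of the CompactLog arm: `index` is the index of the CompactLog entry,
    // `first_index` the old first log index, `truncated_index` the new truncated state index.
    fn on_compact(&mut self, index: u64, first_index: u64, truncated_index: u64) {
        let total_cnt = index - first_index;           // entries the hint currently covers
        let remain_cnt = index - truncated_index - 1;  // entries left after truncation
        self.bytes = self.bytes * remain_cnt / total_cnt;
    }
}

fn main() {
    let mut hint = LogSizeHint { bytes: 0 };
    for _ in 0..100 {
        hint.on_apply(1024); // 100 entries of ~1KB => hint = 102400 bytes
    }
    // Compact while applying index 106 with first index 6, truncating through index 80:
    // 25 of 100 entries remain, so the hint keeps a quarter of its bytes.
    hint.on_compact(106, 6, 80);
    assert_eq!(hint.bytes, 25600);
    // A snapshot wipes the hint entirely.
    hint.on_snapshot();
    assert_eq!(hint.bytes, 0);
}
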
8 changes: 6 additions & 2 deletions src/raftstore/store/store.rs
@@ -1019,7 +1019,7 @@ impl<T: Transport, C: PdClient> Store<T, C> {
ExecResult::ChangePeer { change_type, peer, .. } => {
self.on_ready_change_peer(region_id, change_type, peer)
}
ExecResult::CompactLog { state } => self.on_ready_compact_log(region_id, state),
ExecResult::CompactLog { state, .. } => self.on_ready_compact_log(region_id, state),
ExecResult::SplitRegion { left, right } => {
self.on_ready_split_region(region_id, left, right)
}
@@ -1131,6 +1131,7 @@ impl<T: Transport, C: PdClient> Store<T, C> {
};
}

#[allow(if_same_then_else)]
fn on_raft_gc_log_tick(&mut self, event_loop: &mut EventLoop<Self>) {
for (&region_id, peer) in &mut self.region_peers {
if !peer.is_leader() {
@@ -1159,7 +1160,10 @@ impl<T: Transport, C: PdClient> Store<T, C> {
let applied_idx = peer.get_store().applied_index();
let first_idx = peer.get_store().first_index();
let compact_idx;
if applied_idx > first_idx && applied_idx - first_idx >= self.cfg.raft_log_gc_limit {
if applied_idx > first_idx &&
applied_idx - first_idx >= self.cfg.raft_log_gc_count_limit {
compact_idx = applied_idx;
} else if peer.raft_log_size_hint >= self.cfg.raft_log_gc_size_limit {
compact_idx = applied_idx;
} else if replicated_idx < first_idx ||
replicated_idx - first_idx <= self.cfg.raft_log_gc_threshold {
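
With the new field, on_raft_gc_log_tick has two force triggers per leader peer: one on applied-but-uncompacted entry count and one on the approximate size hint (hence the allow(if_same_then_else), since both branches pick the same index). The rest of the function sits behind the truncated hunk, so the final fallback in the sketch below is an assumption; only the first three branches correspond to lines visible in the diff, and the names are illustrative:

struct GcCfg {
    raft_log_gc_count_limit: u64,
    raft_log_gc_size_limit: u64,
    raft_log_gc_threshold: u64,
}

fn pick_compact_idx(cfg: &GcCfg,
                    applied_idx: u64,
                    first_idx: u64,
                    replicated_idx: u64,
                    raft_log_size_hint: u64)
                    -> Option<u64> {
    if applied_idx > first_idx && applied_idx - first_idx >= cfg.raft_log_gc_count_limit {
        // Too many applied-but-uncompacted entries: force compaction up to the applied index.
        Some(applied_idx)
    } else if raft_log_size_hint >= cfg.raft_log_gc_size_limit {
        // New in this commit: the approximate log size exceeds the limit, force compaction too.
        Some(applied_idx)
    } else if replicated_idx < first_idx ||
              replicated_idx - first_idx <= cfg.raft_log_gc_threshold {
        // Not enough stale log to be worth compacting yet.
        None
    } else {
        // Assumed normal case (not visible in the hunk): compact only what
        // every follower has already replicated.
        Some(replicated_idx)
    }
}

fn main() {
    let cfg = GcCfg {
        raft_log_gc_count_limit: 48 * 1024,
        raft_log_gc_size_limit: 48 * 1024 * 1024,
        raft_log_gc_threshold: 50,
    };
    // The size hint alone is enough to trigger a forced gc up to the applied index.
    assert_eq!(pick_compact_idx(&cfg, 10_000, 100, 9_000, 50 * 1024 * 1024), Some(10_000));
}
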
112 changes: 104 additions & 8 deletions tests/raftstore/test_compact_log.rs
@@ -73,9 +73,10 @@ fn test_compact_log<T: Simulator>(cluster: &mut Cluster<T>) {
}
}

fn test_compact_limit<T: Simulator>(cluster: &mut Cluster<T>) {
cluster.cfg.raft_store.raft_log_gc_limit = 1000;
fn test_compact_count_limit<T: Simulator>(cluster: &mut Cluster<T>) {
cluster.cfg.raft_store.raft_log_gc_count_limit = 1000;
cluster.cfg.raft_store.raft_log_gc_threshold = 2000;
cluster.cfg.raft_store.raft_log_gc_size_limit = 20 * 1024 * 1024;
cluster.run();

cluster.must_put(b"k1", b"v1");
@@ -115,7 +116,6 @@ fn test_compact_limit<T: Simulator>(cluster: &mut Cluster<T>) {
assert_eq!(idx, before_state.get_index());
}

// get will use lease read so will not increase raft index
for i in 600..1200 {
let k = i.to_string().into_bytes();
let v = k.clone();
@@ -146,7 +146,7 @@

fn test_compact_many_times<T: Simulator>(cluster: &mut Cluster<T>) {
let gc_limit: u64 = 100;
cluster.cfg.raft_store.raft_log_gc_limit = gc_limit;
cluster.cfg.raft_store.raft_log_gc_count_limit = gc_limit;
cluster.cfg.raft_store.raft_log_gc_threshold = 50;
cluster.cfg.raft_store.raft_log_gc_tick_interval = 100; // 100 ms
cluster.run();
@@ -211,17 +211,17 @@ fn test_server_compact_log() {
}

#[test]
fn test_node_compact_limit() {
fn test_node_compact_count_limit() {
let count = 5;
let mut cluster = new_node_cluster(0, count);
test_compact_limit(&mut cluster);
test_compact_count_limit(&mut cluster);
}

#[test]
fn test_server_compact_limit() {
fn test_server_compact_count_limit() {
let count = 5;
let mut cluster = new_server_cluster(0, count);
test_compact_limit(&mut cluster);
test_compact_count_limit(&mut cluster);
}

#[test]
@@ -237,3 +237,99 @@ fn test_server_compact_many_times() {
let mut cluster = new_server_cluster(0, count);
test_compact_many_times(&mut cluster);
}

fn test_compact_size_limit<T: Simulator>(cluster: &mut Cluster<T>) {
cluster.cfg.raft_store.raft_log_gc_count_limit = 100000;
cluster.cfg.raft_store.raft_log_gc_size_limit = 2 * 1024 * 1024;
cluster.run();
cluster.stop_node(1);

cluster.must_put(b"k1", b"v1");

let mut before_states = HashMap::new();

for (&id, engine) in &cluster.engines {
if id == 1 {
continue;
}
must_get_equal(engine, b"k1", b"v1");
let mut state: RaftApplyState =
get_msg_cf_or_default(engine, CF_RAFT, &keys::apply_state_key(1));
let state = state.take_truncated_state();
// compact should not start
assert_eq!(RAFT_INIT_LOG_INDEX, state.get_index());
assert_eq!(RAFT_INIT_LOG_TERM, state.get_term());
before_states.insert(id, state);
}

for i in 1..600 {
let k = i.to_string().into_bytes();
let v = k.clone();
cluster.must_put(&k, &v);
let v2 = cluster.get(&k);
assert_eq!(v2, Some(v));
}

// wait log gc.
sleep_ms(500);

// The limit has not been reached, so gc should not start.
for (&id, engine) in &cluster.engines {
if id == 1 {
continue;
}
let mut state: RaftApplyState =
get_msg_cf_or_default(engine, CF_RAFT, &keys::apply_state_key(1));
let after_state = state.take_truncated_state();

let before_state = before_states.get(&id).unwrap();
let idx = after_state.get_index();
assert_eq!(idx, before_state.get_index());
}

// 600 * 10240 > 2 * 1024 * 1024
for _ in 600..1200 {
let k = vec![0; 1024 * 5];
let v = k.clone();
cluster.must_put(&k, &v);
let v2 = cluster.get(&k);
assert_eq!(v2, Some(v));
}

sleep_ms(500);

// Size exceeds the max limit, so every peer must have compacted logs,
// and the truncated log state index/term must be greater than before.
for (&id, engine) in &cluster.engines {
if id == 1 {
continue;
}
let mut state: RaftApplyState =
get_msg_cf_or_default(engine, CF_RAFT, &keys::apply_state_key(1));
let after_state = state.take_truncated_state();

let before_state = before_states.get(&id).unwrap();
let idx = after_state.get_index();
assert!(idx > before_state.get_index());

let handle = get_cf_handle(engine, CF_RAFT).unwrap();
for i in 0..idx {
let key = keys::raft_log_key(1, i);
assert!(engine.get_cf(handle, &key).unwrap().is_none());
}
}
}

#[test]
fn test_node_compact_size_limit() {
let count = 5;
let mut cluster = new_node_cluster(0, count);
test_compact_size_limit(&mut cluster);
}

#[test]
fn test_server_compact_size_limit() {
let count = 5;
let mut cluster = new_server_cluster(0, count);
test_compact_size_limit(&mut cluster);
}