raftstore: apply early (tikv#6154)
Signed-off-by: Jay Lee <[email protected]>
BusyJay authored and c1ay committed May 9, 2020
1 parent 497de68 commit 2b8a4ec
Showing 16 changed files with 813 additions and 68 deletions.
3 changes: 3 additions & 0 deletions components/raftstore/src/store/config.rs
@@ -167,6 +167,8 @@ pub struct Config {
pub future_poll_size: usize,
#[config(hidden)]
pub hibernate_regions: bool,
#[config(hidden)]
pub early_apply: bool,

// Deprecated! These two configuration has been moved to Coprocessor.
// They are preserved for compatibility check.
@@ -245,6 +247,7 @@ impl Default for Config {
store_pool_size: 2,
future_poll_size: 1,
hibernate_regions: true,
early_apply: true,

// They are preserved for compatibility check.
region_max_size: ReadableSize(0),
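
The config.rs hunk above introduces a hidden early_apply switch that defaults to on. A minimal sketch of how it behaves, assuming the usual raftstore::store::Config re-export (this snippet is not part of the commit):

    use raftstore::store::Config;

    fn main() {
        let mut cfg = Config::default();
        // Added in this commit with a default of `true`, so upgraded nodes
        // start early-applying without any configuration change.
        assert!(cfg.early_apply);
        // Like `hibernate_regions`, the field is #[config(hidden)]; turning it
        // off restores the old "persist the raft log first, then apply" order.
        cfg.early_apply = false;
    }
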
96 changes: 76 additions & 20 deletions components/raftstore/src/store/fsm/apply.rs
@@ -2187,14 +2187,27 @@ pub struct Apply {
pub region_id: u64,
pub term: u64,
pub entries: Vec<Entry>,
pub last_commit_index: u64,
pub commit_index: u64,
pub commit_term: u64,
}

impl Apply {
pub fn new(region_id: u64, term: u64, entries: Vec<Entry>) -> Apply {
pub fn new(
region_id: u64,
term: u64,
entries: Vec<Entry>,
last_commit_index: u64,
commit_term: u64,
commit_index: u64,
) -> Apply {
Apply {
region_id,
term,
entries,
last_commit_index,
commit_index,
commit_term,
}
}
}
@@ -2496,6 +2509,27 @@ impl ApplyFsm {

self.delegate.metrics = ApplyMetrics::default();
self.delegate.term = apply.term;
let prev_state = (
self.delegate.apply_state.get_last_commit_index(),
self.delegate.apply_state.get_commit_index(),
self.delegate.apply_state.get_commit_term(),
);
let cur_state = (
apply.last_commit_index,
apply.commit_index,
apply.commit_term,
);
if prev_state.0 > cur_state.0 || prev_state.1 > cur_state.1 || prev_state.2 > cur_state.2 {
panic!(
"{} commit state jump backward {:?} -> {:?}",
self.delegate.tag, prev_state, cur_state
);
}
// The apply state may not be written to disk if entries is empty,
// which seems OK.
self.delegate.apply_state.set_last_commit_index(cur_state.0);
self.delegate.apply_state.set_commit_index(cur_state.1);
self.delegate.apply_state.set_commit_term(cur_state.2);

self.delegate
.handle_raft_committed_entries(apply_ctx, apply.entries);
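
The checks above guard against the commit state moving backwards before the delegate records the new values carried by the Apply task (last_commit_index, commit_index, commit_term). A standalone sketch of that invariant, with simplified types that are not the actual tikv ones:

    // Triples are (last_commit_index, commit_index, commit_term).
    fn check_commit_state(tag: &str, prev: (u64, u64, u64), cur: (u64, u64, u64)) {
        // No component may regress: that would mean the peer handed the apply
        // worker an older commit state than it had already recorded.
        if prev.0 > cur.0 || prev.1 > cur.1 || prev.2 > cur.2 {
            panic!("{} commit state jump backward {:?} -> {:?}", tag, prev, cur);
        }
    }

    fn main() {
        // Advancing (or standing still) is fine.
        check_commit_state("[region 1] 1", (2, 3, 2), (3, 5, 2));
        // Swapping the two tuples would panic, mirroring the delegate:
        // check_commit_state("[region 1] 1", (3, 5, 2), (2, 3, 2));
    }
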
@@ -3351,11 +3385,14 @@ mod tests {
let cc_resp = cc_rx.try_recv().unwrap();
assert!(cc_resp.get_header().get_error().has_stale_command());

router.schedule_task(1, Msg::apply(Apply::new(1, 1, vec![new_entry(2, 3, None)])));
router.schedule_task(
1,
Msg::apply(Apply::new(1, 1, vec![new_entry(2, 3, None)], 2, 2, 3)),
);
// non registered region should be ignored.
assert!(rx.recv_timeout(Duration::from_millis(100)).is_err());

router.schedule_task(2, Msg::apply(Apply::new(2, 11, vec![])));
router.schedule_task(2, Msg::apply(Apply::new(2, 11, vec![], 3, 2, 3)));
// empty entries should be ignored.
let reg_term = reg.term;
validate(&router, 2, move |delegate| {
@@ -3375,7 +3412,7 @@ mod tests {
&router,
2,
vec![
Msg::apply(Apply::new(2, 11, vec![new_entry(5, 4, None)])),
Msg::apply(Apply::new(2, 11, vec![new_entry(5, 4, None)], 3, 5, 4)),
Msg::Snapshot(GenSnapTask::new(2, 0, tx)),
],
);
@@ -3666,7 +3703,7 @@ mod tests {
.epoch(1, 3)
.capture_resp(&router, 3, 1, capture_tx.clone())
.build();
router.schedule_task(1, Msg::apply(Apply::new(1, 1, vec![put_entry])));
router.schedule_task(1, Msg::apply(Apply::new(1, 1, vec![put_entry], 0, 1, 1)));
let resp = capture_rx.recv_timeout(Duration::from_secs(3)).unwrap();
assert!(!resp.get_header().has_error(), "{:?}", resp);
assert_eq!(resp.get_responses().len(), 3);
@@ -3686,7 +3723,7 @@ mod tests {
.put_cf(CF_LOCK, b"k1", b"v1")
.epoch(1, 3)
.build();
router.schedule_task(1, Msg::apply(Apply::new(1, 2, vec![put_entry])));
router.schedule_task(1, Msg::apply(Apply::new(1, 2, vec![put_entry], 1, 2, 2)));
let apply_res = fetch_apply_res(&rx);
assert_eq!(apply_res.region_id, 1);
assert_eq!(apply_res.apply_state.get_applied_index(), 2);
@@ -3707,7 +3744,7 @@ mod tests {
.epoch(1, 1)
.capture_resp(&router, 3, 1, capture_tx.clone())
.build();
router.schedule_task(1, Msg::apply(Apply::new(1, 2, vec![put_entry])));
router.schedule_task(1, Msg::apply(Apply::new(1, 2, vec![put_entry], 2, 2, 3)));
let resp = capture_rx.recv_timeout(Duration::from_secs(3)).unwrap();
assert!(resp.get_header().get_error().has_epoch_not_match());
let apply_res = fetch_apply_res(&rx);
@@ -3720,7 +3757,7 @@ mod tests {
.epoch(1, 3)
.capture_resp(&router, 3, 1, capture_tx.clone())
.build();
router.schedule_task(1, Msg::apply(Apply::new(1, 2, vec![put_entry])));
router.schedule_task(1, Msg::apply(Apply::new(1, 2, vec![put_entry], 3, 2, 4)));
let resp = capture_rx.recv_timeout(Duration::from_secs(3)).unwrap();
assert!(resp.get_header().get_error().has_key_not_in_region());
let apply_res = fetch_apply_res(&rx);
@@ -3739,7 +3776,7 @@ mod tests {
.epoch(1, 3)
.capture_resp(&router, 3, 1, capture_tx.clone())
.build();
router.schedule_task(1, Msg::apply(Apply::new(1, 3, vec![put_entry])));
router.schedule_task(1, Msg::apply(Apply::new(1, 3, vec![put_entry], 4, 3, 5)));
let resp = capture_rx.recv_timeout(Duration::from_secs(3)).unwrap();
// stale command should be cleared.
assert!(resp.get_header().get_error().has_stale_command());
@@ -3756,7 +3793,7 @@ mod tests {
.epoch(1, 3)
.capture_resp(&router, 3, 1, capture_tx.clone())
.build();
router.schedule_task(1, Msg::apply(Apply::new(1, 3, vec![delete_entry])));
router.schedule_task(1, Msg::apply(Apply::new(1, 3, vec![delete_entry], 5, 3, 6)));
let resp = capture_rx.recv_timeout(Duration::from_secs(3)).unwrap();
assert!(resp.get_header().get_error().has_key_not_in_region());
fetch_apply_res(&rx);
@@ -3766,7 +3803,7 @@ mod tests {
.epoch(1, 3)
.capture_resp(&router, 3, 1, capture_tx.clone())
.build();
router.schedule_task(1, Msg::apply(Apply::new(1, 3, vec![delete_range_entry])));
router.schedule_task(
1,
Msg::apply(Apply::new(1, 3, vec![delete_range_entry], 6, 3, 7)),
);
let resp = capture_rx.recv_timeout(Duration::from_secs(3)).unwrap();
assert!(resp.get_header().get_error().has_key_not_in_region());
assert_eq!(engines.kv.get(&dk_k3).unwrap().unwrap(), b"v1");
@@ -3779,7 +3819,7 @@ mod tests {
.epoch(1, 3)
.capture_resp(&router, 3, 1, capture_tx.clone())
.build();
router.schedule_task(1, Msg::apply(Apply::new(1, 3, vec![delete_range_entry])));
router.schedule_task(
1,
Msg::apply(Apply::new(1, 3, vec![delete_range_entry], 7, 3, 8)),
);
let resp = capture_rx.recv_timeout(Duration::from_secs(3)).unwrap();
assert!(!resp.get_header().has_error(), "{:?}", resp);
assert!(engines.kv.get(&dk_k1).unwrap().is_none());
@@ -3825,7 +3868,7 @@ mod tests {
.epoch(0, 3)
.build();
let entries = vec![put_ok, ingest_ok, ingest_epoch_not_match];
router.schedule_task(1, Msg::apply(Apply::new(1, 3, entries)));
router.schedule_task(1, Msg::apply(Apply::new(1, 3, entries, 8, 3, 11)));
let resp = capture_rx.recv_timeout(Duration::from_secs(3)).unwrap();
assert!(!resp.get_header().has_error(), "{:?}", resp);
let resp = capture_rx.recv_timeout(Duration::from_secs(3)).unwrap();
@@ -3850,7 +3893,17 @@ mod tests {
.build();
entries.push(put_entry);
}
router.schedule_task(1, Msg::apply(Apply::new(1, 3, entries)));
router.schedule_task(
1,
Msg::apply(Apply::new(
1,
3,
entries,
11,
3,
WRITE_BATCH_MAX_KEYS as u64 + 11,
)),
);
for _ in 0..WRITE_BATCH_MAX_KEYS {
capture_rx.recv_timeout(Duration::from_secs(3)).unwrap();
}
@@ -3907,7 +3960,7 @@ mod tests {
.put(b"k3", b"v1")
.epoch(1, 3)
.build();
router.schedule_task(1, Msg::apply(Apply::new(1, 1, vec![put_entry])));
router.schedule_task(1, Msg::apply(Apply::new(1, 1, vec![put_entry], 0, 1, 1)));
fetch_apply_res(&rx);
// It must receive nothing because no region registered.
cmdbatch_rx
@@ -3928,7 +3981,7 @@ mod tests {
.put(b"k0", b"v0")
.epoch(1, 3)
.build();
router.schedule_task(1, Msg::apply(Apply::new(1, 2, vec![put_entry])));
router.schedule_task(1, Msg::apply(Apply::new(1, 2, vec![put_entry], 1, 2, 2)));
// Register cmd observer to region 1.
let enabled = Arc::new(AtomicBool::new(true));
router.schedule_task(
@@ -3956,7 +4009,7 @@ mod tests {
.epoch(1, 3)
.capture_resp(&router, 3, 1, capture_tx)
.build();
router.schedule_task(1, Msg::apply(Apply::new(1, 2, vec![put_entry])));
router.schedule_task(1, Msg::apply(Apply::new(1, 2, vec![put_entry], 2, 2, 3)));
fetch_apply_res(&rx);
let resp = capture_rx.recv_timeout(Duration::from_secs(3)).unwrap();
assert!(!resp.get_header().has_error(), "{:?}", resp);
@@ -3974,7 +4027,7 @@ mod tests {
.build();
router.schedule_task(
1,
Msg::apply(Apply::new(1, 2, vec![put_entry1, put_entry2])),
Msg::apply(Apply::new(1, 2, vec![put_entry1, put_entry2], 3, 2, 5)),
);
let cmd_batch = cmdbatch_rx.recv_timeout(Duration::from_secs(3)).unwrap();
assert_eq!(2, cmd_batch.len());
@@ -3985,7 +4038,7 @@ mod tests {
.put(b"k2", b"v2")
.epoch(1, 3)
.build();
router.schedule_task(1, Msg::apply(Apply::new(1, 2, vec![put_entry])));
router.schedule_task(1, Msg::apply(Apply::new(1, 2, vec![put_entry], 5, 2, 6)));
// Must not receive new cmd.
cmdbatch_rx
.recv_timeout(Duration::from_millis(100))
@@ -4179,7 +4232,10 @@ mod tests {
.epoch(epoch.get_conf_ver(), epoch.get_version())
.capture_resp(router, 3, 1, capture_tx.clone())
.build();
router.schedule_task(1, Msg::apply(Apply::new(1, 1, vec![split])));
router.schedule_task(
1,
Msg::apply(Apply::new(1, 1, vec![split], index_id - 1, 1, index_id)),
);
index_id += 1;
capture_rx.recv_timeout(Duration::from_secs(3)).unwrap()
};
27 changes: 26 additions & 1 deletion components/raftstore/src/store/fsm/peer.rs
@@ -97,6 +97,7 @@ pub struct PeerFsm<E: KvEngine> {
group_state: GroupState,
stopped: bool,
has_ready: bool,
early_apply: bool,
mailbox: Option<BasicMailbox<PeerFsm<E>>>,
pub receiver: Receiver<PeerMsg<E>>,
}
@@ -154,6 +155,7 @@ impl<E: KvEngine> PeerFsm<E> {
Ok((
tx,
Box::new(PeerFsm {
early_apply: cfg.early_apply,
peer: Peer::new(store_id, cfg, sched, engines, region, meta_peer)?,
tick_registry: PeerTicks::empty(),
missing_ticks: 0,
@@ -191,6 +193,7 @@ impl<E: KvEngine> PeerFsm<E> {
Ok((
tx,
Box::new(PeerFsm {
early_apply: cfg.early_apply,
peer: Peer::new(store_id, cfg, sched, engines, &region, peer)?,
tick_registry: PeerTicks::empty(),
missing_ticks: 0,
@@ -602,13 +605,35 @@ impl<'a, T: Transport, C: PdClient> PeerFsmDelegate<'a, T, C> {
}
}

#[inline]
pub fn handle_raft_ready_apply(&mut self, ready: &mut Ready, invoke_ctx: &InvokeContext) {
self.fsm.early_apply = ready
.committed_entries
.as_ref()
.and_then(|e| e.last())
.map_or(false, |e| {
self.fsm.peer.can_early_apply(e.get_term(), e.get_index())
});
if !self.fsm.early_apply {
return;
}
self.fsm
.peer
.handle_raft_ready_apply(self.ctx, ready, invoke_ctx);
}

pub fn post_raft_ready_append(&mut self, mut ready: Ready, invoke_ctx: InvokeContext) {
let is_merging = self.fsm.peer.pending_merge_state.is_some();
if !self.fsm.early_apply {
self.fsm
.peer
.handle_raft_ready_apply(self.ctx, &mut ready, &invoke_ctx);
}
let res = self
.fsm
.peer
.post_raft_ready_append(self.ctx, &mut ready, invoke_ctx);
self.fsm.peer.handle_raft_ready_apply(self.ctx, ready);
self.fsm.peer.handle_raft_ready_advance(ready);
let mut has_snapshot = false;
if let Some(apply_res) = res {
self.on_ready_apply_snapshot(apply_res);
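
Taken together, handle_raft_ready_apply and post_raft_ready_append split what used to be a single post-append step: when the last committed entry is eligible, the ready's committed entries are handed to the apply workers before the raft write batch is persisted; otherwise the old ordering is kept. A simplified control-flow sketch follows; every name below is an illustrative stand-in rather than a tikv API, and the real eligibility test, Peer::can_early_apply, is not part of this excerpt:

    struct ReadyLike {
        // (term, index) of the committed entries carried by this ready.
        committed: Vec<(u64, u64)>,
    }

    struct PeerLike {
        early_apply_enabled: bool, // mirrors Config::early_apply
    }

    impl PeerLike {
        // Stand-in for Peer::can_early_apply: may this batch be applied
        // before the raft log write is synced?
        fn can_early_apply(&self, _term: u64, _index: u64) -> bool {
            self.early_apply_enabled
        }
    }

    fn handle_ready(peer: &PeerLike, ready: &ReadyLike) {
        // Decide from the *last* committed entry, as the diff does.
        let early = ready
            .committed
            .last()
            .map_or(false, |&(term, index)| peer.can_early_apply(term, index));
        if early {
            send_to_apply(ready); // before the raft_wb write and fsync
        }
        persist_raft_log(ready);
        if !early {
            send_to_apply(ready); // the pre-existing, conservative ordering
        }
        advance(ready); // handle_raft_ready_advance in the diff
    }

    fn send_to_apply(_r: &ReadyLike) {}
    fn persist_raft_log(_r: &ReadyLike) {}
    fn advance(_r: &ReadyLike) {}

    fn main() {
        let peer = PeerLike { early_apply_enabled: true };
        let ready = ReadyLike { committed: vec![(2, 3)] };
        handle_ready(&peer, &ready);
    }
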
21 changes: 21 additions & 0 deletions components/raftstore/src/store/fsm/store.rs
@@ -494,6 +494,22 @@ impl<T: Transport, C: PdClient> RaftPoller<T, C> {
self.poll_ctx.need_flush_trans = false;
}
let ready_cnt = self.poll_ctx.ready_res.len();
if ready_cnt != 0 && self.poll_ctx.cfg.early_apply {
let mut batch_pos = 0;
let mut ready_res = mem::replace(&mut self.poll_ctx.ready_res, vec![]);
for (ready, invoke_ctx) in &mut ready_res {
let region_id = invoke_ctx.region_id;
if peers[batch_pos].region_id() == region_id {
} else {
while peers[batch_pos].region_id() != region_id {
batch_pos += 1;
}
}
PeerFsmDelegate::new(&mut peers[batch_pos], &mut self.poll_ctx)
.handle_raft_ready_apply(ready, invoke_ctx);
}
self.poll_ctx.ready_res = ready_res;
}
self.poll_ctx.raft_metrics.ready.has_ready_region += ready_cnt as u64;
fail_point!("raft_before_save");
if !self.poll_ctx.kv_wb.is_empty() {
@@ -516,6 +532,11 @@ impl<T: Transport, C: PdClient> RaftPoller<T, C> {
}
fail_point!("raft_between_save");
if !self.poll_ctx.raft_wb.is_empty() {
fail_point!(
"raft_before_save_on_store_1",
self.poll_ctx.store_id() == 1,
|_| {}
);
let mut write_opts = WriteOptions::new();
write_opts.set_sync(self.poll_ctx.cfg.sync_log || self.poll_ctx.sync_log);
self.poll_ctx
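
In the poller, the early-apply pass runs before the raft_before_save fail point, i.e. before kv_wb and raft_wb are written, and it relies on ready_res being collected in the same order the peers were polled, so a forward-only cursor is enough to match each ready to its PeerFsm. A standalone sketch of that alignment with simplified types (handle_raft_ready_apply would run where the position is recorded):

    fn align_and_apply(peer_region_ids: &[u64], ready_region_ids: &[u64]) -> Vec<usize> {
        let mut batch_pos = 0;
        let mut positions = Vec::with_capacity(ready_region_ids.len());
        for &region_id in ready_region_ids {
            // Advance the cursor until it points at the ready's owning peer.
            while peer_region_ids[batch_pos] != region_id {
                batch_pos += 1;
            }
            positions.push(batch_pos);
        }
        positions
    }

    fn main() {
        // Peers in this poll batch; only regions 2 and 5 produced a ready.
        let peers = [1, 2, 3, 5];
        let readies = [2, 5];
        assert_eq!(align_and_apply(&peers, &readies), vec![1, 3]);
    }

The raft_before_save_on_store_1 fail point added in the same file lets tests hold back the raft log write on store 1 while committed entries have already been sent to apply, which is exactly the window early apply opens.
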
6 changes: 3 additions & 3 deletions components/raftstore/src/store/mod.rs
@@ -31,9 +31,9 @@ pub use self::peer::{
Peer, PeerStat, ProposalContext, ReadExecutor, RequestInspector, RequestPolicy,
};
pub use self::peer_storage::{
clear_meta, do_snapshot, init_apply_state, init_raft_state, write_initial_apply_state,
write_initial_raft_state, write_peer_state, CacheQueryStats, PeerStorage, SnapState,
INIT_EPOCH_CONF_VER, INIT_EPOCH_VER, RAFT_INIT_LOG_INDEX, RAFT_INIT_LOG_TERM,
clear_meta, do_snapshot, write_initial_apply_state, write_initial_raft_state, write_peer_state,
CacheQueryStats, PeerStorage, SnapState, INIT_EPOCH_CONF_VER, INIT_EPOCH_VER,
RAFT_INIT_LOG_INDEX, RAFT_INIT_LOG_TERM,
};
pub use self::region_snapshot::{new_temp_engine, RegionIterator, RegionSnapshot};
pub use self::snap::{