diff --git a/components/raftstore/src/store/config.rs b/components/raftstore/src/store/config.rs index 335e797e92e..585f0fa6820 100644 --- a/components/raftstore/src/store/config.rs +++ b/components/raftstore/src/store/config.rs @@ -167,6 +167,8 @@ pub struct Config { pub future_poll_size: usize, #[config(hidden)] pub hibernate_regions: bool, + #[config(hidden)] + pub early_apply: bool, // Deprecated! These two configuration has been moved to Coprocessor. // They are preserved for compatibility check. @@ -245,6 +247,7 @@ impl Default for Config { store_pool_size: 2, future_poll_size: 1, hibernate_regions: true, + early_apply: true, // They are preserved for compatibility check. region_max_size: ReadableSize(0), diff --git a/components/raftstore/src/store/fsm/apply.rs b/components/raftstore/src/store/fsm/apply.rs index 348e1c53867..8ba22d01311 100644 --- a/components/raftstore/src/store/fsm/apply.rs +++ b/components/raftstore/src/store/fsm/apply.rs @@ -2187,14 +2187,27 @@ pub struct Apply { pub region_id: u64, pub term: u64, pub entries: Vec, + pub last_commit_index: u64, + pub commit_index: u64, + pub commit_term: u64, } impl Apply { - pub fn new(region_id: u64, term: u64, entries: Vec) -> Apply { + pub fn new( + region_id: u64, + term: u64, + entries: Vec, + last_commit_index: u64, + commit_term: u64, + commit_index: u64, + ) -> Apply { Apply { region_id, term, entries, + last_commit_index, + commit_index, + commit_term, } } } @@ -2496,6 +2509,27 @@ impl ApplyFsm { self.delegate.metrics = ApplyMetrics::default(); self.delegate.term = apply.term; + let prev_state = ( + self.delegate.apply_state.get_last_commit_index(), + self.delegate.apply_state.get_commit_index(), + self.delegate.apply_state.get_commit_term(), + ); + let cur_state = ( + apply.last_commit_index, + apply.commit_index, + apply.commit_term, + ); + if prev_state.0 > cur_state.0 || prev_state.1 > cur_state.1 || prev_state.2 > cur_state.2 { + panic!( + "{} commit state jump backward {:?} -> {:?}", + self.delegate.tag, prev_state, cur_state + ); + } + // The apply state may not be written to disk if entries is empty, + // which seems OK. + self.delegate.apply_state.set_last_commit_index(cur_state.0); + self.delegate.apply_state.set_commit_index(cur_state.1); + self.delegate.apply_state.set_commit_term(cur_state.2); self.delegate .handle_raft_committed_entries(apply_ctx, apply.entries); @@ -3351,11 +3385,14 @@ mod tests { let cc_resp = cc_rx.try_recv().unwrap(); assert!(cc_resp.get_header().get_error().has_stale_command()); - router.schedule_task(1, Msg::apply(Apply::new(1, 1, vec![new_entry(2, 3, None)]))); + router.schedule_task( + 1, + Msg::apply(Apply::new(1, 1, vec![new_entry(2, 3, None)], 2, 2, 3)), + ); // non registered region should be ignored. assert!(rx.recv_timeout(Duration::from_millis(100)).is_err()); - router.schedule_task(2, Msg::apply(Apply::new(2, 11, vec![]))); + router.schedule_task(2, Msg::apply(Apply::new(2, 11, vec![], 3, 2, 3))); // empty entries should be ignored. let reg_term = reg.term; validate(&router, 2, move |delegate| { @@ -3375,7 +3412,7 @@ mod tests { &router, 2, vec![ - Msg::apply(Apply::new(2, 11, vec![new_entry(5, 4, None)])), + Msg::apply(Apply::new(2, 11, vec![new_entry(5, 4, None)], 3, 5, 4)), Msg::Snapshot(GenSnapTask::new(2, 0, tx)), ], ); @@ -3666,7 +3703,7 @@ mod tests { .epoch(1, 3) .capture_resp(&router, 3, 1, capture_tx.clone()) .build(); - router.schedule_task(1, Msg::apply(Apply::new(1, 1, vec![put_entry]))); + router.schedule_task(1, Msg::apply(Apply::new(1, 1, vec![put_entry], 0, 1, 1))); let resp = capture_rx.recv_timeout(Duration::from_secs(3)).unwrap(); assert!(!resp.get_header().has_error(), "{:?}", resp); assert_eq!(resp.get_responses().len(), 3); @@ -3686,7 +3723,7 @@ mod tests { .put_cf(CF_LOCK, b"k1", b"v1") .epoch(1, 3) .build(); - router.schedule_task(1, Msg::apply(Apply::new(1, 2, vec![put_entry]))); + router.schedule_task(1, Msg::apply(Apply::new(1, 2, vec![put_entry], 1, 2, 2))); let apply_res = fetch_apply_res(&rx); assert_eq!(apply_res.region_id, 1); assert_eq!(apply_res.apply_state.get_applied_index(), 2); @@ -3707,7 +3744,7 @@ mod tests { .epoch(1, 1) .capture_resp(&router, 3, 1, capture_tx.clone()) .build(); - router.schedule_task(1, Msg::apply(Apply::new(1, 2, vec![put_entry]))); + router.schedule_task(1, Msg::apply(Apply::new(1, 2, vec![put_entry], 2, 2, 3))); let resp = capture_rx.recv_timeout(Duration::from_secs(3)).unwrap(); assert!(resp.get_header().get_error().has_epoch_not_match()); let apply_res = fetch_apply_res(&rx); @@ -3720,7 +3757,7 @@ mod tests { .epoch(1, 3) .capture_resp(&router, 3, 1, capture_tx.clone()) .build(); - router.schedule_task(1, Msg::apply(Apply::new(1, 2, vec![put_entry]))); + router.schedule_task(1, Msg::apply(Apply::new(1, 2, vec![put_entry], 3, 2, 4))); let resp = capture_rx.recv_timeout(Duration::from_secs(3)).unwrap(); assert!(resp.get_header().get_error().has_key_not_in_region()); let apply_res = fetch_apply_res(&rx); @@ -3739,7 +3776,7 @@ mod tests { .epoch(1, 3) .capture_resp(&router, 3, 1, capture_tx.clone()) .build(); - router.schedule_task(1, Msg::apply(Apply::new(1, 3, vec![put_entry]))); + router.schedule_task(1, Msg::apply(Apply::new(1, 3, vec![put_entry], 4, 3, 5))); let resp = capture_rx.recv_timeout(Duration::from_secs(3)).unwrap(); // stale command should be cleared. assert!(resp.get_header().get_error().has_stale_command()); @@ -3756,7 +3793,7 @@ mod tests { .epoch(1, 3) .capture_resp(&router, 3, 1, capture_tx.clone()) .build(); - router.schedule_task(1, Msg::apply(Apply::new(1, 3, vec![delete_entry]))); + router.schedule_task(1, Msg::apply(Apply::new(1, 3, vec![delete_entry], 5, 3, 6))); let resp = capture_rx.recv_timeout(Duration::from_secs(3)).unwrap(); assert!(resp.get_header().get_error().has_key_not_in_region()); fetch_apply_res(&rx); @@ -3766,7 +3803,10 @@ mod tests { .epoch(1, 3) .capture_resp(&router, 3, 1, capture_tx.clone()) .build(); - router.schedule_task(1, Msg::apply(Apply::new(1, 3, vec![delete_range_entry]))); + router.schedule_task( + 1, + Msg::apply(Apply::new(1, 3, vec![delete_range_entry], 6, 3, 7)), + ); let resp = capture_rx.recv_timeout(Duration::from_secs(3)).unwrap(); assert!(resp.get_header().get_error().has_key_not_in_region()); assert_eq!(engines.kv.get(&dk_k3).unwrap().unwrap(), b"v1"); @@ -3779,7 +3819,10 @@ mod tests { .epoch(1, 3) .capture_resp(&router, 3, 1, capture_tx.clone()) .build(); - router.schedule_task(1, Msg::apply(Apply::new(1, 3, vec![delete_range_entry]))); + router.schedule_task( + 1, + Msg::apply(Apply::new(1, 3, vec![delete_range_entry], 7, 3, 8)), + ); let resp = capture_rx.recv_timeout(Duration::from_secs(3)).unwrap(); assert!(!resp.get_header().has_error(), "{:?}", resp); assert!(engines.kv.get(&dk_k1).unwrap().is_none()); @@ -3825,7 +3868,7 @@ mod tests { .epoch(0, 3) .build(); let entries = vec![put_ok, ingest_ok, ingest_epoch_not_match]; - router.schedule_task(1, Msg::apply(Apply::new(1, 3, entries))); + router.schedule_task(1, Msg::apply(Apply::new(1, 3, entries, 8, 3, 11))); let resp = capture_rx.recv_timeout(Duration::from_secs(3)).unwrap(); assert!(!resp.get_header().has_error(), "{:?}", resp); let resp = capture_rx.recv_timeout(Duration::from_secs(3)).unwrap(); @@ -3850,7 +3893,17 @@ mod tests { .build(); entries.push(put_entry); } - router.schedule_task(1, Msg::apply(Apply::new(1, 3, entries))); + router.schedule_task( + 1, + Msg::apply(Apply::new( + 1, + 3, + entries, + 11, + 3, + WRITE_BATCH_MAX_KEYS as u64 + 11, + )), + ); for _ in 0..WRITE_BATCH_MAX_KEYS { capture_rx.recv_timeout(Duration::from_secs(3)).unwrap(); } @@ -3907,7 +3960,7 @@ mod tests { .put(b"k3", b"v1") .epoch(1, 3) .build(); - router.schedule_task(1, Msg::apply(Apply::new(1, 1, vec![put_entry]))); + router.schedule_task(1, Msg::apply(Apply::new(1, 1, vec![put_entry], 0, 1, 1))); fetch_apply_res(&rx); // It must receive nothing because no region registered. cmdbatch_rx @@ -3928,7 +3981,7 @@ mod tests { .put(b"k0", b"v0") .epoch(1, 3) .build(); - router.schedule_task(1, Msg::apply(Apply::new(1, 2, vec![put_entry]))); + router.schedule_task(1, Msg::apply(Apply::new(1, 2, vec![put_entry], 1, 2, 2))); // Register cmd observer to region 1. let enabled = Arc::new(AtomicBool::new(true)); router.schedule_task( @@ -3956,7 +4009,7 @@ mod tests { .epoch(1, 3) .capture_resp(&router, 3, 1, capture_tx) .build(); - router.schedule_task(1, Msg::apply(Apply::new(1, 2, vec![put_entry]))); + router.schedule_task(1, Msg::apply(Apply::new(1, 2, vec![put_entry], 2, 2, 3))); fetch_apply_res(&rx); let resp = capture_rx.recv_timeout(Duration::from_secs(3)).unwrap(); assert!(!resp.get_header().has_error(), "{:?}", resp); @@ -3974,7 +4027,7 @@ mod tests { .build(); router.schedule_task( 1, - Msg::apply(Apply::new(1, 2, vec![put_entry1, put_entry2])), + Msg::apply(Apply::new(1, 2, vec![put_entry1, put_entry2], 3, 2, 5)), ); let cmd_batch = cmdbatch_rx.recv_timeout(Duration::from_secs(3)).unwrap(); assert_eq!(2, cmd_batch.len()); @@ -3985,7 +4038,7 @@ mod tests { .put(b"k2", b"v2") .epoch(1, 3) .build(); - router.schedule_task(1, Msg::apply(Apply::new(1, 2, vec![put_entry]))); + router.schedule_task(1, Msg::apply(Apply::new(1, 2, vec![put_entry], 5, 2, 6))); // Must not receive new cmd. cmdbatch_rx .recv_timeout(Duration::from_millis(100)) @@ -4179,7 +4232,10 @@ mod tests { .epoch(epoch.get_conf_ver(), epoch.get_version()) .capture_resp(router, 3, 1, capture_tx.clone()) .build(); - router.schedule_task(1, Msg::apply(Apply::new(1, 1, vec![split]))); + router.schedule_task( + 1, + Msg::apply(Apply::new(1, 1, vec![split], index_id - 1, 1, index_id)), + ); index_id += 1; capture_rx.recv_timeout(Duration::from_secs(3)).unwrap() }; diff --git a/components/raftstore/src/store/fsm/peer.rs b/components/raftstore/src/store/fsm/peer.rs index 6ec91cb9a40..90077e741fc 100644 --- a/components/raftstore/src/store/fsm/peer.rs +++ b/components/raftstore/src/store/fsm/peer.rs @@ -97,6 +97,7 @@ pub struct PeerFsm { group_state: GroupState, stopped: bool, has_ready: bool, + early_apply: bool, mailbox: Option>>, pub receiver: Receiver>, } @@ -154,6 +155,7 @@ impl PeerFsm { Ok(( tx, Box::new(PeerFsm { + early_apply: cfg.early_apply, peer: Peer::new(store_id, cfg, sched, engines, region, meta_peer)?, tick_registry: PeerTicks::empty(), missing_ticks: 0, @@ -191,6 +193,7 @@ impl PeerFsm { Ok(( tx, Box::new(PeerFsm { + early_apply: cfg.early_apply, peer: Peer::new(store_id, cfg, sched, engines, ®ion, peer)?, tick_registry: PeerTicks::empty(), missing_ticks: 0, @@ -602,13 +605,35 @@ impl<'a, T: Transport, C: PdClient> PeerFsmDelegate<'a, T, C> { } } + #[inline] + pub fn handle_raft_ready_apply(&mut self, ready: &mut Ready, invoke_ctx: &InvokeContext) { + self.fsm.early_apply = ready + .committed_entries + .as_ref() + .and_then(|e| e.last()) + .map_or(false, |e| { + self.fsm.peer.can_early_apply(e.get_term(), e.get_index()) + }); + if !self.fsm.early_apply { + return; + } + self.fsm + .peer + .handle_raft_ready_apply(self.ctx, ready, invoke_ctx); + } + pub fn post_raft_ready_append(&mut self, mut ready: Ready, invoke_ctx: InvokeContext) { let is_merging = self.fsm.peer.pending_merge_state.is_some(); + if !self.fsm.early_apply { + self.fsm + .peer + .handle_raft_ready_apply(self.ctx, &mut ready, &invoke_ctx); + } let res = self .fsm .peer .post_raft_ready_append(self.ctx, &mut ready, invoke_ctx); - self.fsm.peer.handle_raft_ready_apply(self.ctx, ready); + self.fsm.peer.handle_raft_ready_advance(ready); let mut has_snapshot = false; if let Some(apply_res) = res { self.on_ready_apply_snapshot(apply_res); diff --git a/components/raftstore/src/store/fsm/store.rs b/components/raftstore/src/store/fsm/store.rs index 049acd96897..ebc82005ea9 100644 --- a/components/raftstore/src/store/fsm/store.rs +++ b/components/raftstore/src/store/fsm/store.rs @@ -494,6 +494,22 @@ impl RaftPoller { self.poll_ctx.need_flush_trans = false; } let ready_cnt = self.poll_ctx.ready_res.len(); + if ready_cnt != 0 && self.poll_ctx.cfg.early_apply { + let mut batch_pos = 0; + let mut ready_res = mem::replace(&mut self.poll_ctx.ready_res, vec![]); + for (ready, invoke_ctx) in &mut ready_res { + let region_id = invoke_ctx.region_id; + if peers[batch_pos].region_id() == region_id { + } else { + while peers[batch_pos].region_id() != region_id { + batch_pos += 1; + } + } + PeerFsmDelegate::new(&mut peers[batch_pos], &mut self.poll_ctx) + .handle_raft_ready_apply(ready, invoke_ctx); + } + self.poll_ctx.ready_res = ready_res; + } self.poll_ctx.raft_metrics.ready.has_ready_region += ready_cnt as u64; fail_point!("raft_before_save"); if !self.poll_ctx.kv_wb.is_empty() { @@ -516,6 +532,11 @@ impl RaftPoller { } fail_point!("raft_between_save"); if !self.poll_ctx.raft_wb.is_empty() { + fail_point!( + "raft_before_save_on_store_1", + self.poll_ctx.store_id() == 1, + |_| {} + ); let mut write_opts = WriteOptions::new(); write_opts.set_sync(self.poll_ctx.cfg.sync_log || self.poll_ctx.sync_log); self.poll_ctx diff --git a/components/raftstore/src/store/mod.rs b/components/raftstore/src/store/mod.rs index 913d9b6ebc3..0dc1b761945 100644 --- a/components/raftstore/src/store/mod.rs +++ b/components/raftstore/src/store/mod.rs @@ -31,9 +31,9 @@ pub use self::peer::{ Peer, PeerStat, ProposalContext, ReadExecutor, RequestInspector, RequestPolicy, }; pub use self::peer_storage::{ - clear_meta, do_snapshot, init_apply_state, init_raft_state, write_initial_apply_state, - write_initial_raft_state, write_peer_state, CacheQueryStats, PeerStorage, SnapState, - INIT_EPOCH_CONF_VER, INIT_EPOCH_VER, RAFT_INIT_LOG_INDEX, RAFT_INIT_LOG_TERM, + clear_meta, do_snapshot, write_initial_apply_state, write_initial_raft_state, write_peer_state, + CacheQueryStats, PeerStorage, SnapState, INIT_EPOCH_CONF_VER, INIT_EPOCH_VER, + RAFT_INIT_LOG_INDEX, RAFT_INIT_LOG_TERM, }; pub use self::region_snapshot::{new_temp_engine, RegionIterator, RegionSnapshot}; pub use self::snap::{ diff --git a/components/raftstore/src/store/peer.rs b/components/raftstore/src/store/peer.rs index 4e8ae9ea28f..4542ce94fd1 100644 --- a/components/raftstore/src/store/peer.rs +++ b/components/raftstore/src/store/peer.rs @@ -1064,6 +1064,23 @@ impl Peer { false } + /// Whether a log can be applied before writing raft batch. + /// + /// If TiKV crashes, it's possible apply index > commit index. If logs are still + /// available in other nodes, it's possible to be recovered. But for singleton, logs are + /// only available on single node, logs are gone forever. + /// + /// Note we can't just check singleton. Because conf change takes effect on apply, so even + /// there are two nodes, previous logs can still be committed by leader alone. Those logs + /// can't be applied early. After introducing joint consensus, the node number can be + /// undetermined. So here check whether log is persisted on disk instead. + /// + /// Only apply existing logs has another benefit that we don't need to deal with snapshots + /// that are older than apply index as apply index <= last index <= index of snapshot. + pub fn can_early_apply(&self, term: u64, index: u64) -> bool { + self.get_store().last_index() >= index && self.get_store().last_term() >= term + } + pub fn take_apply_proposals(&mut self) -> Option { if self.apply_proposals.is_empty() { return None; @@ -1277,7 +1294,12 @@ impl Peer { apply_snap_result } - pub fn handle_raft_ready_apply(&mut self, ctx: &mut PollContext, mut ready: Ready) { + pub fn handle_raft_ready_apply( + &mut self, + ctx: &mut PollContext, + ready: &mut Ready, + invoke_ctx: &InvokeContext, + ) { // Call `handle_raft_committed_entries` directly here may lead to inconsistency. // In some cases, there will be some pending committed entries when applying a // snapshot. If we call `handle_raft_committed_entries` directly, these updates @@ -1285,10 +1307,8 @@ impl Peer { // updates will soon be removed. But the soft state of raft is still be updated // in memory. Hence when handle ready next time, these updates won't be included // in `ready.committed_entries` again, which will lead to inconsistency. - if self.is_applying_snapshot() { - // Snapshot's metadata has been applied. - self.last_applying_idx = self.get_store().truncated_index(); - } else { + if raft::is_empty_snap(ready.snapshot()) { + debug_assert!(!invoke_ctx.has_snapshot() && !self.get_store().is_applying_snapshot()); let committed_entries = ready.committed_entries.take().unwrap(); // leader needs to update lease and last committed split index. let mut lease_to_be_updated = self.is_leader(); @@ -1365,7 +1385,16 @@ impl Peer { self.raft_group.skip_bcast_commit(true); self.last_urgent_proposal_idx = u64::MAX; } - let apply = Apply::new(self.region_id, self.term(), committed_entries); + let committed_index = self.raft_group.raft.raft_log.committed; + let term = self.raft_group.raft.raft_log.term(committed_index).unwrap(); + let apply = Apply::new( + self.region_id, + self.term(), + committed_entries, + self.get_store().committed_index(), + term, + committed_index, + ); ctx.apply_router .schedule_task(self.region_id, ApplyTask::apply(apply)); } @@ -1382,13 +1411,20 @@ impl Peer { } } - self.apply_reads(ctx, &ready); + self.apply_reads(ctx, ready); + } - self.raft_group.advance_append(ready); - if self.is_applying_snapshot() { + pub fn handle_raft_ready_advance(&mut self, ready: Ready) { + if !raft::is_empty_snap(ready.snapshot()) { + debug_assert!(self.get_store().is_applying_snapshot()); + // Snapshot's metadata has been applied. + self.last_applying_idx = self.get_store().truncated_index(); + self.raft_group.advance_append(ready); // Because we only handle raft ready when not applying snapshot, so following // line won't be called twice for the same snapshot. self.raft_group.advance_apply(self.last_applying_idx); + } else { + self.raft_group.advance_append(ready); } self.proposals.gc(); } diff --git a/components/raftstore/src/store/peer_storage.rs b/components/raftstore/src/store/peer_storage.rs index 8c6f35906ca..57a7a00250d 100644 --- a/components/raftstore/src/store/peer_storage.rs +++ b/components/raftstore/src/store/peer_storage.rs @@ -357,7 +357,7 @@ pub fn recover_from_applying_state( Ok(()) } -pub fn init_raft_state(engines: &Engines, region: &Region) -> Result { +fn init_raft_state(engines: &Engines, region: &Region) -> Result { let state_key = keys::raft_state_key(region.get_id()); Ok(match engines.raft.get_msg(&state_key)? { Some(s) => s, @@ -375,7 +375,7 @@ pub fn init_raft_state(engines: &Engines, region: &Region) -> Result Result { +fn init_apply_state(engines: &Engines, region: &Region) -> Result { Ok( match engines .kv @@ -396,6 +396,54 @@ pub fn init_apply_state(engines: &Engines, region: &Region) -> Result Result<()> { + let last_index = raft_state.get_last_index(); + let mut commit_index = raft_state.get_hard_state().get_commit(); + let apply_index = apply_state.get_applied_index(); + + if commit_index < apply_state.get_last_commit_index() { + return Err(box_err!( + "raft state {:?} not match apply state {:?} and can't be recovered.", + raft_state, + apply_state + )); + } + let recorded_commit_index = apply_state.get_commit_index(); + if commit_index < recorded_commit_index { + let log_key = keys::raft_log_key(region_id, recorded_commit_index); + let entry = engines.raft.get_msg::(&log_key)?; + if entry.map_or(true, |e| e.get_term() != apply_state.get_commit_term()) { + return Err(box_err!( + "log at recorded commit index [{}] {} doesn't exist, may lose data", + apply_state.get_commit_term(), + recorded_commit_index + )); + } + info!("updating commit index"; "region_id" => region_id, "old" => commit_index, "new" => recorded_commit_index); + commit_index = recorded_commit_index; + } + + if commit_index > last_index || apply_index > commit_index { + return Err(box_err!( + "raft state {:?} not match apply state {:?} and can't be recovered,", + raft_state, + apply_state + )); + } + + raft_state.mut_hard_state().set_commit(commit_index); + if raft_state.get_hard_state().get_term() < apply_state.get_commit_term() { + return Err(box_err!("raft state {:?} corrupted", raft_state)); + } + + Ok(()) +} + fn init_last_term( engines: &Engines, region: &Region, @@ -490,15 +538,10 @@ impl PeerStorage { "peer_id" => peer_id, "path" => ?engines.kv.path(), ); - let raft_state = init_raft_state(&engines, region)?; + let mut raft_state = init_raft_state(&engines, region)?; let apply_state = init_apply_state(&engines, region)?; - if raft_state.get_last_index() < apply_state.get_applied_index() { - panic!( - "{} unexpected raft log index: last_index {} < applied_index {}", - tag, - raft_state.get_last_index(), - apply_state.get_applied_index() - ); + if let Err(e) = validate_states(region.get_id(), &engines, &mut raft_state, &apply_state) { + return Err(box_err!("{} validate state fail: {:?}", tag, e)); } let last_term = init_last_term(&engines, region, &raft_state, &apply_state)?; @@ -630,6 +673,11 @@ impl PeerStorage { last_index(&self.raft_state) } + #[inline] + pub fn last_term(&self) -> u64 { + self.last_term + } + #[inline] pub fn applied_index(&self) -> u64 { self.apply_state.get_applied_index() @@ -2319,4 +2367,136 @@ mod tests { assert_eq!(get_sync_log_from_entry(&e), sync, "{:?}", e); } } + + #[test] + fn test_validate_states() { + let td = Builder::new().prefix("tikv-store-test").tempdir().unwrap(); + let worker = Worker::new("snap-manager"); + let sched = worker.scheduler(); + let kv_db = Arc::new(new_engine(td.path().to_str().unwrap(), None, ALL_CFS, None).unwrap()); + let raft_path = td.path().join(Path::new("raft")); + let raft_db = + Arc::new(new_engine(raft_path.to_str().unwrap(), None, &[CF_DEFAULT], None).unwrap()); + let shared_block_cache = false; + let engines = Engines::new(kv_db, raft_db, shared_block_cache); + bootstrap_store(&engines, 1, 1).unwrap(); + + let region = initial_region(1, 1, 1); + prepare_bootstrap_cluster(&engines, ®ion).unwrap(); + let build_storage = || -> Result { + PeerStorage::new(engines.clone(), ®ion, sched.clone(), 0, "".to_owned()) + }; + let mut s = build_storage().unwrap(); + let mut raft_state = RaftLocalState::default(); + raft_state.set_last_index(RAFT_INIT_LOG_INDEX); + raft_state.mut_hard_state().set_term(RAFT_INIT_LOG_TERM); + raft_state.mut_hard_state().set_commit(RAFT_INIT_LOG_INDEX); + let initial_state = s.initial_state().unwrap(); + assert_eq!(initial_state.hard_state, *raft_state.get_hard_state()); + + // last_index < commit_index is invalid. + let raft_state_key = keys::raft_state_key(1); + raft_state.set_last_index(11); + let log_key = keys::raft_log_key(1, 11); + engines + .raft + .put_msg(&log_key, &new_entry(11, RAFT_INIT_LOG_TERM)) + .unwrap(); + raft_state.mut_hard_state().set_commit(12); + engines.raft.put_msg(&raft_state_key, &raft_state).unwrap(); + assert!(build_storage().is_err()); + + let log_key = keys::raft_log_key(1, 20); + engines + .raft + .put_msg(&log_key, &new_entry(20, RAFT_INIT_LOG_TERM)) + .unwrap(); + raft_state.set_last_index(20); + engines.raft.put_msg(&raft_state_key, &raft_state).unwrap(); + s = build_storage().unwrap(); + let initial_state = s.initial_state().unwrap(); + assert_eq!(initial_state.hard_state, *raft_state.get_hard_state()); + + // Missing last log is invalid. + engines.raft.del(&log_key).unwrap(); + assert!(build_storage().is_err()); + engines + .raft + .put_msg(&log_key, &new_entry(20, RAFT_INIT_LOG_TERM)) + .unwrap(); + + // applied_index > commit_index is invalid. + let mut apply_state = RaftApplyState::default(); + apply_state.set_applied_index(13); + apply_state + .mut_truncated_state() + .set_index(RAFT_INIT_LOG_INDEX); + apply_state + .mut_truncated_state() + .set_term(RAFT_INIT_LOG_TERM); + let apply_state_key = keys::apply_state_key(1); + let cf_raft = engines.kv.cf_handle(CF_RAFT).unwrap(); + engines + .kv + .put_msg_cf(cf_raft, &apply_state_key, &apply_state) + .unwrap(); + assert!(build_storage().is_err()); + + // It should not recover if corresponding log doesn't exist. + apply_state.set_commit_index(14); + apply_state.set_commit_term(RAFT_INIT_LOG_TERM); + engines + .kv + .put_msg_cf(cf_raft, &apply_state_key, &apply_state) + .unwrap(); + assert!(build_storage().is_err()); + + let log_key = keys::raft_log_key(1, 14); + engines + .raft + .put_msg(&log_key, &new_entry(14, RAFT_INIT_LOG_TERM)) + .unwrap(); + raft_state.mut_hard_state().set_commit(14); + s = build_storage().unwrap(); + let initial_state = s.initial_state().unwrap(); + assert_eq!(initial_state.hard_state, *raft_state.get_hard_state()); + + // log term miss match is invalid. + engines + .raft + .put_msg(&log_key, &new_entry(14, RAFT_INIT_LOG_TERM - 1)) + .unwrap(); + assert!(build_storage().is_err()); + + // hard state term miss match is invalid. + engines + .raft + .put_msg(&log_key, &new_entry(14, RAFT_INIT_LOG_TERM)) + .unwrap(); + raft_state.mut_hard_state().set_term(RAFT_INIT_LOG_TERM - 1); + engines.raft.put_msg(&raft_state_key, &raft_state).unwrap(); + assert!(build_storage().is_err()); + + // last index < recorded_commit_index is invalid. + raft_state.mut_hard_state().set_term(RAFT_INIT_LOG_TERM); + raft_state.set_last_index(13); + let log_key = keys::raft_log_key(1, 13); + engines + .raft + .put_msg(&log_key, &new_entry(13, RAFT_INIT_LOG_TERM)) + .unwrap(); + engines.raft.put_msg(&raft_state_key, &raft_state).unwrap(); + assert!(build_storage().is_err()); + + // last_commit_index > commit_index is invalid. + raft_state.set_last_index(20); + raft_state.mut_hard_state().set_commit(12); + engines.raft.put_msg(&raft_state_key, &raft_state).unwrap(); + apply_state.set_last_commit_index(13); + engines + .kv + .put_msg_cf(cf_raft, &apply_state_key, &apply_state) + .unwrap(); + assert!(build_storage().is_err()); + } } diff --git a/components/test_raftstore/src/cluster.rs b/components/test_raftstore/src/cluster.rs index 7e0b7050506..b4e35f31ef0 100644 --- a/components/test_raftstore/src/cluster.rs +++ b/components/test_raftstore/src/cluster.rs @@ -10,15 +10,14 @@ use kvproto::errorpb::Error as PbError; use kvproto::metapb::{self, Peer, RegionEpoch}; use kvproto::pdpb; use kvproto::raft_cmdpb::*; -use kvproto::raft_serverpb::{RaftApplyState, RaftMessage, RaftTruncatedState}; +use kvproto::raft_serverpb::{self, RaftApplyState, RaftMessage, RaftTruncatedState}; +use raft::eraftpb::ConfChangeType; use tempfile::{Builder, TempDir}; use engine::rocks; -use engine::rocks::DB; -use engine::Engines; -use engine::Peekable; -use engine_rocks::RocksEngine; -use engine_traits::CF_DEFAULT; +use engine::{Engines, Peekable, DB}; +use engine_rocks::{Compat, RocksEngine, RocksSnapshot}; +use engine_traits::{Iterable, Mutable, WriteBatchExt, CF_DEFAULT, CF_RAFT}; use pd_client::PdClient; use raftstore::store::fsm::{create_raft_batch_system, PeerFsm, RaftBatchSystem, RaftRouter}; use raftstore::store::transport::CasualRouter; @@ -688,6 +687,63 @@ impl Cluster { } } + pub fn async_request( + &mut self, + mut req: RaftCmdRequest, + ) -> Result> { + let region_id = req.get_header().get_region_id(); + let leader = self.leader_of_region(region_id).unwrap(); + req.mut_header().set_peer(leader.clone()); + let (cb, rx) = make_cb(&req); + self.sim + .rl() + .async_command_on_node(leader.get_store_id(), req, cb)?; + Ok(rx) + } + + pub fn async_put( + &mut self, + key: &[u8], + value: &[u8], + ) -> Result> { + let mut region = self.get_region(key); + let reqs = vec![new_put_cmd(key, value)]; + let put = new_request(region.get_id(), region.take_region_epoch(), reqs, false); + self.async_request(put) + } + + pub fn async_remove_peer( + &mut self, + region_id: u64, + peer: metapb::Peer, + ) -> Result> { + let region = self + .pd_client + .get_region_by_id(region_id) + .wait() + .unwrap() + .unwrap(); + let remove_peer = new_change_peer_request(ConfChangeType::RemoveNode, peer); + let req = new_admin_request(region_id, region.get_region_epoch(), remove_peer); + self.async_request(req) + } + + pub fn async_add_peer( + &mut self, + region_id: u64, + peer: metapb::Peer, + ) -> Result> { + let region = self + .pd_client + .get_region_by_id(region_id) + .wait() + .unwrap() + .unwrap(); + let add_peer = new_change_peer_request(ConfChangeType::AddNode, peer); + let req = new_admin_request(region_id, region.get_region_epoch(), add_peer); + self.async_request(req) + } + pub fn must_put(&mut self, key: &[u8], value: &[u8]) { self.must_put_cf("default", key, value); } @@ -795,11 +851,106 @@ impl Cluster { } pub fn truncated_state(&self, region_id: u64, store_id: u64) -> RaftTruncatedState { + self.apply_state(region_id, store_id).take_truncated_state() + } + + pub fn apply_state(&self, region_id: u64, store_id: u64) -> RaftApplyState { + let key = keys::apply_state_key(region_id); self.get_engine(store_id) - .get_msg_cf::(engine_traits::CF_RAFT, &keys::apply_state_key(region_id)) + .get_msg_cf::(engine_traits::CF_RAFT, &key) .unwrap() .unwrap() - .take_truncated_state() + } + + pub fn raft_local_state(&self, region_id: u64, store_id: u64) -> raft_serverpb::RaftLocalState { + let key = keys::raft_state_key(region_id); + self.get_raft_engine(store_id) + .get_msg::(&key) + .unwrap() + .unwrap() + } + + pub fn wait_last_index( + &mut self, + region_id: u64, + store_id: u64, + expected: u64, + timeout: Duration, + ) { + let timer = Instant::now(); + loop { + let raft_state = self.raft_local_state(region_id, store_id); + let cur_index = raft_state.get_last_index(); + if cur_index >= expected { + return; + } + if timer.elapsed() >= timeout { + panic!( + "[region {}] last index still not reach {}: {:?}", + region_id, expected, raft_state + ); + } + thread::sleep(Duration::from_millis(10)); + } + } + + pub fn restore_kv_meta(&self, region_id: u64, store_id: u64, snap: &RocksSnapshot) { + let (meta_start, meta_end) = ( + keys::region_meta_prefix(region_id), + keys::region_meta_prefix(region_id + 1), + ); + let mut kv_wb = self.engines[&store_id].kv.c().write_batch(); + RocksEngine::from_ref(&self.engines[&store_id].kv) + .scan_cf(CF_RAFT, &meta_start, &meta_end, false, |k, _| { + kv_wb.delete(k).unwrap(); + Ok(true) + }) + .unwrap(); + snap.scan_cf(CF_RAFT, &meta_start, &meta_end, false, |k, v| { + kv_wb.put(k, v).unwrap(); + Ok(true) + }) + .unwrap(); + + let (raft_start, raft_end) = ( + keys::region_raft_prefix(region_id), + keys::region_raft_prefix(region_id + 1), + ); + RocksEngine::from_ref(&self.engines[&store_id].kv) + .scan_cf(CF_RAFT, &raft_start, &raft_end, false, |k, _| { + kv_wb.delete(k).unwrap(); + Ok(true) + }) + .unwrap(); + snap.scan_cf(CF_RAFT, &raft_start, &raft_end, false, |k, v| { + kv_wb.put(k, v).unwrap(); + Ok(true) + }) + .unwrap(); + self.engines[&store_id].kv.write(kv_wb.as_inner()).unwrap(); + } + + pub fn restore_raft(&self, region_id: u64, store_id: u64, snap: &RocksSnapshot) { + let (raft_start, raft_end) = ( + keys::region_raft_prefix(region_id), + keys::region_raft_prefix(region_id + 1), + ); + let mut raft_wb = self.engines[&store_id].raft.c().write_batch(); + RocksEngine::from_ref(&self.engines[&store_id].raft) + .scan(&raft_start, &raft_end, false, |k, _| { + raft_wb.delete(k).unwrap(); + Ok(true) + }) + .unwrap(); + snap.scan(&raft_start, &raft_end, false, |k, v| { + raft_wb.put(k, v).unwrap(); + Ok(true) + }) + .unwrap(); + self.engines[&store_id] + .raft + .write(raft_wb.as_inner()) + .unwrap(); } pub fn add_send_filter(&self, factory: F) { diff --git a/src/server/debug.rs b/src/server/debug.rs index 3fc27c8b478..0df6f83f8e1 100644 --- a/src/server/debug.rs +++ b/src/server/debug.rs @@ -37,10 +37,7 @@ use raftstore::coprocessor::properties::MvccProperties; use raftstore::coprocessor::{get_region_approximate_keys_cf, get_region_approximate_middle}; use raftstore::store::util as raftstore_util; use raftstore::store::PeerStorage; -use raftstore::store::{ - init_apply_state, init_raft_state, write_initial_apply_state, write_initial_raft_state, - write_peer_state, -}; +use raftstore::store::{write_initial_apply_state, write_initial_raft_state, write_peer_state}; use tikv_util::codec::bytes; use tikv_util::collections::HashSet; use tikv_util::config::ReadableSize; @@ -504,18 +501,6 @@ impl Debugger { Error::Other("RegionLocalState doesn't contains peer itself".into()) })?; - let raft_state = box_try!(init_raft_state(&self.engines, region)); - let apply_state = box_try!(init_apply_state(&self.engines, region)); - if raft_state.get_last_index() < apply_state.get_applied_index() { - return Err(Error::Other("last index < applied index".into())); - } - if raft_state.get_hard_state().get_commit() < apply_state.get_applied_index() { - return Err(Error::Other("commit index < applied index".into())); - } - if raft_state.get_last_index() < raft_state.get_hard_state().get_commit() { - return Err(Error::Other("last index < commit index".into())); - } - let tag = format!("[region {}] {}", region.get_id(), peer_id); let peer_storage = box_try!(PeerStorage::new( self.engines.clone(), @@ -531,7 +516,6 @@ impl Debugger { heartbeat_tick: 2, max_size_per_msg: ReadableSize::mb(1).0, max_inflight_msgs: 256, - applied: apply_state.get_applied_index(), check_quorum: true, skip_bcast_commit: true, ..Default::default() diff --git a/tests/failpoints/cases/mod.rs b/tests/failpoints/cases/mod.rs index 40e07954093..c85a42f7722 100644 --- a/tests/failpoints/cases/mod.rs +++ b/tests/failpoints/cases/mod.rs @@ -3,6 +3,7 @@ mod test_bootstrap; mod test_conf_change; mod test_coprocessor; +mod test_early_apply; mod test_gc_worker; mod test_merge; mod test_pending_peers; diff --git a/tests/failpoints/cases/test_early_apply.rs b/tests/failpoints/cases/test_early_apply.rs new file mode 100644 index 00000000000..dcd9511fd83 --- /dev/null +++ b/tests/failpoints/cases/test_early_apply.rs @@ -0,0 +1,92 @@ +// Copyright 2020 TiKV Project Authors. Licensed under Apache-2.0. + +use raft::eraftpb::MessageType; +use std::sync::atomic::{AtomicBool, Ordering}; +use std::sync::Arc; +use std::thread; +use std::time::*; +use test_raftstore::*; + +/// Tests early apply is disabled for singleton. +#[test] +fn test_singleton_early_apply() { + let mut cluster = new_node_cluster(0, 3); + cluster.cfg.raft_store.early_apply = true; + cluster.cfg.raft_store.store_pool_size = 1; + cluster.pd_client.disable_default_operator(); + // So compact log will not be triggered automatically. + configure_for_request_snapshot(&mut cluster); + cluster.run_conf_change(); + // Put one key first to cache leader. + cluster.must_put(b"k0", b"v0"); + + let store_1_fp = "raft_before_save_on_store_1"; + + // Check singleton region can be scheduled correctly. + fail::cfg(store_1_fp, "pause").unwrap(); + cluster.async_put(b"k1", b"v1").unwrap(); + thread::sleep(Duration::from_millis(100)); + must_get_none(&cluster.get_engine(1), b"k1"); + fail::remove(store_1_fp); + must_get_equal(&cluster.get_engine(1), b"k1", b"v1"); + + // Check mixed regions can be scheduled correctly. + let r1 = cluster.get_region(b"k1"); + cluster.must_split(&r1, b"k2"); + cluster.pd_client.must_add_peer(r1.get_id(), new_peer(2, 2)); + cluster.get_region(b"k3"); + // Put key value to cache leader. + cluster.must_put(b"k0", b"v0"); + cluster.must_put(b"k3", b"v3"); + must_get_equal(&cluster.get_engine(1), b"k0", b"v0"); + must_get_equal(&cluster.get_engine(1), b"k3", b"v3"); + let executed = AtomicBool::new(false); + cluster.add_send_filter(CloneFilterFactory( + RegionPacketFilter::new(1, 1) + .direction(Direction::Send) + .msg_type(MessageType::MsgAppend) + // Just for callback, so never filter. + .when(Arc::new(AtomicBool::new(false))) + .set_msg_callback(Arc::new(move |_| { + if !executed.swap(true, Ordering::SeqCst) { + info!("hook pause"); + fail::cfg(store_1_fp, "pause").unwrap(); + } + })), + )); + cluster.async_put(b"k4", b"v4").unwrap(); + // Sleep a while so that leader receives follower's response and commit log. + thread::sleep(Duration::from_millis(100)); + cluster.async_put(b"k11", b"v22").unwrap(); + fail::cfg(store_1_fp, "pause").unwrap(); + must_get_equal(&cluster.get_engine(1), b"k4", b"v4"); + must_get_none(&cluster.get_engine(1), b"k11"); + fail::remove(store_1_fp); + must_get_equal(&cluster.get_engine(1), b"k11", b"v22"); +} + +/// Tests whether disabling early apply really works. +#[test] +fn test_disable_early_apply() { + let mut cluster = new_node_cluster(0, 3); + cluster.cfg.raft_store.early_apply = false; + // So compact log will not be triggered automatically. + configure_for_request_snapshot(&mut cluster); + cluster.run(); + cluster.must_transfer_leader(1, new_peer(1, 1)); + + cluster.must_put(b"k1", b"v1"); + must_get_equal(&cluster.get_engine(1), b"k1", b"v1"); + + let filter = RegionPacketFilter::new(1, 1) + .msg_type(MessageType::MsgAppendResponse) + .direction(Direction::Recv); + cluster.add_send_filter(CloneFilterFactory(filter)); + let last_index = cluster.raft_local_state(1, 1).get_last_index(); + cluster.async_put(b"k2", b"v2").unwrap(); + cluster.wait_last_index(1, 1, last_index + 1, Duration::from_secs(3)); + fail::cfg("raft_before_save_on_store_1", "pause").unwrap(); + cluster.clear_send_filters(); + must_get_equal(&cluster.get_engine(2), b"k2", b"v2"); + must_get_none(&cluster.get_engine(1), b"k2"); +} diff --git a/tests/integrations/config/mod.rs b/tests/integrations/config/mod.rs index f464231aa7a..c06009c2a0c 100644 --- a/tests/integrations/config/mod.rs +++ b/tests/integrations/config/mod.rs @@ -178,6 +178,7 @@ fn test_serde_custom_tikv_config() { store_pool_size: 3, future_poll_size: 2, hibernate_regions: false, + early_apply: false, quorum_algorithm: QuorumAlgorithm::IntegrationOnHalfFail, }; value.pd = PdConfig::new(vec!["example.com:443".to_owned()]); diff --git a/tests/integrations/config/test-custom.toml b/tests/integrations/config/test-custom.toml index 558f5a36744..a56ff8ab902 100644 --- a/tests/integrations/config/test-custom.toml +++ b/tests/integrations/config/test-custom.toml @@ -148,6 +148,7 @@ store-max-batch-size = 21 store-pool-size = 3 future-poll-size = 2 hibernate-regions = false +early-apply = false quorum-algorithm = "integration-on-half-fail" [coprocessor] diff --git a/tests/integrations/raftstore/mod.rs b/tests/integrations/raftstore/mod.rs index 7ee222fa50d..be1e2adb4fe 100644 --- a/tests/integrations/raftstore/mod.rs +++ b/tests/integrations/raftstore/mod.rs @@ -7,6 +7,7 @@ mod test_compact_lock_cf; mod test_compact_log; mod test_conf_change; mod test_custom_quorum; +mod test_early_apply; mod test_hibernate; mod test_lease_read; mod test_merge; diff --git a/tests/integrations/raftstore/test_early_apply.rs b/tests/integrations/raftstore/test_early_apply.rs new file mode 100644 index 00000000000..60960317f59 --- /dev/null +++ b/tests/integrations/raftstore/test_early_apply.rs @@ -0,0 +1,186 @@ +// Copyright 2020 TiKV Project Authors. Licensed under Apache-2.0. + +use engine_rocks::RocksSnapshot; +use raft::eraftpb::MessageType; +use raftstore::store::*; +use std::time::*; +use test_raftstore::*; + +/// Allow lost situation. +#[derive(PartialEq, Eq, Clone, Copy)] +enum DataLost { + /// The leader loses commit index. + /// + /// A leader can't lost both the committed entries and commit index + /// at the same time. + LeaderCommit, + /// A follower loses commit index. + FollowerCommit, + /// All the nodes loses data. + /// + /// Typically, both leader and followers lose commit index. + AllLost, +} + +fn test(cluster: &mut Cluster, action: A, check: C, mode: DataLost) +where + A: FnOnce(&mut Cluster), + C: FnOnce(&mut Cluster), +{ + let filter = match mode { + DataLost::AllLost | DataLost::LeaderCommit => RegionPacketFilter::new(1, 1) + .msg_type(MessageType::MsgAppendResponse) + .direction(Direction::Recv), + DataLost::FollowerCommit => RegionPacketFilter::new(1, 3) + .msg_type(MessageType::MsgAppendResponse) + .direction(Direction::Recv), + }; + cluster.add_send_filter(CloneFilterFactory(filter)); + let last_index = cluster.raft_local_state(1, 1).get_last_index(); + action(cluster); + cluster.wait_last_index(1, 1, last_index + 1, Duration::from_secs(3)); + let mut snaps = vec![]; + snaps.push((1, RocksSnapshot::new(cluster.get_raft_engine(1)))); + if mode == DataLost::AllLost { + cluster.wait_last_index(1, 2, last_index + 1, Duration::from_secs(3)); + snaps.push((2, RocksSnapshot::new(cluster.get_raft_engine(2)))); + cluster.wait_last_index(1, 3, last_index + 1, Duration::from_secs(3)); + snaps.push((3, RocksSnapshot::new(cluster.get_raft_engine(3)))); + } + cluster.clear_send_filters(); + check(cluster); + for (id, _) in &snaps { + cluster.stop_node(*id); + } + // Simulate data lost in raft cf. + for (id, snap) in &snaps { + cluster.restore_raft(1, *id, snap); + } + for (id, _) in &snaps { + cluster.run_node(*id).unwrap(); + } + + if mode == DataLost::LeaderCommit || mode == DataLost::AllLost { + cluster.must_transfer_leader(1, new_peer(1, 1)); + } +} + +/// Test whether system can recover from mismatched raft state and apply state. +/// +/// If TiKV is not shutdown gracefully, apply state may go ahead of raft +/// state. TiKV should be able to recognize the situation and start normally. +fn test_early_apply(mode: DataLost) { + let mut cluster = new_node_cluster(0, 3); + cluster.cfg.raft_store.early_apply = true; + cluster.pd_client.disable_default_operator(); + // So compact log will not be triggered automatically. + configure_for_request_snapshot(&mut cluster); + cluster.run(); + if mode == DataLost::LeaderCommit || mode == DataLost::AllLost { + cluster.must_transfer_leader(1, new_peer(1, 1)); + } else { + cluster.must_transfer_leader(1, new_peer(3, 3)); + } + cluster.must_put(b"k1", b"v1"); + must_get_equal(&cluster.get_engine(1), b"k1", b"v1"); + + test( + &mut cluster, + |c| { + c.async_put(b"k2", b"v2").unwrap(); + }, + |c| must_get_equal(&c.get_engine(1), b"k2", b"v2"), + mode, + ); + let region = cluster.get_region(b""); + test( + &mut cluster, + |c| { + c.split_region(®ion, b"k2", Callback::None); + }, + |c| c.wait_region_split(®ion), + mode, + ); + if mode != DataLost::LeaderCommit && mode != DataLost::AllLost { + test( + &mut cluster, + |c| { + c.async_remove_peer(1, new_peer(1, 1)).unwrap(); + }, + |c| must_get_none(&c.get_engine(1), b"k2"), + mode, + ); + } +} + +/// Tests whether the cluster can recover from leader lost its commit index. +#[test] +fn test_leader_early_apply() { + test_early_apply(DataLost::LeaderCommit) +} + +/// Tests whether the cluster can recover from follower lost its commit index. +#[test] +fn test_follower_commit_early_apply() { + test_early_apply(DataLost::FollowerCommit) +} + +/// Tests whether the cluster can recover from all nodes lost their commit index. +#[test] +fn test_all_node_crash() { + test_early_apply(DataLost::AllLost) +} + +/// Tests if apply index inside raft is updated correctly. +/// +/// If index is not updated, raft will reject to campaign on timeout. +#[test] +fn test_update_internal_apply_index() { + let mut cluster = new_node_cluster(0, 4); + cluster.cfg.raft_store.early_apply = true; + cluster.pd_client.disable_default_operator(); + // So compact log will not be triggered automatically. + configure_for_request_snapshot(&mut cluster); + cluster.run(); + cluster.must_transfer_leader(1, new_peer(3, 3)); + cluster.must_put(b"k1", b"v1"); + must_get_equal(&cluster.get_engine(1), b"k1", b"v1"); + + let filter = RegionPacketFilter::new(1, 3) + .msg_type(MessageType::MsgAppendResponse) + .direction(Direction::Recv); + cluster.add_send_filter(CloneFilterFactory(filter)); + let last_index = cluster.raft_local_state(1, 1).get_last_index(); + cluster.async_remove_peer(1, new_peer(4, 4)).unwrap(); + cluster.async_put(b"k2", b"v2").unwrap(); + let mut snaps = vec![]; + for i in 1..3 { + cluster.wait_last_index(1, i, last_index + 2, Duration::from_secs(3)); + snaps.push((i, RocksSnapshot::new(cluster.get_raft_engine(1)))); + } + cluster.clear_send_filters(); + must_get_equal(&cluster.get_engine(1), b"k2", b"v2"); + must_get_equal(&cluster.get_engine(2), b"k2", b"v2"); + + // Simulate data lost in raft cf. + for (id, snap) in &snaps { + cluster.stop_node(*id); + cluster.restore_raft(1, *id, &snap); + cluster.run_node(*id).unwrap(); + } + + let region = cluster.get_region(b"k1"); + // Issues a heartbeat to followers so they will re-commit the logs. + let resp = read_on_peer( + &mut cluster, + new_peer(3, 3), + region.clone(), + b"k1", + true, + Duration::from_secs(3), + ) + .unwrap(); + assert!(!resp.get_header().has_error(), "{:?}", resp); + cluster.stop_node(3); + cluster.must_put(b"k3", b"v3"); +} diff --git a/tests/integrations/raftstore/test_multi.rs b/tests/integrations/raftstore/test_multi.rs index 29993b79c35..ea066cc0a77 100644 --- a/tests/integrations/raftstore/test_multi.rs +++ b/tests/integrations/raftstore/test_multi.rs @@ -32,6 +32,9 @@ fn test_multi_base_after_bootstrap(cluster: &mut Cluster) { cluster.must_put(key, value); assert_eq!(cluster.must_get(key), Some(value.to_vec())); + let region_id = cluster.get_region_id(b""); + let prev_last_index = cluster.raft_local_state(region_id, 1).get_last_index(); + // sleep 200ms in case the commit packet is dropped by simulated transport. thread::sleep(Duration::from_millis(200)); @@ -50,6 +53,10 @@ fn test_multi_base_after_bootstrap(cluster: &mut Cluster) { cluster.assert_quorum(|engine| engine.get_value(&keys::data_key(key)).unwrap().is_none()); + let last_index = cluster.raft_local_state(region_id, 1).get_last_index(); + let apply_state = cluster.apply_state(region_id, 1); + assert!(apply_state.get_last_commit_index() < last_index); + assert!(apply_state.get_last_commit_index() >= prev_last_index); // TODO add epoch not match test cases. }