Skip to content

Commit

Permalink
raftstore: block in-memory pessimistic locks during the flashback (ti…
Browse files Browse the repository at this point in the history
…kv#14859)

ref tikv#13303, close pingcap/tidb#44292

During the Flashback process, we should prevent any read or write operations on the in-memory pessimistic lock table
and clear it like rolling back other locks to ensure that Flashback can proceed smoothly.

Signed-off-by: JmPotato <[email protected]>
Signed-off-by: tonyxuqqi <[email protected]>
  • Loading branch information
JmPotato authored and tonyxuqqi committed Jun 22, 2023
1 parent dbd0bac commit fd273d8
Show file tree
Hide file tree
Showing 9 changed files with 138 additions and 7 deletions.
23 changes: 21 additions & 2 deletions components/raftstore-v2/src/operation/command/admin/flashback.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,10 @@ use kvproto::{
use protobuf::Message;
use raftstore::{
coprocessor::RegionChangeReason,
store::metrics::{PEER_ADMIN_CMD_COUNTER, PEER_IN_FLASHBACK_STATE},
store::{
metrics::{PEER_ADMIN_CMD_COUNTER, PEER_IN_FLASHBACK_STATE},
LocksStatus,
},
Result,
};

Expand Down Expand Up @@ -85,7 +88,7 @@ impl<EK: KvEngine, ER: RaftEngine> Peer<EK, ER> {
pub fn on_apply_res_flashback<T>(
&mut self,
store_ctx: &mut StoreContext<EK, ER, T>,
mut res: FlashbackResult,
#[allow(unused_mut)] mut res: FlashbackResult,
) {
(|| {
fail_point!("keep_peer_fsm_flashback_state_false", |_| {
Expand Down Expand Up @@ -114,6 +117,22 @@ impl<EK: KvEngine, ER: RaftEngine> Peer<EK, ER> {
.unwrap();
self.set_has_extra_write();

let mut pessimistic_locks = self.txn_context().ext().pessimistic_locks.write();
pessimistic_locks.status = if res.region_state.get_region().is_in_flashback {
// To prevent the insertion of any new pessimistic locks, set the lock status
// to `LocksStatus::IsInFlashback` and clear all the existing locks.
pessimistic_locks.clear();
LocksStatus::IsInFlashback
} else if self.is_leader() {
// If the region is not in flashback, the leader can continue to insert
// pessimistic locks.
LocksStatus::Normal
} else {
// If the region is not in flashback and the peer is not the leader, it
// cannot insert pessimistic locks.
LocksStatus::NotLeader
}

// Compares to v1, v2 does not expire remote lease, because only
// local reader can serve read requests.
}
Expand Down
15 changes: 15 additions & 0 deletions components/raftstore/src/store/fsm/peer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6505,6 +6505,21 @@ where
})());
// Let the leader lease to None to ensure that local reads are not executed.
self.fsm.peer.leader_lease_mut().expire_remote_lease();
let mut pessimistic_locks = self.fsm.peer.txn_ext.pessimistic_locks.write();
pessimistic_locks.status = if self.region().is_in_flashback {
// To prevent the insertion of any new pessimistic locks, set the lock status
// to `LocksStatus::IsInFlashback` and clear all the existing locks.
pessimistic_locks.clear();
LocksStatus::IsInFlashback
} else if self.fsm.peer.is_leader() {
// If the region is not in flashback, the leader can continue to insert
// pessimistic locks.
LocksStatus::Normal
} else {
// If the region is not in flashback and the peer is not the leader, it
// cannot insert pessimistic locks.
LocksStatus::NotLeader
}
}

fn on_ready_batch_switch_witness(&mut self, sw: SwitchWitness) {
Expand Down
1 change: 1 addition & 0 deletions components/raftstore/src/store/txn_ext.rs
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,7 @@ pub enum LocksStatus {
TransferringLeader,
MergingRegion,
NotLeader,
IsInFlashback,
}

impl fmt::Debug for PeerPessimisticLocks {
Expand Down
10 changes: 9 additions & 1 deletion components/test_raftstore-v2/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -885,6 +885,14 @@ impl<EK: KvEngine> Simulator<EK> for ServerCluster<EK> {

impl<EK: KvEngine> Cluster<ServerCluster<EK>, EK> {
pub fn must_get_snapshot_of_region(&mut self, region_id: u64) -> RegionSnapshot<EK::Snapshot> {
self.must_get_snapshot_of_region_with_ctx(region_id, SnapContext::default())
}

pub fn must_get_snapshot_of_region_with_ctx(
&mut self,
region_id: u64,
snap_ctx: SnapContext<'_>,
) -> RegionSnapshot<EK::Snapshot> {
let mut try_snapshot = || -> Option<RegionSnapshot<EK::Snapshot>> {
let leader = self.leader_of_region(region_id)?;
let store_id = leader.store_id;
Expand All @@ -897,7 +905,7 @@ impl<EK: KvEngine> Cluster<ServerCluster<EK>, EK> {
let mut storage = self.sim.rl().storages.get(&store_id).unwrap().clone();
let snap_ctx = SnapContext {
pb_ctx: &ctx,
..Default::default()
..snap_ctx.clone()
};
storage.snapshot(snap_ctx).ok()
};
Expand Down
10 changes: 9 additions & 1 deletion components/test_raftstore/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -788,6 +788,14 @@ impl Simulator for ServerCluster {

impl Cluster<ServerCluster> {
pub fn must_get_snapshot_of_region(&mut self, region_id: u64) -> RegionSnapshot<RocksSnapshot> {
self.must_get_snapshot_of_region_with_ctx(region_id, Default::default())
}

pub fn must_get_snapshot_of_region_with_ctx(
&mut self,
region_id: u64,
snap_ctx: SnapContext<'_>,
) -> RegionSnapshot<RocksSnapshot> {
let mut try_snapshot = || -> Option<RegionSnapshot<RocksSnapshot>> {
let leader = self.leader_of_region(region_id)?;
let store_id = leader.store_id;
Expand All @@ -800,7 +808,7 @@ impl Cluster<ServerCluster> {
let mut storage = self.sim.rl().storages.get(&store_id).unwrap().clone();
let snap_ctx = SnapContext {
pb_ctx: &ctx,
..Default::default()
..snap_ctx.clone()
};
storage.snapshot(snap_ctx).ok()
};
Expand Down
18 changes: 17 additions & 1 deletion src/storage/mvcc/reader/reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,10 @@ use std::ops::Bound;

use engine_traits::{CF_DEFAULT, CF_LOCK, CF_WRITE};
use kvproto::{
errorpb::{self, EpochNotMatch, StaleCommand},
errorpb::{self, EpochNotMatch, FlashbackInProgress, StaleCommand},
kvrpcpb::Context,
};
use raftstore::store::LocksStatus;
use tikv_kv::{SnapshotExt, SEEK_BOUND};
use txn_types::{Key, Lock, OldValue, TimeStamp, Value, Write, WriteRef, WriteType};

Expand Down Expand Up @@ -146,6 +147,8 @@ pub struct MvccReader<S: EngineSnapshot> {
term: u64,
#[allow(dead_code)]
version: u64,

allow_in_flashback: bool,
}

impl<S: EngineSnapshot> MvccReader<S> {
Expand All @@ -164,6 +167,7 @@ impl<S: EngineSnapshot> MvccReader<S> {
fill_cache,
term: 0,
version: 0,
allow_in_flashback: false,
}
}

Expand All @@ -182,6 +186,7 @@ impl<S: EngineSnapshot> MvccReader<S> {
fill_cache: !ctx.get_not_fill_cache(),
term: ctx.get_term(),
version: ctx.get_region_epoch().get_version(),
allow_in_flashback: false,
}
}

Expand Down Expand Up @@ -267,6 +272,13 @@ impl<S: EngineSnapshot> MvccReader<S> {
err.set_epoch_not_match(EpochNotMatch::default());
return Some(Err(KvError::from(err).into()));
}
// If the region is in the flashback state, it should not be allowed to read the
// locks.
if locks.status == LocksStatus::IsInFlashback && !self.allow_in_flashback {
let mut err = errorpb::Error::default();
err.set_flashback_in_progress(FlashbackInProgress::default());
return Some(Err(KvError::from(err).into()));
}

locks.get(key).map(|(lock, _)| {
// For write commands that are executed in serial, it should be impossible
Expand Down Expand Up @@ -768,6 +780,10 @@ impl<S: EngineSnapshot> MvccReader<S> {
pub fn snapshot(&self) -> &S {
&self.snapshot
}

pub fn set_allow_in_flashback(&mut self, set_allow_in_flashback: bool) {
self.allow_in_flashback = set_allow_in_flashback;
}
}

#[cfg(test)]
Expand Down
1 change: 1 addition & 0 deletions src/storage/txn/commands/flashback_to_version.rs
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,7 @@ impl<S: Snapshot, L: LockManager> WriteCommand<S, L> for FlashbackToVersion {
fn process_write(mut self, snapshot: S, context: WriteContext<'_, L>) -> Result<WriteResult> {
let mut reader =
MvccReader::new_with_ctx(snapshot.clone(), Some(ScanMode::Forward), &self.ctx);
reader.set_allow_in_flashback(true);
let mut txn = MvccTxn::new(TimeStamp::zero(), context.concurrency_manager);
match self.state {
FlashbackToVersionState::RollbackLock {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -153,6 +153,7 @@ impl<S: Snapshot> ReadCommand<S> for FlashbackToVersionReadPhase {
fn process_read(self, snapshot: S, statistics: &mut Statistics) -> Result<ProcessResult> {
let tag = self.tag().get_str();
let mut reader = MvccReader::new_with_ctx(snapshot, Some(ScanMode::Forward), &self.ctx);
reader.set_allow_in_flashback(true);
// Filter out the SST that does not have a newer version than `self.version` in
// `CF_WRITE`, i.e, whose latest `commit_ts` <= `self.version` in the later
// scan. By doing this, we can only flashback those keys that have version
Expand Down
66 changes: 64 additions & 2 deletions tests/integrations/raftstore/test_flashback.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,14 +13,76 @@ use kvproto::{
raft_cmdpb::{AdminCmdType, CmdType, RaftCmdRequest, RaftCmdResponse, Request},
raft_serverpb::RegionLocalState,
};
use raftstore::store::Callback;
use raftstore::store::{Callback, LocksStatus};
use test_raftstore::*;
use test_raftstore_macro::test_case;
use txn_types::WriteBatchFlags;
use tikv::storage::kv::SnapContext;
use txn_types::{Key, PessimisticLock, WriteBatchFlags};

const TEST_KEY: &[u8] = b"k1";
const TEST_VALUE: &[u8] = b"v1";

#[test_case(test_raftstore::new_server_cluster)]
#[test_case(test_raftstore_v2::new_server_cluster)]
fn test_flashback_with_in_memory_pessimistic_locks() {
let mut cluster = new_cluster(0, 3);
cluster.cfg.raft_store.raft_heartbeat_ticks = 20;
cluster.run();
cluster.must_transfer_leader(1, new_peer(1, 1));

let region = cluster.get_region(TEST_KEY);
// Write a pessimistic lock to the in-memory pessimistic lock table.
{
let snapshot = cluster.must_get_snapshot_of_region(region.get_id());
let txn_ext = snapshot.txn_ext.unwrap();
let mut pessimistic_locks = txn_ext.pessimistic_locks.write();
assert!(pessimistic_locks.is_writable());
pessimistic_locks
.insert(vec![(
Key::from_raw(TEST_KEY),
PessimisticLock {
primary: TEST_KEY.to_vec().into_boxed_slice(),
start_ts: 10.into(),
ttl: 3000,
for_update_ts: 20.into(),
min_commit_ts: 30.into(),
last_change_ts: 5.into(),
versions_to_last_change: 3,
is_locked_with_conflict: false,
},
)])
.unwrap();
assert_eq!(pessimistic_locks.len(), 1);
}
// Prepare flashback.
cluster.must_send_wait_flashback_msg(region.get_id(), AdminCmdType::PrepareFlashback);
// Check the in-memory pessimistic lock table.
{
let snapshot = cluster.must_get_snapshot_of_region_with_ctx(
region.get_id(),
SnapContext {
allowed_in_flashback: true,
..Default::default()
},
);
let txn_ext = snapshot.txn_ext.unwrap();
let pessimistic_locks = txn_ext.pessimistic_locks.read();
assert!(!pessimistic_locks.is_writable());
assert_eq!(pessimistic_locks.status, LocksStatus::IsInFlashback);
assert_eq!(pessimistic_locks.len(), 0);
}
// Finish flashback.
cluster.must_send_wait_flashback_msg(region.get_id(), AdminCmdType::FinishFlashback);
// Check the in-memory pessimistic lock table.
{
let snapshot = cluster.must_get_snapshot_of_region(region.get_id());
let txn_ext = snapshot.txn_ext.unwrap();
let pessimistic_locks = txn_ext.pessimistic_locks.read();
assert!(pessimistic_locks.is_writable());
assert_eq!(pessimistic_locks.len(), 0);
}
}

#[test_case(test_raftstore::new_node_cluster)]
#[test_case(test_raftstore_v2::new_node_cluster)]
fn test_allow_read_only_request() {
Expand Down

0 comments on commit fd273d8

Please sign in to comment.