Skip to content

Commit

Permalink
txn-file,txn: Check primary region (tikv#1799)
Browse files Browse the repository at this point in the history
Signed-off-by: Ping Yu <[email protected]>
  • Loading branch information
pingyu authored Aug 16, 2024
1 parent 418720b commit 9a99889
Show file tree
Hide file tree
Showing 4 changed files with 181 additions and 20 deletions.
4 changes: 4 additions & 0 deletions components/kvengine/src/read.rs
Original file line number Diff line number Diff line change
Expand Up @@ -702,6 +702,10 @@ impl SnapAccessCore {
self.data.outer_end.chunk()
}

/// Reports whether `key` falls inside the half-open interval
/// `[get_start_key(), get_end_key())`: the start key is inclusive and the
/// end key is exclusive.
pub fn key_is_in_range(&self, key: &[u8]) -> bool {
    if self.get_start_key() <= key {
        // Lower bound satisfied; the result now depends on the upper bound.
        key < self.get_end_key()
    } else {
        false
    }
}

/// Returns the `InnerKey` obtained from `self.data.inner_start()`.
///
/// NOTE(review): presumably this is the inner-key form of the snapshot's
/// start bound; confirm against the definition of `data`.
pub fn get_inner_start(&self) -> InnerKey<'_> {
self.data.inner_start()
}
Expand Down
64 changes: 63 additions & 1 deletion src/storage/txn/commands/txn_file.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,10 @@ use std::{cmp::Ordering, iter::Iterator as StdIterator, mem, ops::Deref};

use api_version::ApiV2;
use kvengine::{
table::txn_file::{TxnCtx, TxnFile, TxnFileId, TxnFileIterator, OP_CHECK_NOT_EXIST, OP_INSERT},
table::{
txn_file::{TxnCtx, TxnFile, TxnFileId, TxnFileIterator, OP_CHECK_NOT_EXIST, OP_INSERT},
InnerKey,
},
txn_chunk_manager::TxnChunkManager,
Iterator, SnapAccess, UserMeta, LOCK_CF, WRITE_CF,
};
Expand Down Expand Up @@ -448,6 +451,57 @@ impl TxnFileCommand {
Ok(None)
}

/// Checks the transaction status when a txn file lock that does *not* cover
/// the primary key is committed on the primary region (i.e. the region that
/// contains the primary key).
///
/// This can happen when the primary txn file lock has already been committed
/// or rolled back, and the primary region is merged afterwards.
///
/// Returns `Ok(())` when no check is needed (the region does not contain the
/// primary key, or `lock_txn_file` itself covers the primary key), or when a
/// `Put` / `Delete` / `Lock` commit record is found for the primary key at
/// `start_ts`. Returns `TxnLockNotFound` when the record is a rollback or is
/// missing. Errors from reading the commit record are propagated.
///
/// See https://github.com/tidbcloud/cloud-storage-engine/issues/1800.
fn check_commit_primary_region(
    &self,
    lock: &txn_types::Lock,
    lock_txn_file: &TxnFile,
    start_ts: TimeStamp,
    commit_ts: TimeStamp,
    snap_access: &SnapAccess,
) -> crate::storage::mvcc::Result<()> {
    // Only the region holding the primary key can look up the primary's
    // commit record, so there is nothing to verify anywhere else.
    if !snap_access.key_is_in_range(&lock.primary) {
        return Ok(());
    }

    // A txn file whose bounds include the primary key is the primary txn
    // file itself; it is not subject to this check.
    let primary_inner =
        InnerKey::from_outer_key(&lock.primary, snap_access.get_inner_key_offset());
    let covers_primary = lock_txn_file.lower_bound() <= primary_inner
        && primary_inner < lock_txn_file.upper_bound();
    if covers_primary {
        return Ok(());
    }

    let mut cloud_reader = CloudReader::new(snap_access.clone(), true);
    let primary_key = Key::from_raw(&lock.primary);
    let commit_record = cloud_reader.get_txn_commit_record(&primary_key, start_ts)?;

    match commit_record.info() {
        Some((committed_ts, WriteType::Put | WriteType::Delete | WriteType::Lock)) => {
            // The timestamps are only compared in debug builds.
            debug_assert_eq!(
                committed_ts, commit_ts,
                "commit_ts mismatch: start_ts {}, commit_ts {}, committed_ts {}",
                start_ts, commit_ts, committed_ts
            );
            Ok(())
        }
        // Rolled back, or no commit record at all: reject the commit.
        Some((_, WriteType::Rollback)) | None => Err(ErrorInner::TxnLockNotFound {
            start_ts,
            commit_ts,
            key: vec![],
        }
        .into()),
    }
}

fn process_commit(
&mut self,
snap_access: &SnapAccess,
Expand All @@ -462,6 +516,14 @@ impl TxnFileCommand {
}
if let Some(lock_txn_file) = snap_access.get_lock_txn_file(self.txn_file_ref.start_ts) {
let lock = txn_types::Lock::parse(lock_txn_file.get_lock_val_prefix())?;
self.check_commit_primary_region(
&lock,
&lock_txn_file,
start_ts,
commit_ts,
snap_access,
)?;

debug_assert_eq!(lock.ts, self.ts());
if commit_ts < lock.min_commit_ts {
info!(
Expand Down
7 changes: 3 additions & 4 deletions src/storage/txn/scheduler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -777,14 +777,13 @@ impl<E: Engine, L: LockManager> Scheduler<E, L> {
where
StorageError: From<ER>,
{
debug!("write command finished with error"; "cid" => cid);
let err = StorageError::from(err);
debug!("write command finished with error"; "cid" => cid, "err" => ?err);
let tctx = self.inner.dequeue_task_context(cid);

SCHED_STAGE_COUNTER_VEC.get(tctx.tag).error.inc();

let pr = ProcessResult::Failed {
err: StorageError::from(err),
};
let pr = ProcessResult::Failed { err };
if let Some(cb) = tctx.cb {
cb.execute(pr);
}
Expand Down
126 changes: 111 additions & 15 deletions tests/cloud_engine/transaction/txn_file.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ use test_cloud_server::{
must_wait,
oss::prepare_dfs,
try_wait,
txn::txn_file::TxnFileHelper,
util::Mutation,
ServerCluster,
};
Expand Down Expand Up @@ -734,23 +735,15 @@ fn test_txn_file_abnormal_impl(data_count: usize, use_txn_file: bool, enable_inn
}

let txn_file_helper = client.txn_file_helper();
let make_mutations = |value: &str| -> (Vec<Mutation>, TxnMutations) {
let gen_val = i_to_val_opt(value, 3);
let mut mutations = vec![];
for i in 0..data_count {
let mut m = Mutation::default();
m.set_op(kvrpcpb::Op::Put);
m.set_key(gen_key(i));
m.set_value(gen_val(i));
mutations.push(m);
}
let txn_muts = block_on(TxnMutations::build(
mutations.clone(),
let make_mutations = |value| {
make_txn_mutations(
value,
0,
data_count,
&gen_key,
write_method,
txn_file_helper.clone(),
))
.unwrap();
(mutations, txn_muts)
)
};

// Commit without prewrite:
Expand Down Expand Up @@ -1138,6 +1131,109 @@ fn test_txn_file_merge_impl(ranges: Vec<Range<usize>>, enable_inner_key_off: boo
cluster.stop();
}

// Test for committing a "not primary" txn file lock to the primary region.
//
// Scenario: a file-based transaction is prewritten across two regions, a
// rollback is issued for the first key, the two regions are merged, and the
// subsequent commit of the whole transaction is expected to fail with
// `TxnLockNotFound`.
//
// See https://github.com/tidbcloud/cloud-storage-engine/issues/1800.
#[test]
fn test_commit_primary_region() {
test_util::init_log_for_test();
let (_temp_dir, mut oss, dfs_config) = prepare_dfs("test");

// Start a cluster that uses the prepared DFS config and has the inner key
// offset enabled.
let node_ids = alloc_node_id_vec(NODES_COUNT);
let pd_wrapper = PdWrapper::new_test(1, &SecurityConfig::default(), None);
let mut cluster = ServerCluster::new_opt(
node_ids,
|_, conf| {
conf.dfs = dfs_config.clone();
conf.enable_inner_key_offset = true;
},
pd_wrapper,
);
cluster.start_tikv_workers(1, 2, false);
cluster.wait_region_replicated(&[], 3);

let gen_key = generate_keyspace_key(KEYSPACE_ID);

// Keep the tokio runtime context entered for the rest of the test; the
// guard is held in `_enter` until the function returns.
let rt = tokio::runtime::Runtime::new().unwrap();
let _enter = rt.enter();

let mut client = cluster.new_client_opt(ClusterClientOptions {
txn_file_max_chunk_size: Some(1024),
..Default::default()
});
let txn_file_helper = client.txn_file_helper();
// Builds file-based Put mutations for key indices `start..end`.
let make_mutations = |value, start, end| {
make_txn_mutations(
value,
start,
end,
&gen_key,
TxnWriteMethod::FileBased,
txn_file_helper.clone(),
)
};
client.split_keyspace(KEYSPACE_ID);

// Prewrite to two regions: split at key index 100, then prewrite key
// indices 0..200 so the transaction spans both sides of the split.
client.try_split(&gen_key(100), 5).unwrap();
let (_, txn_muts) = make_mutations("value_", 0, 200);
let start_ts = client.get_ts();
client
.kv_prewrite(txn_muts.primary(), None, txn_muts.clone(), start_ts)
.unwrap();

// Roll back in the primary region, using a mutation set that contains only
// key index 0, then merge the two regions back together.
// NOTE(review): presumably key index 0 is the primary key returned by
// `txn_muts.primary()` — confirm.
let (_, txn_muts0) = make_mutations("value0_", 0, 1);
client.kv_rollback(txn_muts0, start_ts).unwrap();
let ok = client.try_merge_and_wait(&gen_key(0), &gen_key(100), 5);
assert!(ok);

// Commit must fail.
let commit_ts = client.get_ts();
let err = client.kv_commit(txn_muts, start_ts, commit_ts).unwrap_err();
expect_err_msg(&err, "TxnLockNotFound");

{
// Workaround: data that does not exist in the ref store is not checked by
// the verification below, so call `del_kv` for every key index touched by
// the transaction before verifying.
let guard = client.ref_store();
let mut ref_store = guard.lock().unwrap();
for k in 0..200 {
ref_store.del_kv(gen_key(k));
}
}
client.verify_data_with_ref_store();
cluster.stop();
oss.shutdown();
}

/// Builds one `Put` mutation for every index in `start..end` and packs the
/// resulting list into a `TxnMutations`.
///
/// Keys come from `gen_key(i)`; values come from the generator returned by
/// `i_to_val_opt(value, 3)`. Returns the raw mutation list together with the
/// `TxnMutations` built from a clone of it using `write_method` and
/// `txn_file_helper`.
///
/// Panics if `TxnMutations::build` fails.
fn make_txn_mutations<F>(
    value: &str,
    start: usize,
    end: usize,
    gen_key: F,
    write_method: TxnWriteMethod,
    txn_file_helper: Option<Arc<TxnFileHelper>>,
) -> (Vec<Mutation>, TxnMutations)
where
    F: Fn(usize) -> Vec<u8>,
{
    let gen_val = i_to_val_opt(value, 3);
    let mutations: Vec<Mutation> = (start..end)
        .map(|idx| {
            let mut mutation = Mutation::default();
            mutation.set_op(kvrpcpb::Op::Put);
            mutation.set_key(gen_key(idx));
            mutation.set_value(gen_val(idx));
            mutation
        })
        .collect();

    // The builder consumes its input, so hand it a clone and return the
    // original list alongside the built value.
    let build_fut = TxnMutations::build(mutations.clone(), write_method, txn_file_helper);
    let txn_muts = block_on(build_fut).unwrap();
    (mutations, txn_muts)
}

fn build_txn_files(
dfs: &Arc<dyn Dfs>,
start_ts: u64,
Expand Down

0 comments on commit 9a99889

Please sign in to comment.