Skip to content

Commit

Permalink
cdc: fix panic on finish prepare locks (tikv#17174)
Browse files Browse the repository at this point in the history
close tikv#17172

allow pop untracked lock to prevent cdc component panic

Signed-off-by: 3AceShowHand <[email protected]>

Co-authored-by: ti-chi-bot[bot] <108142056+ti-chi-bot[bot]@users.noreply.github.com>
  • Loading branch information
3AceShowHand and ti-chi-bot[bot] committed Jun 27, 2024
1 parent 0331b3a commit ffe9d23
Show file tree
Hide file tree
Showing 3 changed files with 38 additions and 22 deletions.
26 changes: 22 additions & 4 deletions components/cdc/src/delegate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ pub enum DownstreamState {
/// It's just created and rejects change events and resolved timestamps.
Uninitialized,
/// It has got a snapshot for incremental scan, and change events will be
/// accepted. However it still rejects resolved timestamps.
/// accepted. However, it still rejects resolved timestamps.
Initializing,
/// Incremental scan is finished so that resolved timestamps are acceptable
/// now.
Expand Down Expand Up @@ -319,7 +319,7 @@ impl MiniLock {
/// A CDC delegate of a raftstore region peer.
///
/// It converts raft commands into CDC events and broadcast to downstreams.
/// It also track trancation on the fly in order to compute resolved ts.
/// It also tracks transaction on the fly in order to compute resolved ts.
pub struct Delegate {
pub region_id: u64,
pub handle: ObserveHandle,
Expand Down Expand Up @@ -432,8 +432,11 @@ impl Delegate {
assert_eq!(*x.get(), start_ts);
}
},
PendingLock::Untrack { key } => match locks.entry(key) {
BTreeMapEntry::Vacant(..) => unreachable!(),
PendingLock::Untrack { key } => match locks.entry(key.clone()) {
BTreeMapEntry::Vacant(..) => {
warn!("untrack lock not found when try to finish prepare lock tracker";
"key" => %key);
}
BTreeMapEntry::Occupied(x) => {
x.remove();
}
Expand Down Expand Up @@ -1918,4 +1921,19 @@ mod tests {
assert_eq!(v, 0);
assert_eq!(quota.in_use(), 17);
}

#[test]
fn test_lock_tracker_untrack_vacant() {
let quota = Arc::new(MemoryQuota::new(usize::MAX));
let mut delegate = Delegate::new(1, quota.clone(), Default::default());
assert!(delegate.init_lock_tracker());
assert!(!delegate.init_lock_tracker());

delegate.pop_lock(Key::from_raw(b"key1")).unwrap();
let mut scaned_locks = BTreeMap::default();
scaned_locks.insert(Key::from_raw(b"key2"), MiniLock::from_ts(100));
delegate
.finish_prepare_lock_tracker(Default::default(), scaned_locks)
.unwrap();
}
}
1 change: 1 addition & 0 deletions components/cdc/src/initializer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -274,6 +274,7 @@ impl<E: KvEngine> Initializer<E> {
assert!(!has_remain);
let mut locks = BTreeMap::<Key, MiniLock>::new();
for (key, lock) in key_locks {
// When `decode_lock`, only consider `Put` and `Delete`
if matches!(lock.lock_type, LockType::Put | LockType::Delete) {
locks.insert(key, MiniLock::new(lock.ts, lock.txn_source));
}
Expand Down
33 changes: 15 additions & 18 deletions components/cdc/src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -279,18 +279,15 @@ impl Service {
peer: &str,
) -> semver::Version {
let version_field = request.get_header().get_ticdc_version();
match semver::Version::parse(version_field) {
Ok(v) => v,
Err(e) => {
warn!(
"empty or invalid TiCDC version, please upgrading TiCDC";
"version" => version_field,
"downstream" => ?peer, "region_id" => request.region_id,
"error" => ?e,
);
semver::Version::new(0, 0, 0)
}
}
semver::Version::parse(version_field).unwrap_or_else(|e| {
warn!(
"empty or invalid TiCDC version, please upgrading TiCDC";
"version" => version_field,
"downstream" => ?peer, "region_id" => request.region_id,
"error" => ?e,
);
semver::Version::new(0, 0, 0)
})
}

fn set_conn_version(
Expand Down Expand Up @@ -337,13 +334,13 @@ impl Service {
request: ChangeDataRequest,
conn_id: ConnId,
) -> Result<(), String> {
let observed_range =
ObservedRange::new(request.start_key.clone(), request.end_key.clone()).unwrap_or_else(|e| {
let observed_range = ObservedRange::new(request.start_key.clone(), request.end_key.clone())
.unwrap_or_else(|e| {
warn!(
"cdc invalid observed start key or end key version";
"downstream" => ?peer, "region_id" => request.region_id,
"error" => ?e,
);
"cdc invalid observed start key or end key version";
"downstream" => ?peer, "region_id" => request.region_id,
"error" => ?e,
);
ObservedRange::default()
});
let downstream = Downstream::new(
Expand Down

0 comments on commit ffe9d23

Please sign in to comment.