From 9f2dcd494c1857522346e29e0d896c7a90b612cd Mon Sep 17 00:00:00 2001 From: Zixiong Liu Date: Fri, 22 Jan 2021 19:01:42 +0800 Subject: [PATCH] cherry pick #9515 to release-4.0 Signed-off-by: ti-srebot --- components/cdc/src/delegate.rs | 19 +- components/cdc/src/endpoint.rs | 20 +- components/cdc/src/service.rs | 13 + components/cdc/tests/integrations/test_cdc.rs | 700 ++++++++++++++++++ 4 files changed, 740 insertions(+), 12 deletions(-) diff --git a/components/cdc/src/delegate.rs b/components/cdc/src/delegate.rs index e9bcc32294d..da4bc9b440c 100644 --- a/components/cdc/src/delegate.rs +++ b/components/cdc/src/delegate.rs @@ -80,6 +80,7 @@ pub struct Downstream { region_epoch: RegionEpoch, sink: Option>, state: Arc>, + enable_old_value: bool, } impl Downstream { @@ -92,6 +93,7 @@ impl Downstream { region_epoch: RegionEpoch, req_id: u64, conn_id: ConnID, + enable_old_value: bool, ) -> Downstream { Downstream { id: DownstreamID::new(), @@ -101,6 +103,7 @@ impl Downstream { region_epoch, sink: None, state: Arc::new(AtomicCell::new(DownstreamState::default())), + enable_old_value, } } @@ -366,7 +369,19 @@ impl Delegate { if normal_only && downstreams[i].state.load() != DownstreamState::Normal { continue; } +<<<<<<< HEAD downstreams[i].sink_event(change_data_event.clone()); +======= + let mut event = change_data_event.clone(); + if !downstream.enable_old_value && self.txn_extra_op == TxnExtraOp::ReadOldValue { + if let Some(Event_oneof_event::Entries(ref mut entries)) = event.event { + for entry in entries.mut_entries().iter_mut() { + entry.mut_old_value().clear(); + } + } + } + downstream.sink_event(event); +>>>>>>> 927e36f95... cdc: fix old value config glitch when changefeeds with different settings connect to one region (#9515) } downstreams.last().unwrap().sink_event(change_data_event); } @@ -802,7 +817,7 @@ mod tests { let rx = BatchReceiver::new(rx, 1, Vec::new, VecCollector); let request_id = 123; let mut downstream = - Downstream::new(String::new(), region_epoch, request_id, ConnID::new()); + Downstream::new(String::new(), region_epoch, request_id, ConnID::new(), true); downstream.set_sink(sink); let mut delegate = Delegate::new(region_id); delegate.subscribe(downstream); @@ -936,7 +951,7 @@ mod tests { let rx = BatchReceiver::new(rx, 1, Vec::new, VecCollector); let request_id = 123; let mut downstream = - Downstream::new(String::new(), region_epoch, request_id, ConnID::new()); + Downstream::new(String::new(), region_epoch, request_id, ConnID::new(), true); let downstream_id = downstream.get_id(); downstream.set_sink(sink); let mut delegate = Delegate::new(region_id); diff --git a/components/cdc/src/endpoint.rs b/components/cdc/src/endpoint.rs index fe75040b525..64baadc64e6 100644 --- a/components/cdc/src/endpoint.rs +++ b/components/cdc/src/endpoint.rs @@ -1178,7 +1178,7 @@ mod tests { let mut req = ChangeDataRequest::default(); req.set_region_id(1); let region_epoch = req.get_region_epoch().clone(); - let downstream = Downstream::new("".to_string(), region_epoch, 0, conn_id); + let downstream = Downstream::new("".to_string(), region_epoch, 0, conn_id, true); ep.run(Task::Register { request: req, downstream, @@ -1226,7 +1226,7 @@ mod tests { let mut req = ChangeDataRequest::default(); req.set_region_id(1); let region_epoch = req.get_region_epoch().clone(); - let downstream = Downstream::new("".to_string(), region_epoch.clone(), 1, conn_id); + let downstream = Downstream::new("".to_string(), region_epoch.clone(), 1, conn_id, true); ep.run(Task::Register { request: req.clone(), 
downstream, @@ -1236,7 +1236,7 @@ mod tests { assert_eq!(ep.capture_regions.len(), 1); // duplicate request error. - let downstream = Downstream::new("".to_string(), region_epoch.clone(), 2, conn_id); + let downstream = Downstream::new("".to_string(), region_epoch.clone(), 2, conn_id, true); ep.run(Task::Register { request: req.clone(), downstream, @@ -1260,7 +1260,7 @@ mod tests { assert_eq!(ep.capture_regions.len(), 1); // Compatibility error. - let downstream = Downstream::new("".to_string(), region_epoch, 3, conn_id); + let downstream = Downstream::new("".to_string(), region_epoch, 3, conn_id, true); ep.run(Task::Register { request: req, downstream, @@ -1314,7 +1314,7 @@ mod tests { let mut req = ChangeDataRequest::default(); req.set_region_id(1); let region_epoch = req.get_region_epoch().clone(); - let downstream = Downstream::new("".to_string(), region_epoch.clone(), 0, conn_id); + let downstream = Downstream::new("".to_string(), region_epoch.clone(), 0, conn_id, true); ep.run(Task::Register { request: req.clone(), downstream, @@ -1339,7 +1339,7 @@ mod tests { // Register region 2 to the conn. req.set_region_id(2); - let downstream = Downstream::new("".to_string(), region_epoch.clone(), 0, conn_id); + let downstream = Downstream::new("".to_string(), region_epoch.clone(), 0, conn_id, true); ep.run(Task::Register { request: req.clone(), downstream, @@ -1372,7 +1372,7 @@ mod tests { let conn_id = conn.get_id(); ep.run(Task::OpenConn { conn }); req.set_region_id(3); - let downstream = Downstream::new("".to_string(), region_epoch, 3, conn_id); + let downstream = Downstream::new("".to_string(), region_epoch, 3, conn_id, true); ep.run(Task::Register { request: req, downstream, @@ -1439,7 +1439,7 @@ mod tests { let mut req = ChangeDataRequest::default(); req.set_region_id(1); let region_epoch = req.get_region_epoch().clone(); - let downstream = Downstream::new("".to_string(), region_epoch.clone(), 0, conn_id); + let downstream = Downstream::new("".to_string(), region_epoch.clone(), 0, conn_id, true); let downstream_id = downstream.get_id(); ep.run(Task::Register { request: req.clone(), @@ -1473,7 +1473,7 @@ mod tests { } assert_eq!(ep.capture_regions.len(), 0); - let downstream = Downstream::new("".to_string(), region_epoch.clone(), 0, conn_id); + let downstream = Downstream::new("".to_string(), region_epoch.clone(), 0, conn_id, true); let new_downstream_id = downstream.get_id(); ep.run(Task::Register { request: req.clone(), @@ -1516,7 +1516,7 @@ mod tests { assert_eq!(ep.capture_regions.len(), 0); // Stale deregister should be filtered. 
- let downstream = Downstream::new("".to_string(), region_epoch, 0, conn_id); + let downstream = Downstream::new("".to_string(), region_epoch, 0, conn_id, true); ep.run(Task::Register { request: req, downstream, diff --git a/components/cdc/src/service.rs b/components/cdc/src/service.rs index 6e36f27800d..bae65a896d3 100644 --- a/components/cdc/src/service.rs +++ b/components/cdc/src/service.rs @@ -12,6 +12,7 @@ use grpcio::{ use kvproto::cdcpb::{ ChangeData, ChangeDataEvent, ChangeDataRequest, Compatibility, Event, ResolvedTs, }; +use kvproto::kvrpcpb::ExtraOp as TxnExtraOp; use protobuf::Message; use security::{check_common_name, SecurityManager}; use tikv_util::collections::HashMap; @@ -307,6 +308,7 @@ impl ChangeData for Service { let recv_req = stream.for_each(move |request| { let region_epoch = request.get_region_epoch().clone(); let req_id = request.get_request_id(); + let enable_old_value = request.get_extra_op() == TxnExtraOp::ReadOldValue; let version = match semver::Version::parse(request.get_header().get_ticdc_version()) { Ok(v) => v, Err(e) => { @@ -316,8 +318,19 @@ impl ChangeData for Service { semver::Version::new(0, 0, 0) } }; +<<<<<<< HEAD let downstream = Downstream::new(peer.clone(), region_epoch, req_id, conn_id); scheduler +======= + let downstream = Downstream::new( + peer.clone(), + region_epoch, + req_id, + conn_id, + enable_old_value, + ); + let ret = scheduler +>>>>>>> 927e36f95... cdc: fix old value config glitch when changefeeds with different settings connect to one region (#9515) .schedule(Task::Register { request, downstream, diff --git a/components/cdc/tests/integrations/test_cdc.rs b/components/cdc/tests/integrations/test_cdc.rs index dc9fc4000ac..2fa926965c1 100644 --- a/components/cdc/tests/integrations/test_cdc.rs +++ b/components/cdc/tests/integrations/test_cdc.rs @@ -874,6 +874,706 @@ fn test_old_value_basic() { } } +<<<<<<< HEAD event_feed_wrap.as_ref().replace(None); +======= + event_feed_wrap.replace(None); + suite.stop(); +} + +#[test] +fn test_old_value_multi_changefeeds() { + let mut suite = TestSuite::new(1); + let mut req = suite.new_changedata_request(1); + req.set_extra_op(ExtraOp::ReadOldValue); + let (mut req_tx_1, event_feed_wrap_1, receive_event_1) = + new_event_feed(suite.get_region_cdc_client(1)); + let _req_tx_1 = block_on(req_tx_1.send((req.clone(), WriteFlags::default()))).unwrap(); + + req.set_extra_op(ExtraOp::Noop); + let (mut req_tx_2, event_feed_wrap_2, receive_event_2) = + new_event_feed(suite.get_region_cdc_client(1)); + let _req_tx_2 = block_on(req_tx_2.send((req, WriteFlags::default()))).unwrap(); + + sleep_ms(1000); + // Insert value + let mut m1 = Mutation::default(); + let k1 = b"k1".to_vec(); + m1.set_op(Op::Put); + m1.key = k1.clone(); + m1.value = b"v1".to_vec(); + let ts1 = block_on(suite.cluster.pd_client.get_tso()).unwrap(); + suite.must_kv_prewrite(1, vec![m1], k1.clone(), ts1); + let ts2 = block_on(suite.cluster.pd_client.get_tso()).unwrap(); + suite.must_kv_commit(1, vec![k1.clone()], ts1, ts2); + // Update value + let mut m2 = Mutation::default(); + m2.set_op(Op::Put); + m2.key = k1.clone(); + m2.value = vec![b'3'; 5120]; + let ts3 = block_on(suite.cluster.pd_client.get_tso()).unwrap(); + suite.must_kv_prewrite(1, vec![m2], k1.clone(), ts3); + let ts4 = block_on(suite.cluster.pd_client.get_tso()).unwrap(); + suite.must_kv_commit(1, vec![k1], ts3, ts4); + let mut event_count = 0; + loop { + let events = receive_event_1(false).events.to_vec(); + for event in events.into_iter() { + match event.event.unwrap() { + 
Event_oneof_event::Entries(mut es) => { + for row in es.take_entries().to_vec() { + if row.get_type() == EventLogType::Prewrite { + if row.get_start_ts() == ts3.into_inner() { + assert_eq!(row.get_old_value(), b"v1"); + event_count += 1; + } else { + assert_eq!(row.get_old_value(), b""); + event_count += 1; + } + } + } + } + other => panic!("unknown event {:?}", other), + } + } + if event_count >= 2 { + break; + } + } + + event_count = 0; + loop { + let events = receive_event_2(false).events.to_vec(); + for event in events.into_iter() { + match event.event.unwrap() { + Event_oneof_event::Entries(mut es) => { + for row in es.take_entries().to_vec() { + if row.get_type() == EventLogType::Prewrite { + assert_eq!(row.get_old_value(), b""); + event_count += 1; + } + } + } + other => panic!("unknown event {:?}", other), + } + } + if event_count >= 2 { + break; + } + } + + event_feed_wrap_1.replace(None); + event_feed_wrap_2.replace(None); + suite.stop(); +} + +#[test] +fn test_cdc_resolve_ts_checking_concurrency_manager() { + let mut suite: crate::TestSuite = TestSuite::new(1); + let cm: ConcurrencyManager = suite.get_txn_concurrency_manager(1).unwrap(); + let lock_key = |key: &[u8], ts: u64| { + let guard = block_on(cm.lock_key(&Key::from_raw(key))); + guard.with_lock(|l| { + *l = Some(Lock::new( + LockType::Put, + key.to_vec(), + ts.into(), + 0, + None, + 0.into(), + 1, + ts.into(), + )) + }); + guard + }; + + cm.update_max_ts(20.into()); + + let guard = lock_key(b"a", 80); + suite.set_tso(100); + + let mut req = suite.new_changedata_request(1); + req.set_checkpoint_ts(100); + let (mut req_tx, event_feed_wrap, receive_event) = + new_event_feed(suite.get_region_cdc_client(1)); + let _req_tx = block_on(req_tx.send((req, WriteFlags::default()))).unwrap(); + // Make sure region 1 is registered. + let mut events = receive_event(false).events; + assert_eq!(events.len(), 1); + match events.pop().unwrap().event.unwrap() { + // Even if there is no write, + // it should always outputs an Initialized event. + Event_oneof_event::Entries(es) => { + assert!(es.entries.len() == 1, "{:?}", es); + let e = &es.entries[0]; + assert_eq!(e.get_type(), EventLogType::Initialized, "{:?}", es); + } + other => panic!("unknown event {:?}", other), + } + + fn check_resolved_ts(event: ChangeDataEvent, check_fn: impl Fn(u64)) { + if let Some(resolved_ts) = event.resolved_ts.as_ref() { + check_fn(resolved_ts.ts) + } + }; + + check_resolved_ts(receive_event(true), |ts| assert_eq!(ts, 80)); + assert!(cm.max_ts() >= 100.into()); + + drop(guard); + for retry in 0.. { + let event = receive_event(true); + let mut current_rts = 0; + if let Some(resolved_ts) = event.resolved_ts.as_ref() { + current_rts = resolved_ts.ts; + if resolved_ts.ts >= 100 { + break; + } + } + if retry >= 5 { + panic!( + "resolved ts didn't push properly after unlocking memlock. current resolved_ts: {}", + current_rts + ); + } + } + + let _guard = lock_key(b"a", 90); + // The resolved_ts should be blocked by the mem lock but it's already greater than 90. + // Retry until receiving an unchanged resovled_ts because the first several resolved ts received + // might be updated before acquiring the lock. 
+ let mut last_resolved_ts = 0; + let mut success = false; + for _ in 0..5 { + let event = receive_event(true); + if let Some(resolved_ts) = event.resolved_ts.as_ref() { + let ts = resolved_ts.ts; + assert!(ts > 100); + if ts == last_resolved_ts { + success = true; + break; + } + assert!(ts > last_resolved_ts); + last_resolved_ts = ts; + } + } + assert!(success, "resolved_ts not blocked by the memory lock"); + + event_feed_wrap.replace(None); + suite.stop(); +} + +#[test] +fn test_cdc_1pc() { + let mut suite = TestSuite::new(1); + + let req = suite.new_changedata_request(1); + let (mut req_tx, _, receive_event) = new_event_feed(suite.get_region_cdc_client(1)); + block_on(req_tx.send((req, WriteFlags::default()))).unwrap(); + let event = receive_event(false); + event.events.into_iter().for_each(|e| { + match e.event.unwrap() { + // Even if there is no write, + // it should always outputs an Initialized event. + Event_oneof_event::Entries(es) => { + assert!(es.entries.len() == 1, "{:?}", es); + let e = &es.entries[0]; + assert_eq!(e.get_type(), EventLogType::Initialized, "{:?}", es); + } + other => panic!("unknown event {:?}", other), + } + }); + + let (k1, v1) = (b"k1", b"v1"); + let (k2, v2) = (b"k2", &[0u8; 512]); + + let start_ts = block_on(suite.cluster.pd_client.get_tso()).unwrap(); + + // Let resolved_ts update. + sleep_ms(500); + + // Prewrite + let mut prewrite_req = PrewriteRequest::default(); + let region_id = 1; + prewrite_req.set_context(suite.get_context(region_id)); + let mut m1 = Mutation::default(); + m1.set_op(Op::Put); + m1.key = k1.to_vec(); + m1.value = v1.to_vec(); + prewrite_req.mut_mutations().push(m1); + let mut m2 = Mutation::default(); + m2.set_op(Op::Put); + m2.key = k2.to_vec(); + m2.value = v2.to_vec(); + prewrite_req.mut_mutations().push(m2); + prewrite_req.primary_lock = k1.to_vec(); + prewrite_req.start_version = start_ts.into_inner(); + prewrite_req.lock_ttl = prewrite_req.start_version + 1; + prewrite_req.set_try_one_pc(true); + let prewrite_resp = suite + .get_tikv_client(region_id) + .kv_prewrite(&prewrite_req) + .unwrap(); + assert!(prewrite_resp.get_one_pc_commit_ts() > 0); + + let mut resolved_ts = 0; + loop { + let mut cde = receive_event(true); + if cde.get_resolved_ts().get_ts() > resolved_ts { + resolved_ts = cde.get_resolved_ts().get_ts(); + } + let events = cde.mut_events(); + if !events.is_empty() { + assert_eq!(events.len(), 1); + match events.pop().unwrap().event.unwrap() { + Event_oneof_event::Entries(entries) => { + assert_eq!(entries.entries.len(), 2); + let (e0, e1) = (&entries.entries[0], &entries.entries[1]); + assert_eq!(e0.get_type(), EventLogType::Committed); + assert_eq!(e0.get_key(), k1); + assert_eq!(e0.get_value(), v1); + assert!(e0.commit_ts > resolved_ts); + assert_eq!(e1.get_type(), EventLogType::Committed); + assert_eq!(e1.get_key(), k2); + assert_eq!(e1.get_value(), v2); + assert!(e1.commit_ts > resolved_ts); + break; + } + other => panic!("unknown event {:?}", other), + } + } + } + + suite.stop(); +} + +#[test] +fn test_old_value_1pc() { + let mut suite = TestSuite::new(1); + let mut req = suite.new_changedata_request(1); + req.set_extra_op(ExtraOp::ReadOldValue); + let (mut req_tx, _, receive_event) = new_event_feed(suite.get_region_cdc_client(1)); + let _req_tx = block_on(req_tx.send((req, WriteFlags::default()))).unwrap(); + + // Insert value + let mut m1 = Mutation::default(); + let k1 = b"k1".to_vec(); + m1.set_op(Op::Put); + m1.key = k1.clone(); + m1.value = b"v1".to_vec(); + suite.must_kv_prewrite(1, vec![m1], 
k1.clone(), 10.into()); + suite.must_kv_commit(1, vec![k1.clone()], 10.into(), 15.into()); + + // Prewrite with 1PC + let start_ts = 20; + let mut prewrite_req = PrewriteRequest::default(); + let region_id = 1; + prewrite_req.set_context(suite.get_context(region_id)); + let mut m2 = Mutation::default(); + m2.set_op(Op::Put); + m2.key = k1.clone(); + m2.value = b"v2".to_vec(); + prewrite_req.mut_mutations().push(m2); + prewrite_req.primary_lock = k1; + prewrite_req.start_version = start_ts; + prewrite_req.lock_ttl = 1000; + prewrite_req.set_try_one_pc(true); + let prewrite_resp = suite + .get_tikv_client(region_id) + .kv_prewrite(&prewrite_req) + .unwrap(); + assert!(prewrite_resp.get_one_pc_commit_ts() > 0); + + 'outer: loop { + let events = receive_event(false).events.to_vec(); + for event in events.into_iter() { + match event.event.unwrap() { + Event_oneof_event::Entries(mut es) => { + for row in es.take_entries().to_vec() { + if row.get_type() == EventLogType::Committed + && row.get_start_ts() == start_ts + { + assert_eq!(row.get_old_value(), b"v1"); + break 'outer; + } + } + } + other => panic!("unknown event {:?}", other), + } + } + } + + suite.stop(); +} + +#[test] +fn test_region_created_replicate() { + let cluster = new_server_cluster(0, 2); + cluster.pd_client.disable_default_operator(); + let mut suite = TestSuite::with_cluster(2, cluster); + + let region = suite.cluster.get_region(&[]); + suite + .cluster + .must_transfer_leader(region.id, new_peer(2, 2)); + suite + .cluster + .pd_client + .must_remove_peer(region.id, new_peer(1, 1)); + + let recv_filter = Box::new( + RegionPacketFilter::new(region.get_id(), 1) + .direction(Direction::Recv) + .msg_type(MessageType::MsgAppend), + ); + suite.cluster.sim.wl().add_recv_filter(1, recv_filter); + suite + .cluster + .pd_client + .must_add_peer(region.id, new_peer(1, 1)); + let region = suite.cluster.get_region(&[]); + let req = suite.new_changedata_request(region.id); + let (mut req_tx, event_feed_wrap, receive_event) = + new_event_feed(suite.get_region_cdc_client(region.id)); + block_on(req_tx.send((req, WriteFlags::default()))).unwrap(); + sleep_ms(1000); + suite.cluster.sim.wl().clear_recv_filters(1); + + let mut counter = 0; + let mut previous_ts = 0; + loop { + let event = receive_event(true); + if let Some(resolved_ts) = event.resolved_ts.as_ref() { + assert!(resolved_ts.ts >= previous_ts); + assert!(resolved_ts.regions == vec![region.id]); + previous_ts = resolved_ts.ts; + counter += 1; + } + if counter > 5 { + break; + } + } + event_feed_wrap.replace(None); + suite.stop(); +} + +#[test] +fn test_cdc_scan_ignore_gc_fence() { + // This case is similar to `test_cdc_scan` but constructs a case with GC Fence. + let mut suite = TestSuite::new(1); + + let (key, v1, v2) = (b"key", b"value1", b"value2"); + + // Write two versions to the key. 
+ let start_ts1 = block_on(suite.cluster.pd_client.get_tso()).unwrap(); + let mut mutation = Mutation::default(); + mutation.set_op(Op::Put); + mutation.key = key.to_vec(); + mutation.value = v1.to_vec(); + suite.must_kv_prewrite(1, vec![mutation], key.to_vec(), start_ts1); + + let commit_ts1 = block_on(suite.cluster.pd_client.get_tso()).unwrap(); + suite.must_kv_commit(1, vec![key.to_vec()], start_ts1, commit_ts1); + + let start_ts2 = block_on(suite.cluster.pd_client.get_tso()).unwrap(); + let mut mutation = Mutation::default(); + mutation.key = key.to_vec(); + mutation.value = v2.to_vec(); + suite.must_kv_prewrite(1, vec![mutation], key.to_vec(), start_ts2); + + let commit_ts2 = block_on(suite.cluster.pd_client.get_tso()).unwrap(); + suite.must_kv_commit(1, vec![key.to_vec()], start_ts2, commit_ts2); + + // Assume the first version above is written by async commit and it's commit_ts is not unique. + // Use it's commit_ts as another transaction's start_ts. + // Run check_txn_status on commit_ts1 so that gc_fence will be set on the first version. + let caller_start_ts = block_on(suite.cluster.pd_client.get_tso()).unwrap(); + let action = suite.must_check_txn_status( + 1, + key.to_vec(), + commit_ts1, + caller_start_ts, + caller_start_ts, + true, + ); + assert_eq!(action, Action::LockNotExistRollback); + + let req = suite.new_changedata_request(1); + let (mut req_tx, _, receive_event) = new_event_feed(suite.get_region_cdc_client(1)); + block_on(req_tx.send((req, WriteFlags::default()))).unwrap(); + let mut events = receive_event(false).events.to_vec(); + if events.len() == 1 { + events.extend(receive_event(false).events.into_iter()); + } + assert_eq!(events.len(), 2, "{:?}", events); + match events.remove(0).event.unwrap() { + Event_oneof_event::Entries(es) => { + assert!(es.entries.len() == 2, "{:?}", es); + let e = &es.entries[0]; + assert_eq!(e.get_type(), EventLogType::Committed, "{:?}", es); + assert_eq!(e.start_ts, start_ts2.into_inner(), "{:?}", es); + assert_eq!(e.commit_ts, commit_ts2.into_inner(), "{:?}", es); + assert_eq!(e.key, key.to_vec(), "{:?}", es); + assert_eq!(e.value, v2.to_vec(), "{:?}", es); + let e = &es.entries[1]; + assert_eq!(e.get_type(), EventLogType::Committed, "{:?}", es); + assert_eq!(e.start_ts, start_ts1.into_inner(), "{:?}", es); + assert_eq!(e.commit_ts, commit_ts1.into_inner(), "{:?}", es); + assert_eq!(e.key, key.to_vec(), "{:?}", es); + assert_eq!(e.value, v1.to_vec(), "{:?}", es); + } + other => panic!("unknown event {:?}", other), + } + match events.pop().unwrap().event.unwrap() { + Event_oneof_event::Entries(es) => { + assert!(es.entries.len() == 1, "{:?}", es); + let e = &es.entries[0]; + assert_eq!(e.get_type(), EventLogType::Initialized, "{:?}", es); + } + other => panic!("unknown event {:?}", other), + } + + suite.stop(); +} + +#[test] +fn test_cdc_extract_rollback_if_gc_fence_set() { + let mut suite = TestSuite::new(1); + + let req = suite.new_changedata_request(1); + let (mut req_tx, _, receive_event) = new_event_feed(suite.get_region_cdc_client(1)); + block_on(req_tx.send((req, WriteFlags::default()))).unwrap(); + let event = receive_event(false); + event + .events + .into_iter() + .for_each(|e| match e.event.unwrap() { + Event_oneof_event::Entries(es) => { + assert!(es.entries.len() == 1, "{:?}", es); + let e = &es.entries[0]; + assert_eq!(e.get_type(), EventLogType::Initialized, "{:?}", es); + } + other => panic!("unknown event {:?}", other), + }); + + sleep_ms(1000); + + // Write two versions of a key + let (key, v1, v2, v3) = (b"key", 
b"value1", b"value2", b"value3"); + let start_ts1 = block_on(suite.cluster.pd_client.get_tso()).unwrap(); + let mut mutation = Mutation::default(); + mutation.set_op(Op::Put); + mutation.key = key.to_vec(); + mutation.value = v1.to_vec(); + suite.must_kv_prewrite(1, vec![mutation], key.to_vec(), start_ts1); + + let commit_ts1 = block_on(suite.cluster.pd_client.get_tso()).unwrap(); + suite.must_kv_commit(1, vec![key.to_vec()], start_ts1, commit_ts1); + + let start_ts2 = block_on(suite.cluster.pd_client.get_tso()).unwrap(); + let mut mutation = Mutation::default(); + mutation.set_op(Op::Put); + mutation.key = key.to_vec(); + mutation.value = v2.to_vec(); + suite.must_kv_prewrite(1, vec![mutation], key.to_vec(), start_ts2); + + let commit_ts2 = block_on(suite.cluster.pd_client.get_tso()).unwrap(); + suite.must_kv_commit(1, vec![key.to_vec()], start_ts2, commit_ts2); + + // We don't care about the events caused by the previous writings in this test case, and it's + // too complicated to check them. Just skip them here, and wait for resolved_ts to be pushed to + // a greater value than the two versions' commit_ts-es. + let skip_to_ts = block_on(suite.cluster.pd_client.get_tso()).unwrap(); + loop { + let e = receive_event(true); + if let Some(r) = e.resolved_ts.as_ref() { + if r.ts > skip_to_ts.into_inner() { + break; + } + } + } + + // Assume the two versions of the key are written by async commit transactions, and their + // commit_ts-es are also other transaction's start_ts-es. Run check_txn_status on the + // commit_ts-es of the two versions to cause overlapping rollback. + let caller_start_ts = block_on(suite.cluster.pd_client.get_tso()).unwrap(); + suite.must_check_txn_status( + 1, + key.to_vec(), + commit_ts1, + caller_start_ts, + caller_start_ts, + true, + ); + + // Expects receiving rollback + let event = receive_event(false); + event + .events + .into_iter() + .for_each(|e| match e.event.unwrap() { + Event_oneof_event::Entries(es) => { + assert!(es.entries.len() == 1, "{:?}", es); + let e = &es.entries[0]; + assert_eq!(e.get_type(), EventLogType::Rollback, "{:?}", es); + assert_eq!(e.get_start_ts(), commit_ts1.into_inner()); + assert_eq!(e.get_commit_ts(), 0); + } + other => panic!("unknown event {:?}", other), + }); + + suite.must_check_txn_status( + 1, + key.to_vec(), + commit_ts2, + caller_start_ts, + caller_start_ts, + true, + ); + + // Expects receiving rollback + let event = receive_event(false); + event + .events + .into_iter() + .for_each(|e| match e.event.unwrap() { + Event_oneof_event::Entries(es) => { + assert!(es.entries.len() == 1, "{:?}", es); + let e = &es.entries[0]; + assert_eq!(e.get_type(), EventLogType::Rollback, "{:?}", es); + assert_eq!(e.get_start_ts(), commit_ts2.into_inner()); + assert_eq!(e.get_commit_ts(), 0); + } + other => panic!("unknown event {:?}", other), + }); + + // In some special cases, a newly committed record may carry an overlapped rollback initially. + // In this case, gc_fence shouldn't be set, and CDC ignores the rollback and handles the + // committing normally. + let start_ts3 = block_on(suite.cluster.pd_client.get_tso()).unwrap(); + let mut mutation = Mutation::default(); + mutation.set_op(Op::Put); + mutation.key = key.to_vec(); + mutation.value = v3.to_vec(); + suite.must_kv_prewrite(1, vec![mutation], key.to_vec(), start_ts3); + // Consume the prewrite event. 
+ let event = receive_event(false); + event + .events + .into_iter() + .for_each(|e| match e.event.unwrap() { + Event_oneof_event::Entries(es) => { + assert!(es.entries.len() == 1, "{:?}", es); + let e = &es.entries[0]; + assert_eq!(e.get_type(), EventLogType::Prewrite, "{:?}", es); + assert_eq!(e.get_start_ts(), start_ts3.into_inner()); + } + other => panic!("unknown event {:?}", other), + }); + + // Again, assume the transaction is committed with async commit protocol, and the commit_ts is + // also another transaction's start_ts. + let commit_ts3 = block_on(suite.cluster.pd_client.get_tso()).unwrap(); + // Rollback another transaction before committing, then the rolling back information will be + // recorded in the lock. + let caller_start_ts = block_on(suite.cluster.pd_client.get_tso()).unwrap(); + suite.must_check_txn_status( + 1, + key.to_vec(), + commit_ts3, + caller_start_ts, + caller_start_ts, + true, + ); + // Expects receiving rollback + let event = receive_event(false); + event + .events + .into_iter() + .for_each(|e| match e.event.unwrap() { + Event_oneof_event::Entries(es) => { + assert!(es.entries.len() == 1, "{:?}", es); + let e = &es.entries[0]; + assert_eq!(e.get_type(), EventLogType::Rollback, "{:?}", es); + assert_eq!(e.get_start_ts(), commit_ts3.into_inner()); + assert_eq!(e.get_commit_ts(), 0); + } + other => panic!("unknown event {:?}", other), + }); + // Commit the transaction, then it will have overlapped rollback initially. + suite.must_kv_commit(1, vec![key.to_vec()], start_ts3, commit_ts3); + // Expects receiving a normal committing event. + let event = receive_event(false); + event + .events + .into_iter() + .for_each(|e| match e.event.unwrap() { + Event_oneof_event::Entries(es) => { + assert!(es.entries.len() == 1, "{:?}", es); + let e = &es.entries[0]; + assert_eq!(e.get_type(), EventLogType::Commit, "{:?}", es); + assert_eq!(e.get_start_ts(), start_ts3.into_inner()); + assert_eq!(e.get_commit_ts(), commit_ts3.into_inner()); + assert_eq!(e.get_value(), v3); + } + other => panic!("unknown event {:?}", other), + }); + + suite.stop(); +} + +// This test is created for covering the case that term was increased without leader change. +// Ideally leader id and term in StoreMeta should be updated together with a yielded SoftState, +// but sometimes the leader was transferred to another store and then changed back, +// a follower would not get a new SoftState. +#[test] +fn test_term_change() { + let cluster = new_server_cluster(0, 3); + cluster.pd_client.disable_default_operator(); + let mut suite = TestSuite::with_cluster(3, cluster); + let region = suite.cluster.get_region(&[]); + suite + .cluster + .must_transfer_leader(region.id, new_peer(2, 2)); + // Simulate network partition. + let recv_filter = + Box::new(RegionPacketFilter::new(region.get_id(), 1).direction(Direction::Recv)); + suite.cluster.sim.wl().add_recv_filter(1, recv_filter); + // Transfer leader to peer 3 and then change it back to peer 2. + // Peer 1 would not get a new SoftState. 
+ suite + .cluster + .must_transfer_leader(region.id, new_peer(3, 3)); + suite + .cluster + .must_transfer_leader(region.id, new_peer(2, 2)); + suite.cluster.sim.wl().clear_recv_filters(1); + + suite + .cluster + .pd_client + .must_remove_peer(region.id, new_peer(3, 3)); + let region = suite.cluster.get_region(&[]); + let req = suite.new_changedata_request(region.id); + let (mut req_tx, event_feed_wrap, receive_event) = + new_event_feed(suite.get_region_cdc_client(region.id)); + block_on(req_tx.send((req, WriteFlags::default()))).unwrap(); + let mut counter = 0; + let mut previous_ts = 0; + loop { + let event = receive_event(true); + if let Some(resolved_ts) = event.resolved_ts.as_ref() { + assert!(resolved_ts.ts >= previous_ts); + assert!(resolved_ts.regions == vec![region.id]); + previous_ts = resolved_ts.ts; + counter += 1; + } + if counter > 5 { + break; + } + } + event_feed_wrap.replace(None); +>>>>>>> 927e36f95... cdc: fix old value config glitch when changefeeds with different settings connect to one region (#9515) suite.stop(); }
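
Note on the change itself: the core of #9515, as carried by this cherry-pick, is the per-downstream `enable_old_value` flag. service.rs derives it from the request's `extra_op == ReadOldValue`, `Downstream::new` stores it, and `Delegate::broadcast` clears `old_value` on every entry sent to a downstream that did not enable it while the region is still capturing old values for another changefeed. What follows is a minimal, self-contained Rust sketch of that idea, not TiKV code; the types `EventRow`, `Downstream`, `Delegate`, and the `broadcast` helper are simplified stand-ins invented for illustration.

// Minimal sketch (not TiKV code): simplified stand-in types showing how a
// broadcast can serve old values only to the subscribers that requested them.

#[derive(Clone, Debug)]
struct EventRow {
    key: Vec<u8>,
    value: Vec<u8>,
    old_value: Vec<u8>,
}

struct Downstream {
    id: u64,
    // In the patch this comes from ChangeDataRequest::extra_op == ReadOldValue.
    enable_old_value: bool,
}

struct Delegate {
    // In the patch this is `txn_extra_op == TxnExtraOp::ReadOldValue`, i.e. the
    // region keeps producing old values because at least one subscriber wants them.
    capturing_old_value: bool,
    downstreams: Vec<Downstream>,
}

impl Delegate {
    // Returns, per downstream id, the rows it would receive. Downstreams that did
    // not enable old values get the same rows with old_value cleared.
    fn broadcast(&self, rows: &[EventRow]) -> Vec<(u64, Vec<EventRow>)> {
        self.downstreams
            .iter()
            .map(|d| {
                let mut out = rows.to_vec();
                if !d.enable_old_value && self.capturing_old_value {
                    for row in &mut out {
                        row.old_value.clear();
                    }
                }
                (d.id, out)
            })
            .collect()
    }
}

fn main() {
    let delegate = Delegate {
        capturing_old_value: true,
        downstreams: vec![
            Downstream { id: 1, enable_old_value: true },
            Downstream { id: 2, enable_old_value: false },
        ],
    };
    let rows = vec![EventRow {
        key: b"k1".to_vec(),
        value: b"v2".to_vec(),
        old_value: b"v1".to_vec(),
    }];
    let sent = delegate.broadcast(&rows);
    // The ReadOldValue subscriber keeps "v1"; the Noop subscriber sees it cleared.
    assert_eq!(sent[0].1[0].old_value, b"v1".to_vec());
    assert!(sent[1].1[0].old_value.is_empty());
    println!("{:?}", sent);
}

`test_old_value_multi_changefeeds` in the test diff above exercises the same scenario end to end: two changefeeds on region 1, one registered with `ExtraOp::ReadOldValue` and one with `ExtraOp::Noop`, where only the first may observe the previous value `v1`.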