Skip to content

Commit

Permalink
cherry pick tikv#9515 to release-4.0
Browse files Browse the repository at this point in the history
Signed-off-by: ti-srebot <[email protected]>
  • Loading branch information
liuzix authored and ti-srebot committed Jan 25, 2021
1 parent 87a7d11 commit 9f2dcd4
Show file tree
Hide file tree
Showing 4 changed files with 740 additions and 12 deletions.
19 changes: 17 additions & 2 deletions components/cdc/src/delegate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@ pub struct Downstream {
region_epoch: RegionEpoch,
sink: Option<BatchSender<CdcEvent>>,
state: Arc<AtomicCell<DownstreamState>>,
enable_old_value: bool,
}

impl Downstream {
Expand All @@ -92,6 +93,7 @@ impl Downstream {
region_epoch: RegionEpoch,
req_id: u64,
conn_id: ConnID,
enable_old_value: bool,
) -> Downstream {
Downstream {
id: DownstreamID::new(),
Expand All @@ -101,6 +103,7 @@ impl Downstream {
region_epoch,
sink: None,
state: Arc::new(AtomicCell::new(DownstreamState::default())),
enable_old_value,
}
}

Expand Down Expand Up @@ -366,7 +369,19 @@ impl Delegate {
if normal_only && downstreams[i].state.load() != DownstreamState::Normal {
continue;
}
let mut event = change_data_event.clone();
if !downstreams[i].enable_old_value && self.txn_extra_op == TxnExtraOp::ReadOldValue {
    if let Some(Event_oneof_event::Entries(ref mut entries)) = event.event {
        for entry in entries.mut_entries().iter_mut() {
            entry.mut_old_value().clear();
        }
    }
}
downstreams[i].sink_event(event);
}
downstreams.last().unwrap().sink_event(change_data_event);
}
Expand Down Expand Up @@ -802,7 +817,7 @@ mod tests {
let rx = BatchReceiver::new(rx, 1, Vec::new, VecCollector);
let request_id = 123;
let mut downstream =
Downstream::new(String::new(), region_epoch, request_id, ConnID::new());
Downstream::new(String::new(), region_epoch, request_id, ConnID::new(), true);
downstream.set_sink(sink);
let mut delegate = Delegate::new(region_id);
delegate.subscribe(downstream);
Expand Down Expand Up @@ -936,7 +951,7 @@ mod tests {
let rx = BatchReceiver::new(rx, 1, Vec::new, VecCollector);
let request_id = 123;
let mut downstream =
Downstream::new(String::new(), region_epoch, request_id, ConnID::new());
Downstream::new(String::new(), region_epoch, request_id, ConnID::new(), true);
let downstream_id = downstream.get_id();
downstream.set_sink(sink);
let mut delegate = Delegate::new(region_id);
Expand Down
20 changes: 10 additions & 10 deletions components/cdc/src/endpoint.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1178,7 +1178,7 @@ mod tests {
let mut req = ChangeDataRequest::default();
req.set_region_id(1);
let region_epoch = req.get_region_epoch().clone();
let downstream = Downstream::new("".to_string(), region_epoch, 0, conn_id);
let downstream = Downstream::new("".to_string(), region_epoch, 0, conn_id, true);
ep.run(Task::Register {
request: req,
downstream,
Expand Down Expand Up @@ -1226,7 +1226,7 @@ mod tests {
let mut req = ChangeDataRequest::default();
req.set_region_id(1);
let region_epoch = req.get_region_epoch().clone();
let downstream = Downstream::new("".to_string(), region_epoch.clone(), 1, conn_id);
let downstream = Downstream::new("".to_string(), region_epoch.clone(), 1, conn_id, true);
ep.run(Task::Register {
request: req.clone(),
downstream,
Expand All @@ -1236,7 +1236,7 @@ mod tests {
assert_eq!(ep.capture_regions.len(), 1);

// duplicate request error.
let downstream = Downstream::new("".to_string(), region_epoch.clone(), 2, conn_id);
let downstream = Downstream::new("".to_string(), region_epoch.clone(), 2, conn_id, true);
ep.run(Task::Register {
request: req.clone(),
downstream,
Expand All @@ -1260,7 +1260,7 @@ mod tests {
assert_eq!(ep.capture_regions.len(), 1);

// Compatibility error.
let downstream = Downstream::new("".to_string(), region_epoch, 3, conn_id);
let downstream = Downstream::new("".to_string(), region_epoch, 3, conn_id, true);
ep.run(Task::Register {
request: req,
downstream,
Expand Down Expand Up @@ -1314,7 +1314,7 @@ mod tests {
let mut req = ChangeDataRequest::default();
req.set_region_id(1);
let region_epoch = req.get_region_epoch().clone();
let downstream = Downstream::new("".to_string(), region_epoch.clone(), 0, conn_id);
let downstream = Downstream::new("".to_string(), region_epoch.clone(), 0, conn_id, true);
ep.run(Task::Register {
request: req.clone(),
downstream,
Expand All @@ -1339,7 +1339,7 @@ mod tests {

// Register region 2 to the conn.
req.set_region_id(2);
let downstream = Downstream::new("".to_string(), region_epoch.clone(), 0, conn_id);
let downstream = Downstream::new("".to_string(), region_epoch.clone(), 0, conn_id, true);
ep.run(Task::Register {
request: req.clone(),
downstream,
Expand Down Expand Up @@ -1372,7 +1372,7 @@ mod tests {
let conn_id = conn.get_id();
ep.run(Task::OpenConn { conn });
req.set_region_id(3);
let downstream = Downstream::new("".to_string(), region_epoch, 3, conn_id);
let downstream = Downstream::new("".to_string(), region_epoch, 3, conn_id, true);
ep.run(Task::Register {
request: req,
downstream,
Expand Down Expand Up @@ -1439,7 +1439,7 @@ mod tests {
let mut req = ChangeDataRequest::default();
req.set_region_id(1);
let region_epoch = req.get_region_epoch().clone();
let downstream = Downstream::new("".to_string(), region_epoch.clone(), 0, conn_id);
let downstream = Downstream::new("".to_string(), region_epoch.clone(), 0, conn_id, true);
let downstream_id = downstream.get_id();
ep.run(Task::Register {
request: req.clone(),
Expand Down Expand Up @@ -1473,7 +1473,7 @@ mod tests {
}
assert_eq!(ep.capture_regions.len(), 0);

let downstream = Downstream::new("".to_string(), region_epoch.clone(), 0, conn_id);
let downstream = Downstream::new("".to_string(), region_epoch.clone(), 0, conn_id, true);
let new_downstream_id = downstream.get_id();
ep.run(Task::Register {
request: req.clone(),
Expand Down Expand Up @@ -1516,7 +1516,7 @@ mod tests {
assert_eq!(ep.capture_regions.len(), 0);

// Stale deregister should be filtered.
let downstream = Downstream::new("".to_string(), region_epoch, 0, conn_id);
let downstream = Downstream::new("".to_string(), region_epoch, 0, conn_id, true);
ep.run(Task::Register {
request: req,
downstream,
Expand Down
13 changes: 13 additions & 0 deletions components/cdc/src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ use grpcio::{
use kvproto::cdcpb::{
ChangeData, ChangeDataEvent, ChangeDataRequest, Compatibility, Event, ResolvedTs,
};
use kvproto::kvrpcpb::ExtraOp as TxnExtraOp;
use protobuf::Message;
use security::{check_common_name, SecurityManager};
use tikv_util::collections::HashMap;
Expand Down Expand Up @@ -307,6 +308,7 @@ impl ChangeData for Service {
let recv_req = stream.for_each(move |request| {
let region_epoch = request.get_region_epoch().clone();
let req_id = request.get_request_id();
let enable_old_value = request.get_extra_op() == TxnExtraOp::ReadOldValue;
let version = match semver::Version::parse(request.get_header().get_ticdc_version()) {
Ok(v) => v,
Err(e) => {
Expand All @@ -316,8 +318,19 @@ impl ChangeData for Service {
semver::Version::new(0, 0, 0)
}
};
let downstream = Downstream::new(
    peer.clone(),
    region_epoch,
    req_id,
    conn_id,
    enable_old_value,
);
let ret = scheduler
.schedule(Task::Register {
request,
downstream,
Expand Down
Loading

0 comments on commit 9f2dcd4

Please sign in to comment.