Strengthen configuration change approval #371

Merged
Add log for current ReadIndex mechanism (#370)
Signed-off-by: wuhuizuo <wuhuizuo@126.com>
CalvinNeo authored and wuhuizuo committed May 26, 2024
commit 835322c5416afffd72bfb0bd26b6c7f156e74533
@@ -447,6 +447,10 @@ impl<T: Simulator<TiFlashEngine>> Cluster<T> {
         }
     }
 
+    pub fn get_router(&self, node_id: u64) -> Option<RaftRouter<TiFlashEngine, ProxyRaftEngine>> {
+        self.sim.rl().get_router(node_id)
+    }
+
     fn valid_leader_id(&self, region_id: u64, leader_id: u64) -> bool {
         let store_ids = match self.voter_store_ids_of_region(region_id) {
             None => return false,
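This hunk makes get_router public on the test Cluster, so a test can grab a node's RaftRouter and send raft commands to it directly. A minimal usage sketch (assuming a running test cluster as in the new tests below; the callback body is illustrative, not part of the diff):

    // Hypothetical: send a prepared RaftCmdRequest through node 1's router.
    let router = cluster.get_router(1).expect("router of node 1");
    router
        .send_command(
            cmd, // a RaftCmdRequest built by the caller
            Callback::read(Box::new(|resp| {
                // inspect resp.response here
            })),
            RaftCmdExtraOpts::default(),
        )
        .unwrap();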
@@ -359,6 +359,7 @@ unsafe extern "C" fn ffi_release_pre_handled_snapshot(
 pub fn gen_engine_store_server_helper(
     wrap: Pin<&EngineStoreServerWrap>,
 ) -> EngineStoreServerHelper {
+    info!("mock gen_engine_store_server_helper");
     EngineStoreServerHelper {
         magic_number: interfaces_ffi::RAFT_STORE_PROXY_MAGIC_NUMBER,
         version: interfaces_ffi::RAFT_STORE_PROXY_VERSION,
2 changes: 1 addition & 1 deletion proxy_components/proxy_ffi/src/read_index_helper.rs
@@ -82,7 +82,7 @@ fn into_read_index_response<S: engine_traits::Snapshot>(
     resp
 }
 
-fn gen_read_index_raft_cmd_req(req: &mut ReadIndexRequest) -> RaftCmdRequest {
+pub fn gen_read_index_raft_cmd_req(req: &mut ReadIndexRequest) -> RaftCmdRequest {
     let region_id = req.get_context().get_region_id();
     let mut cmd = RaftCmdRequest::default();
     {
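With gen_read_index_raft_cmd_req now public, a test can obtain the exact RaftCmdRequest that the proxy's read-index helper sends, instead of assembling the header and inner request by hand, as the new replica-read test below does. A sketch (ctx and start_ts are assumed to be prepared by the caller, as in that test):

    // Hypothetical: wrap a kvrpcpb::ReadIndexRequest into the proxy's command.
    let mut read_index_request = ReadIndexRequest::default();
    read_index_request.set_context(ctx);
    read_index_request.set_start_ts(start_ts.into_inner());
    let cmd = proxy_ffi::read_index_helper::gen_read_index_raft_cmd_req(&mut read_index_request);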
284 changes: 284 additions & 0 deletions proxy_tests/proxy/shared/replica_read.rs
@@ -356,3 +356,287 @@ fn test_util() {
     }
     assert!(GC_MONITOR.valid_clean());
 }
+
+use kvproto::{
+    kvrpcpb::{Context, DiskFullOpt, KeyRange},
+    raft_cmdpb::{CmdType, RaftCmdRequest, RaftRequestHeader, Request as RaftRequest},
+    raft_serverpb::RaftMessage,
+};
+use raftstore::{
+    router::RaftStoreRouter,
+    store::{msg::Callback, RaftCmdExtraOpts, ReadIndexContext},
+};
+use tokio::sync::oneshot;
+use txn_types::{Key, Lock, LockType, TimeStamp};
+use uuid::Uuid;
+
+use crate::utils::v1_server::{new_server_cluster, ChannelBuilder, Environment, TikvClient};
+
+// https://github.com/tikv/tikv/issues/16823
+#[test]
+fn test_raft_cmd_request_cant_advance_max_ts() {
+    use kvproto::kvrpcpb::{ReadIndexRequest, ReadIndexResponse};
+
+    let mut cluster = new_server_cluster(0, 1);
+    cluster.run();
+
+    let cm = cluster.sim.read().unwrap().get_concurrency_manager(1);
+
+    let region = cluster.get_region(b"");
+    let leader = region.get_peers()[0].clone();
+    let addr = cluster.sim.rl().get_addr(leader.get_store_id()).to_owned();
+
+    let env = Arc::new(Environment::new(1));
+    let channel = ChannelBuilder::new(env).connect(&addr);
+    let client = TikvClient::new(channel);
+
+    let mut ctx = Context::default();
+    let region_id = leader.get_id();
+    ctx.set_region_id(leader.get_id());
+    ctx.set_region_epoch(region.get_region_epoch().clone());
+    ctx.set_peer(leader);
+
+    let read_index = |ranges: &[(&[u8], &[u8])]| {
+        let start_ts = block_on(cluster.pd_client.get_tso()).unwrap();
+
+        let mut cmd = RaftCmdRequest::default();
+        {
+            let mut header = RaftRequestHeader::default();
+            let mut inner_req = RaftRequest::default();
+            inner_req.set_cmd_type(CmdType::ReadIndex);
+            inner_req
+                .mut_read_index()
+                .set_start_ts(start_ts.into_inner());
+
+            let mut req = ReadIndexRequest::default();
+            // Note: this inner start_ts shadows the outer one and is only
+            // used for the kvrpcpb request; the outer one is returned.
+            let start_ts = block_on(cluster.pd_client.get_tso()).unwrap();
+            req.set_context(ctx.clone());
+            req.set_start_ts(start_ts.into_inner());
+            for &(start_key, end_key) in ranges {
+                let mut range = KeyRange::default();
+                range.set_start_key(start_key.to_vec());
+                range.set_end_key(end_key.to_vec());
+                req.mut_ranges().push(range);
+            }
+
+            header.set_region_id(region_id);
+            header.set_peer(req.get_context().get_peer().clone());
+            header.set_region_epoch(req.get_context().get_region_epoch().clone());
+            cmd.set_header(header);
+            cmd.set_requests(vec![inner_req].into());
+        }
+
+        let (result_tx, result_rx) = oneshot::channel();
+        let router = cluster.get_router(1).unwrap();
+        if let Err(e) = router.send_command(
+            cmd,
+            Callback::read(Box::new(move |resp| {
+                result_tx.send(resp.response).unwrap();
+            })),
+            RaftCmdExtraOpts {
+                deadline: None,
+                disk_full_opt: DiskFullOpt::AllowedOnAlmostFull,
+            },
+        ) {
+            panic!("router send msg failed, error: {}", e);
+        }
+
+        let resp = block_on(result_rx).unwrap();
+        (resp.get_responses()[0].get_read_index().clone(), start_ts)
+    };
+
+    // wait a while until the node updates its own max ts
+    std::thread::sleep(Duration::from_millis(300));
+
+    let prev_cm_max_ts = cm.max_ts();
+    let (resp, start_ts) = read_index(&[(b"l", b"yz")]);
+    assert!(!resp.has_locked());
+    // A ReadIndex carried inside a RaftCmdRequest must not advance max_ts.
+    assert_eq!(cm.max_ts(), prev_cm_max_ts);
+    assert_ne!(cm.max_ts(), start_ts);
+    cluster.shutdown();
+    fail::remove("on_pre_write_apply_state")
+}
+
+// https://github.com/tikv/tikv/pull/8669/files
+#[test]
+fn test_raft_cmd_request_learner_advance_max_ts() {
+    use kvproto::kvrpcpb::{ReadIndexRequest, ReadIndexResponse};
+
+    let mut cluster = new_server_cluster(0, 2);
+    cluster.pd_client.disable_default_operator();
+    let region_id = cluster.run_conf_change();
+    let region = cluster.get_region(b"");
+    assert_eq!(region_id, 1);
+    assert_eq!(region.get_id(), 1);
+    let leader = region.get_peers()[0].clone();
+
+    fail::cfg("on_pre_write_apply_state", "return(true)").unwrap();
+    let learner = new_learner_peer(2, 2);
+    cluster.pd_client.must_add_peer(1, learner.clone());
+
+    let cm = cluster.sim.read().unwrap().get_concurrency_manager(1);
+    let keys: Vec<_> = vec![b"k", b"l"]
+        .into_iter()
+        .map(|k| Key::from_raw(k))
+        .collect();
+    let guards = block_on(cm.lock_keys(keys.iter()));
+    let lock = Lock::new(
+        LockType::Put,
+        b"k".to_vec(),
+        1.into(),
+        20000,
+        None,
+        1.into(),
+        1,
+        2.into(),
+        false,
+    );
+    guards[0].with_lock(|l| *l = Some(lock.clone()));
+
+    let addr = cluster.sim.rl().get_addr(learner.get_store_id()).to_owned();
+
+    let env = Arc::new(Environment::new(1));
+    let channel = ChannelBuilder::new(env).connect(&addr);
+    let client = TikvClient::new(channel);
+
+    // cluster.must_put(b"k", b"v");
+
+    let read_index = |ranges: &[(&[u8], &[u8])]| {
+        let start_ts = block_on(cluster.pd_client.get_tso()).unwrap();
+
+        // https://github.com/pingcap/tiflash/blob/14a127820d0530e496af624bb5b69acd48caf747/dbms/src/Storages/KVStore/Read/ReadIndex.cpp#L39
+        let mut ctx = Context::default();
+        let learner = learner.clone();
+        ctx.set_region_id(region_id);
+        ctx.set_region_epoch(region.get_region_epoch().clone());
+        ctx.set_peer(learner);
+        let mut read_index_request = ReadIndexRequest::default();
+        read_index_request.set_context(ctx);
+        read_index_request.set_start_ts(start_ts.into_inner());
+        for (s, e) in ranges {
+            let mut r = KeyRange::new();
+            r.set_start_key(s.to_vec());
+            r.set_end_key(e.to_vec());
+            read_index_request.mut_ranges().push(r);
+        }
+        let cmd =
+            proxy_ffi::read_index_helper::gen_read_index_raft_cmd_req(&mut read_index_request);
+
+        let (result_tx, result_rx) = oneshot::channel();
+        let router = cluster.get_router(2).unwrap();
+        if let Err(e) = router.send_command(
+            cmd,
+            Callback::read(Box::new(move |resp| {
+                result_tx.send(resp.response).unwrap();
+            })),
+            RaftCmdExtraOpts {
+                deadline: None,
+                disk_full_opt: DiskFullOpt::AllowedOnAlmostFull,
+            },
+        ) {
+            panic!("router send msg failed, error: {}", e);
+        }
+
+        let resp = block_on(result_rx).unwrap();
+        (resp.get_responses()[0].get_read_index().clone(), start_ts)
+    };
+
+    // wait a while until the node updates its own max ts
+    std::thread::sleep(Duration::from_millis(3000));
+
+    must_wait_until_cond_node(
+        &cluster.cluster_ext,
+        region_id,
+        None,
+        &|states: &States| -> bool {
+            states.in_disk_region_state.get_region().get_peers().len() == 2
+        },
+    );
+
+    let prev_cm_max_ts = cm.max_ts();
+    let (resp, start_ts) = read_index(&[(b"l", b"yz")]);
+    assert!(!resp.has_locked());
+    // Unlike the RaftCmdRequest path above, the learner's ReadIndex does
+    // advance max_ts to the request's start_ts.
+    assert_ne!(cm.max_ts(), prev_cm_max_ts);
+    assert_eq!(cm.max_ts(), start_ts);
+
+    // `gen_read_index_raft_cmd_req` can only handle one key-range
+    let (resp, start_ts) = read_index(&[(b"j", b"k0")]);
+    assert_eq!(resp.get_locked(), &lock.into_lock_info(b"k".to_vec()));
+    assert_eq!(cm.max_ts(), start_ts);
+
+    drop(guards);
+
+    let (resp, start_ts) = read_index(&[(b"a", b"z")]);
+    assert!(!resp.has_locked());
+    assert_eq!(cm.max_ts(), start_ts);
+    cluster.shutdown();
+    fail::remove("on_pre_write_apply_state")
+}
+
+#[test]
+fn test_raft_message_can_advance_max_ts() {
+    use kvproto::raft_cmdpb::{ReadIndexRequest, ReadIndexResponse};
+    let mut cluster = new_server_cluster(0, 1);
+    cluster.run();
+
+    let cm = cluster.sim.read().unwrap().get_concurrency_manager(1);
+
+    let region = cluster.get_region(b"");
+    let leader = region.get_peers()[0].clone();
+    let follower = new_learner_peer(2, 2);
+    let addr = cluster.sim.rl().get_addr(leader.get_store_id()).to_owned();
+
+    let env = Arc::new(Environment::new(1));
+    let channel = ChannelBuilder::new(env).connect(&addr);
+    let client = TikvClient::new(channel);
+
+    let region_id = leader.get_id();
+
+    let read_index = |ranges: &[(&[u8], &[u8])]| {
+        let mut m = raft::eraftpb::Message::default();
+        m.set_msg_type(MessageType::MsgReadIndex);
+        let mut read_index_req = ReadIndexRequest::default();
+        let start_ts = block_on(cluster.pd_client.get_tso()).unwrap();
+        read_index_req.set_start_ts(start_ts.into_inner());
+        for &(start_key, end_key) in ranges {
+            let mut range = KeyRange::default();
+            range.set_start_key(start_key.to_vec());
+            range.set_end_key(end_key.to_vec());
+            read_index_req.mut_key_ranges().push(range);
+        }
+
+        let rctx = ReadIndexContext {
+            id: Uuid::new_v4(),
+            request: Some(read_index_req),
+            locked: None,
+        };
+        let mut e = raft::eraftpb::Entry::default();
+        e.set_data(rctx.to_bytes().into());
+        m.mut_entries().push(e);
+        m.set_from(2);
+
+        let mut raft_msg = kvproto::raft_serverpb::RaftMessage::default();
+        raft_msg.set_region_id(region.get_id());
+        raft_msg.set_from_peer(follower);
+        raft_msg.set_to_peer(leader);
+        raft_msg.set_region_epoch(region.get_region_epoch().clone());
+        raft_msg.set_message(m);
+        cluster.send_raft_msg(raft_msg).unwrap();
+
+        // The raft message is fire-and-forget, so only a placeholder
+        // response is returned; callers can only check max_ts.
+        (ReadIndexResponse::default(), start_ts)
+    };
+
+    // wait a while until the node updates its own max ts
+    let prev_cm_max_ts = cm.max_ts();
+    let (resp, start_ts) = read_index(&[(b"l", b"yz")]);
+    cluster.must_put(b"a", b"b");
+    std::thread::sleep(Duration::from_millis(2000));
+    // `resp` is the placeholder above, so there is nothing to assert on it;
+    // a raft MsgReadIndex does advance max_ts to the request's start_ts.
+    assert_ne!(cm.max_ts(), prev_cm_max_ts);
+    assert_eq!(cm.max_ts(), start_ts);
+    cluster.shutdown();
+    fail::remove("on_pre_write_apply_state")
+}