Proxy: Fix ReadIndex Request not advance max_ts #8945

Closed
CalvinNeo opened this issue Apr 15, 2024 · 2 comments
Labels
type/enhancement The issue or PR belongs to an enhancement.

Comments

@CalvinNeo
Member

Enhancement

The current implementation may not update max_ts, as the test in tikv/tikv#16823 shows.

This could lead to #8845.
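Concretely, the expectation is that after the leader handles a ReadIndex request carrying a start_ts, the node's concurrency manager has advanced max_ts to at least that start_ts. The sketch below only states that check; the helper name is made up, and ConcurrencyManager/TimeStamp are simply the types the tests in the comment below already use, not the proxy's actual fix.

```rust
// Illustration only: the property the tests in the next comment verify.
// `max_ts_advanced` is a hypothetical helper, not code from the proxy.
use concurrency_manager::ConcurrencyManager;
use txn_types::TimeStamp;

fn max_ts_advanced(cm: &ConcurrencyManager, start_ts: TimeStamp) -> bool {
    // After the leader processes a ReadIndex carrying `start_ts`, the
    // concurrency manager is expected to have advanced max_ts past it.
    cm.max_ts() >= start_ts
}
```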

@CalvinNeo CalvinNeo added the type/enhancement label on Apr 15, 2024
@CalvinNeo CalvinNeo changed the title from "Proxy: Use RaftMessage to convey ReadIndex Request" to "Proxy: Fix ReadIndex Request not advance max_ts" on Apr 16, 2024
@CalvinNeo
Member Author

use std::{sync::Arc, time::Duration};

use futures::executor::block_on;
use kvproto::{
    kvrpcpb::{Context, DiskFullOpt, KeyRange},
    raft_cmdpb::{CmdType, RaftCmdRequest, RaftRequestHeader, Request as RaftRequest},
    raft_serverpb::RaftMessage,
};
use raft::eraftpb::MessageType;
use raftstore::{
    router::RaftStoreRouter,
    store::{msg::Callback, RaftCmdExtraOpts, ReadIndexContext},
};
use tokio::sync::oneshot;
use txn_types::{Key, Lock, LockType, TimeStamp};
use uuid::Uuid;

// `new_server_cluster` and friends come from the local test utilities; the tests
// below also rely on `new_learner_peer`, assumed to be re-exported there (or
// importable from test_raftstore).
use crate::utils::v1_server::{new_server_cluster, ChannelBuilder, Environment, TikvClient};

// https://github.com/tikv/tikv/issues/16823
#[test]
fn test_raft_cmd_request_cant_advanve_max_ts() {
    use kvproto::kvrpcpb::ReadIndexRequest;
    use kvproto::kvrpcpb::ReadIndexResponse;
    
    let mut cluster = new_server_cluster(0, 1);
    cluster.run();

    let cm = cluster.sim.read().unwrap().get_concurrency_manager(1);
    let keys: Vec<_> = vec![b"k", b"l"]
        .into_iter()
        .map(|k| Key::from_raw(k))
        .collect();
    let guards = block_on(cm.lock_keys(keys.iter()));
    let lock = Lock::new(
        LockType::Put,
        b"k".to_vec(),
        1.into(),
        20000,
        None,
        1.into(),
        1,
        2.into(),
        false,
    );
    guards[0].with_lock(|l| *l = Some(lock.clone()));

    let region = cluster.get_region(b"");
    let leader = region.get_peers()[0].clone();
    let addr = cluster.sim.rl().get_addr(leader.get_store_id()).to_owned();

    let env = Arc::new(Environment::new(1));
    let channel = ChannelBuilder::new(env).connect(&addr);
    let client = TikvClient::new(channel);

    let mut ctx = Context::default();
    let region_id = region.get_id();
    ctx.set_region_id(region.get_id());
    ctx.set_region_epoch(region.get_region_epoch().clone());
    ctx.set_peer(leader);

    let read_index = |ranges: &[(&[u8], &[u8])]| {
        let start_ts = block_on(cluster.pd_client.get_tso()).unwrap();

        let mut cmd = RaftCmdRequest::default();
        {
            let mut header = RaftRequestHeader::default();
            let mut inner_req = RaftRequest::default();
            inner_req.set_cmd_type(CmdType::ReadIndex);
            inner_req.mut_read_index().set_start_ts(start_ts.into_inner());

            // Note: this kvrpcpb::ReadIndexRequest only lends its Context to the header
            // below; the key ranges pushed onto it are never copied into inner_req, and
            // the start_ts fetched here shadows the one already set on inner_req.
            let mut req = ReadIndexRequest::default();
            let start_ts = block_on(cluster.pd_client.get_tso()).unwrap();
            req.set_context(ctx.clone());
            req.set_start_ts(start_ts.into_inner());
            for &(start_key, end_key) in ranges {
                let mut range = KeyRange::default();
                range.set_start_key(start_key.to_vec());
                range.set_end_key(end_key.to_vec());
                req.mut_ranges().push(range);
            }

            header.set_region_id(region_id);
            header.set_peer(req.get_context().get_peer().clone());
            header.set_region_epoch(req.get_context().get_region_epoch().clone());
            cmd.set_header(header);
            cmd.set_requests(vec![inner_req].into());
        }

        let (result_tx, result_rx) = oneshot::channel();
        let router = cluster.get_router(1).unwrap();
        if let Err(e) = router.send_command(
            cmd,
            Callback::read(Box::new(move |resp| {
                result_tx.send(resp.response).unwrap();
            })),
            RaftCmdExtraOpts {
                deadline: None,
                disk_full_opt: DiskFullOpt::AllowedOnAlmostFull,
            },
        ) {
            panic!("router send msg failed, error: {}", e);
        }
        
        let resp = block_on(result_rx).unwrap();
        (resp.get_responses()[0].get_read_index().clone(), start_ts)
    };

    // wait a while until the node updates its own max ts
    std::thread::sleep(Duration::from_millis(300));

    let prev_cm_max_ts = cm.max_ts();
    let (resp, start_ts) = read_index(&[(b"l", b"yz")]);
    assert!(!resp.has_locked());
    // Actually not changed
    assert_eq!(cm.max_ts(), prev_cm_max_ts);
    assert_ne!(cm.max_ts(), start_ts);
}

#[test]
fn test_raft_message_can_advanve_max_ts() {
    use kvproto::raft_cmdpb::ReadIndexRequest;
    use kvproto::raft_cmdpb::ReadIndexResponse;
    let mut cluster = new_server_cluster(0, 1);
    cluster.run();

    let cm = cluster.sim.read().unwrap().get_concurrency_manager(1);
    let keys: Vec<_> = vec![b"k", b"l"]
        .into_iter()
        .map(|k| Key::from_raw(k))
        .collect();
    let guards = block_on(cm.lock_keys(keys.iter()));
    let lock = Lock::new(
        LockType::Put,
        b"k".to_vec(),
        1.into(),
        20000,
        None,
        1.into(),
        1,
        2.into(),
        false,
    );
    guards[0].with_lock(|l| *l = Some(lock.clone()));

    let region = cluster.get_region(b"");
    let leader = region.get_peers()[0].clone();
    let follower = new_learner_peer(2, 2);
    let addr = cluster.sim.rl().get_addr(leader.get_store_id()).to_owned();

    let env = Arc::new(Environment::new(1));
    let channel = ChannelBuilder::new(env).connect(&addr);
    let client = TikvClient::new(channel);

    let mut ctx = Context::default();
    let region_id = leader.get_id();

    let read_index = |ranges: &[(&[u8], &[u8])]| {
        let mut m = raft::eraftpb::Message::default();
        m.set_msg_type(MessageType::MsgReadIndex);
        let mut read_index_req = ReadIndexRequest::default();
        let start_ts = block_on(cluster.pd_client.get_tso()).unwrap();
        read_index_req.set_start_ts(start_ts.into_inner());
        for &(start_key, end_key) in ranges {
            let mut range = KeyRange::default();
            range.set_start_key(start_key.to_vec());
            range.set_end_key(end_key.to_vec());
            read_index_req.mut_key_ranges().push(range);
        }

        // Encode the ReadIndexRequest into a ReadIndexContext carried as the entry
        // data of a MsgReadIndex message, the way a read index forwarded by a
        // follower or learner peer would arrive at the leader.
        let rctx = ReadIndexContext {
            id: Uuid::new_v4(),
            request: Some(read_index_req),
            locked: None,
        };
        let mut e = raft::eraftpb::Entry::default();
        e.set_data(rctx.to_bytes().into());
        m.mut_entries().push(e);
        m.set_from(2);

        let mut raft_msg = kvproto::raft_serverpb::RaftMessage::default();
        raft_msg.set_region_id(region.get_id());
        raft_msg.set_from_peer(follower);
        raft_msg.set_to_peer(leader);
        raft_msg.set_region_epoch(region.get_region_epoch().clone());
        raft_msg.set_message(m);
        cluster.send_raft_msg(raft_msg).unwrap();

        // Sending a raw RaftMessage yields no direct client response, so return a
        // placeholder response together with the start_ts used for this read index.
        (ReadIndexResponse::default(), start_ts)
    };

    let prev_cm_max_ts = cm.max_ts();
    let (resp, start_ts) = read_index(&[(b"l", b"yz")]);
    cluster.must_put(b"a", b"b");
    // Wait until the leader has handled the read index and advanced its max_ts.
    std::thread::sleep(Duration::from_millis(2000));
    // assert!(!resp.has_locked());
    // max_ts should now have advanced to the start_ts carried by the read index.
    assert_ne!(cm.max_ts(), prev_cm_max_ts);
    assert_eq!(cm.max_ts(), start_ts);
}

@CalvinNeo
Copy link
Member Author

CalvinNeo commented Apr 22, 2024

Closed because it did not actually lead to #8845, and the "ideal fix" has already been implemented.
