Skip to content

Commit

Permalink
Add some tests for replica read (#369)
Browse files Browse the repository at this point in the history
  • Loading branch information
CalvinNeo authored Apr 12, 2024
1 parent 7dc50b4 commit 79da4a9
Showing 1 changed file with 120 additions and 2 deletions.
122 changes: 120 additions & 2 deletions proxy_tests/proxy/shared/replica_read.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use engine_store_ffi::ffi::{
ProtoMsgBaseBuff,
};

use crate::utils::v1::*;
use crate::{shared::ffi, utils::v1::*};

#[derive(Default)]
struct GcMonitor {
Expand Down Expand Up @@ -145,7 +145,7 @@ extern "C" fn ffi_wake(data: RawVoidPtr) {
}

#[test]
fn test_read_index() {
fn test_read_index_normal() {
// Initialize cluster
let (mut cluster, pd_client) = new_mock_cluster(0, 3);
configure_for_lease_read(&mut cluster, Some(50), Some(10_000));
Expand Down Expand Up @@ -208,6 +208,124 @@ fn test_read_index() {
cluster.shutdown();
}

/// If a read index request is received while region state is Applying,
/// it could be handled correctly.
#[test]
fn test_read_index_applying() {
// Initialize cluster
let (mut cluster, pd_client) = new_mock_cluster(0, 2);
configure_for_lease_read(&mut cluster, Some(50), Some(10_000));
cluster.cfg.raft_store.raft_heartbeat_ticks = 1;
cluster.cfg.raft_store.raft_log_compact_sync_interval = ReadableDuration::millis(500);
pd_client.disable_default_operator();
disable_auto_gen_compact_log(&mut cluster);
// Otherwise will panic with `assert_eq!(apply_state, last_applied_state)`.
fail::cfg("on_pre_write_apply_state", "return(true)").unwrap();
// Set region and peers
let r1 = cluster.run_conf_change();
let p1 = new_peer(1, 1);
let p2 = new_learner_peer(2, 2);

cluster.pd_client.must_add_peer(r1, p2.clone());
cluster.must_put(b"k0", b"v");
{
let prev_state = maybe_collect_states(&cluster.cluster_ext, r1, Some(vec![1]));
let (compact_index, compact_term) = get_valid_compact_index_by(&prev_state, Some(vec![1]));
}
cluster.pd_client.must_none_pending_peer(p2.clone());
// assert_eq!(cluster.pd_client.get_pending_peers().len(), 0);
let region = cluster.get_region(b"k0");
assert_eq!(cluster.leader_of_region(region.get_id()).unwrap(), p1);

check_key(&cluster, b"k0", b"v", Some(true), None, Some(vec![1]));

fail::cfg("region_apply_snap", "return").unwrap();
cluster.add_send_filter(CloneFilterFactory(
RegionPacketFilter::new(r1, 2)
.msg_type(MessageType::MsgAppend)
.direction(Direction::Both),
));

for i in 1..5 {
cluster.must_put(format!("k{}", i).as_bytes(), b"v");
}

check_key(&cluster, b"k4", b"v", Some(true), None, Some(vec![1]));

{
let prev_state = collect_all_states(&cluster.cluster_ext, r1);
let (compact_index, compact_term) = get_valid_compact_index_by(&prev_state, Some(vec![1]));
let compact_log = test_raftstore::new_compact_log_request(compact_index, compact_term);
let req = test_raftstore::new_admin_request(r1, region.get_region_epoch(), compact_log);
let res = cluster
.call_command_on_leader(req, Duration::from_secs(3))
.unwrap();
assert!(!res.get_header().has_error(), "{:?}", res);
}

cluster.must_put(b"kz", b"v");
check_key(&cluster, b"kz", b"v", Some(true), None, Some(vec![1]));
check_key(&cluster, b"k1", b"v", Some(false), None, Some(vec![2]));

// Wait until gc.
std::thread::sleep(std::time::Duration::from_millis(1500));

cluster.clear_send_filters();

let waker = Waker::new();

for (id, peer, f) in &[(2, p2, true)] {
iter_ffi_helpers(
&cluster,
Some(vec![*id]),
&mut |_, ffi_helper: &mut FFIHelperSet| {
assert_eq!(
general_get_region_local_state(
&ffi_helper.engine_store_server.engines.as_ref().unwrap().kv,
r1
)
.unwrap()
.get_state(),
PeerState::Applying
);
let mut request = kvproto::kvrpcpb::ReadIndexRequest::default();

{
let context = request.mut_context();
context.set_region_id(region.get_id());
context.set_peer(peer.clone());
context.set_region_epoch(region.get_region_epoch().clone());
request.set_start_ts(666);

let mut range = kvproto::kvrpcpb::KeyRange::default();
range.set_start_key(region.get_start_key().to_vec());
range.set_end_key(region.get_end_key().to_vec());
request.mut_ranges().push(range);

debug!("make read index request {:?}", &request);
}
let w = if *f { Some(&waker) } else { None };
let resp = blocked_read_index(&request, &*ffi_helper.proxy_helper, w).unwrap();
assert_ne!(resp.get_read_index(), 0);
debug!("resp detail {:?}", resp);
assert!(!resp.has_region_error());
assert!(!resp.has_locked());
},
);
}

drop(waker);

{
assert!(!GC_MONITOR.is_empty());
assert!(GC_MONITOR.valid_clean());
}

cluster.shutdown();
fail::remove("on_pre_write_apply_state");
fail::remove("region_apply_snap");
}

#[test]
fn test_util() {
// test timer
Expand Down

0 comments on commit 79da4a9

Please sign in to comment.