diff --git a/Cargo.lock b/Cargo.lock
index 4478542dd59..bdf9d702b01 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -7826,6 +7826,7 @@ dependencies = [
 name = "test_cloud_server"
 version = "0.1.0"
 dependencies = [
+ "anyhow",
 "api_version",
 "bstr",
 "bytes",
diff --git a/components/kvengine/src/stats.rs b/components/kvengine/src/stats.rs
index 2a55fce5346..2a0b01f2ea8 100644
--- a/components/kvengine/src/stats.rs
+++ b/components/kvengine/src/stats.rs
@@ -1,6 +1,6 @@
 // Copyright 2022 TiKV Project Authors. Licensed under Apache-2.0.

-use std::cmp;
+use std::{cmp, collections::HashSet};

 use bytes::Bytes;

@@ -62,9 +62,18 @@ impl EngineStats {

 impl super::Engine {
     pub fn get_all_shard_stats(&self) -> Vec<ShardStats> {
+        self.get_all_shard_stats_ext(None)
+    }
+
+    pub fn get_all_shard_stats_ext(&self, skip_shards: Option<&HashSet<IdVer>>) -> Vec<ShardStats> {
         self.get_all_shard_id_vers()
             .into_iter()
-            .filter_map(|id_ver| self.get_shard(id_ver.id).map(|shard| shard.get_stats()))
+            .filter_map(|id_ver| {
+                if skip_shards.is_some_and(|skip_shards| skip_shards.contains(&id_ver)) {
+                    return None;
+                }
+                self.get_shard(id_ver.id).map(|shard| shard.get_stats())
+            })
             .collect()
     }

diff --git a/components/test_cloud_server/Cargo.toml b/components/test_cloud_server/Cargo.toml
index ec42a69fce0..c614141d38f 100644
--- a/components/test_cloud_server/Cargo.toml
+++ b/components/test_cloud_server/Cargo.toml
@@ -5,6 +5,7 @@ edition = "2021"
 # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

 [dependencies]
+anyhow = "1.0"
 api_version = { path = "../api_version" }
 bstr = "0.2.17"
 bytes = "1"
diff --git a/components/test_cloud_server/src/cluster.rs b/components/test_cloud_server/src/cluster.rs
index 06be59da74e..24d7280d2fb 100644
--- a/components/test_cloud_server/src/cluster.rs
+++ b/components/test_cloud_server/src/cluster.rs
@@ -1,7 +1,7 @@
 // Copyright 2022 TiKV Project Authors. Licensed under Apache-2.0.

 use std::{
-    collections::HashMap,
+    collections::{HashMap, HashSet},
     path::{Path, PathBuf},
     sync::{
         atomic::{AtomicU16, Ordering::Relaxed},
@@ -11,6 +11,7 @@ use std::{
     time::Duration,
 };

+use anyhow::bail;
 use cloud_server::TikvServer;
 use cloud_worker::CloudWorker;
 use dashmap::DashMap;
@@ -19,6 +20,7 @@ use grpcio::{Channel, ChannelBuilder, EnvBuilder, Environment};
 use kvengine::{dfs::Dfs, ShardStats};
 use kvproto::{
     kvrpcpb::{Mutation, Op},
+    metapb,
     metapb::PeerRole,
     raft_cmdpb::{RaftCmdRequest, RaftCmdResponse, RaftRequestHeader},
 };
@@ -501,11 +503,18 @@ impl ServerCluster {
     }

     pub fn get_data_stats(&self) -> ClusterDataStats {
+        self.get_data_stats_ext(None)
+    }
+
+    pub fn get_data_stats_ext(
+        &self,
+        skip_shards: Option<&HashSet<kvengine::IdVer>>,
+    ) -> ClusterDataStats {
         let mut stats = ClusterDataStats::default();
         for server in self.servers.values() {
             let store_id = server.get_store_id();
             let kv_engine = server.get_kv_engine();
-            stats.add(store_id, kv_engine.get_all_shard_stats());
+            stats.add(store_id, kv_engine.get_all_shard_stats_ext(skip_shards));
         }
         stats
     }
@@ -535,24 +544,6 @@ impl ServerCluster {
             .collect()
     }

-    // Wait shard version match between PD & kvengine.
-    pub fn wait_region_version_match(&self) {
-        let pd_client = self.pd_client.as_ref();
-        let ok = try_wait(
-            || {
-                let data_stats = self.get_data_stats();
-                data_stats.check_region_version_match(pd_client).is_ok()
-            },
-            10,
-        );
-        if !ok {
-            let data_stats = self.get_data_stats();
-            data_stats
-                .check_region_version_match(pd_client)
-                .expect("check_region_version_match failed");
-        }
-    }
-
     pub fn pd_endpoints(&self) -> &[String] {
         self.pd.endpoints().unwrap()
     }
@@ -926,13 +917,37 @@ impl ClusterDataStats {
         }
     }

-    pub fn check_data(&self) -> Result<(), String> {
+    pub fn check_data(
+        &self,
+    ) -> Result<
+        (),
+        (
+            Vec<String>,              // error reasons
+            HashSet<kvengine::IdVer>, // success_shards
+        ),
+    > {
+        let mut success_shards = HashSet::with_capacity(self.regions.len());
+        let mut errs = vec![];
         for stats in self.regions.values() {
             let map_err_fn = |e| format!("err {} stats: {:?}", e, stats);
-            stats.check_consistency().map_err(map_err_fn)?;
-            stats.check_healthy().map_err(map_err_fn)?;
+            if let Err(err) = stats.check_consistency() {
+                errs.push(map_err_fn(err));
+                continue;
+            }
+            let leader = match stats.check_healthy() {
+                Ok(leader) => leader,
+                Err(err) => {
+                    errs.push(map_err_fn(err));
+                    continue;
+                }
+            };
+            success_shards.insert(kvengine::IdVer::new(leader.id, leader.ver));
+        }
+        if errs.is_empty() {
+            Ok(())
+        } else {
+            Err((errs, success_shards))
         }
-        Ok(())
     }

     pub fn check_leader(&self) -> Result<(), String> {
@@ -953,54 +968,38 @@ impl ClusterDataStats {
         }
     }

+    pub fn log_all(&self) {
+        self.iter_shard_stats(|store_id, shard_stats| {
+            info!("shard_stats: {}:{:?}", store_id, shard_stats);
+            false
+        });
+    }
+
     fn get_region_shard_stats(&self, region_id: u64) -> Option<&ShardStats> {
         self.regions
             .get(&region_id)
             .map(|stats| stats.shard_stats.values().next().unwrap())
     }

-    pub fn check_region_version_match(&self, pd_client: &dyn PdClientExt) -> Result<(), String> {
-        let regions = pd_client.get_all_regions();
-        check_regions_boundary(&[], &[], true, &regions)
-            .map_err(|e| format!("check_regions_boundary failed: {:?}", e))?;
-        for region in &regions {
-            let region_id = region.get_id();
-            let region_shard_stats = self.get_region_shard_stats(region_id).ok_or_else(|| {
-                format!(
-                    "region not found in cluster, region:{:?}, cluster:{:?}",
-                    region, self.regions
-                )
-            })?;
-            if region_shard_stats.total_size == 0 {
-                continue;
-            }
-
-            let region_pd_version = region.get_region_epoch().get_version();
-            let region_shard_version = region_shard_stats.ver;
-            if region_pd_version != region_shard_version {
-                return Err(format!(
-                    "region {} version not match, pd: {}, shard: {}",
-                    region_id, region_pd_version, region_shard_version
-                ));
-            }
-        }
-        Ok(())
-    }
-
     pub fn check_buckets(
         &self,
         pd_client: &dyn PdClientExt,
         bucket_size: u64,
-    ) -> Result<(), String> {
-        let regions = pd_client.get_all_regions();
-        check_regions_boundary(&[], &[], true, &regions)
-            .map_err(|e| format!("check_regions_boundary failed: {:?}", e))?;
-        for region in &regions {
+        starts_from_encoded_key: &[u8],
+    ) -> Result<(), (String /* reason */, Option<metapb::Region>)> {
+        let regions = pd_client.get_all_regions(); // TODO: get regions from `starts_from_encoded_key`.
+        check_regions_boundary(starts_from_encoded_key, &[], true, &regions)
+            .map_err(|e| (format!("check_regions_boundary failed: {:?}", e), None))?;
+        for region in regions {
+            if !region.end_key.is_empty() && region.end_key.as_slice() <= starts_from_encoded_key {
+                continue;
+            }
+
             let region_id = region.get_id();
             let region_shard_stats = self.get_region_shard_stats(region_id).ok_or_else(|| {
-                format!(
-                    "region not found in cluster, region:{:?}, cluster:{:?}",
-                    region, self.regions
+                (
+                    "region not found in cluster".to_owned(),
+                    Some(region.clone()),
                 )
             })?;
             let shard_level_size: u64 =
@@ -1008,6 +1007,19 @@ impl ClusterDataStats {
             }
             if shard_level_size == 0 {
                 continue;
             }
+
+            let region_pd_version = region.get_region_epoch().get_version();
+            let region_shard_version = region_shard_stats.ver;
+            if region_pd_version != region_shard_version {
+                return Err((
+                    format!(
+                        "version not match, pd: {}, shard: {}",
+                        region_pd_version, region_shard_version
+                    ),
+                    Some(region),
+                ));
+            }
+
             if let Some(buckets) = pd_client.get_buckets(region_id) {
                 for i in 1..buckets.meta.keys.len() {
                     let prev_key = &buckets.meta.keys[i - 1];
@@ -1020,21 +1032,22 @@ impl ClusterDataStats {
                 let actual_bucket_count = buckets.count() as u64;
                 let ratio = expected_bucket_count as f64 / actual_bucket_count as f64;
                 if !(0.3..=3.0).contains(&ratio) {
-                    return Err(format!(
-                        "region {} buckets {:?}, shard_level_size {}, expected {}, actual {}, region {:?}, shard stats {:?}",
-                        region_id,
-                        buckets,
-                        shard_level_size,
-                        expected_bucket_count,
-                        actual_bucket_count,
-                        region,
-                        region_shard_stats,
+                    return Err((
+                        format!(
+                            "buckets {:?}, shard_level_size {}, expected {}, actual {}, shard stats {:?}",
+                            buckets,
+                            shard_level_size,
+                            expected_bucket_count,
+                            actual_bucket_count,
+                            region_shard_stats,
+                        ),
+                        Some(region),
                     ));
                 };
             } else {
-                return Err(format!(
-                    "region {} no buckets, shard_level_size {}, region {:?}",
-                    region_id, shard_level_size, region,
+                return Err((
+                    format!("no buckets, shard_level_size {}", shard_level_size),
+                    Some(region),
                 ));
             }
         }
@@ -1058,7 +1071,7 @@ impl RegionShardStats {
         }
     }

-    fn check_consistency(&self) -> Result<(), String> {
+    fn check_consistency(&self) -> anyhow::Result<()> {
         if self.shard_stats.len() <= 1 {
             return Ok(());
         }
@@ -1074,7 +1087,7 @@ impl RegionShardStats {
                 || stats.l0_table_count != first_stats.l0_table_count
                 || stats.ver != first_stats.ver
             {
-                let err_msg = format!(
+                bail!(
                     "inconsistent stats, first: {}:{}:{}: {:?}, current: {}:{}:{}: {:?}",
                     first_id,
                     first_stats.id,
@@ -1085,37 +1098,30 @@ impl RegionShardStats {
                     stats.ver,
                     stats
                 );
-                return Err(err_msg);
             }
         }
         Ok(())
     }

-    fn get_leader_stats(&self) -> Result<&ShardStats, String> {
+    fn get_leader_stats(&self) -> anyhow::Result<&ShardStats> {
         let item = self.shard_stats.values().find(|stats| stats.active);
         if item.is_none() {
-            return Err("no leader".into());
+            bail!("no leader");
         }
         Ok(item.unwrap())
     }

-    fn check_healthy(&self) -> Result<(), String> {
+    fn check_healthy(&self) -> anyhow::Result<&ShardStats> {
         let stats = self.get_leader_stats()?;
         if stats.mem_table_count > 1 {
-            return Err(format!(
-                "mem table count {} too large",
-                stats.mem_table_count
-            ));
+            bail!("mem table count {} too large", stats.mem_table_count);
         }
         if !stats.flushed {
-            return Err("not initial flushed".into());
+            bail!("not initial flushed");
        }
         if stats.compaction_score > 2.0 {
-            return Err(format!(
-                "compaction score too large: {}",
-                stats.compaction_score
-            ));
+            bail!("compaction score too large: {}", stats.compaction_score);
         }
-        Ok(())
+        Ok(stats)
     }
 }
diff --git a/tests/random/mod.rs b/tests/random/mod.rs
index e6565adb328..b1bda854f49 100644
--- a/tests/random/mod.rs
+++ b/tests/random/mod.rs
@@ -9,6 +9,7 @@ mod test_tidb;
 mod test_txn_file;

 use std::{
+    collections::HashSet,
     str::FromStr,
     sync::{
         atomic::{AtomicU16, AtomicUsize, Ordering},
@@ -23,27 +24,21 @@ use cloud_encryption::KeyspaceEncryptionConfig;
 use futures::executor::block_on;
 use http::{Request, StatusCode, Uri};
 use hyper::Body;
-use kvproto::{metapb::Store, pdpb::CheckPolicy};
+use kvproto::metapb::Store;
 use pd_client::PdClient;
 use rand::prelude::*;
 use security::SecurityConfig;
 use test_cloud_server::{
-    client::{ClusterClient, ClusterTxnClient},
+    client::ClusterTxnClient,
     keyspace::{ClusterKeyspaceClient, KeyspaceManager},
     scheduler::Scheduler,
     tidb::TidbCluster,
-    try_wait, ServerCluster,
+    try_wait_result, ServerCluster,
 };
-use test_pd_client::{PdClientExt, TestPdClient};
+use test_pd_client::TestPdClient;
 use tikv::config::TikvConfig;
 use tikv_client::TimestampExt;
-use tikv_util::{
-    box_err,
-    config::{ReadableDuration, ReadableSize},
-    error, info,
-    time::Instant,
-    warn,
-};
+use tikv_util::{box_err, error, info, time::Instant, warn};
 use tokio::sync::{OwnedRwLockWriteGuard, Semaphore};
 use txn_types::Key;

@@ -99,128 +94,6 @@ pub(crate) fn alloc_node_id_vec(count: usize) -> Vec<u16> {
     nodes
 }

-#[test]
-fn test_random_merge() {
-    test_util::init_log_for_test();
-    // use 4 nodes for easier schedule merge.
-    let nodes = vec![
-        alloc_node_id(),
-        alloc_node_id(),
-        alloc_node_id(),
-        alloc_node_id(),
-    ];
-    let bucket_size_kb = 64;
-    let update_conf_fn = |_, conf: &mut TikvConfig| {
-        conf.coprocessor.region_split_size = ReadableSize::kb(192);
-        conf.coprocessor.region_bucket_size = ReadableSize::kb(bucket_size_kb);
-        conf.raft_store.peer_stale_state_check_interval = ReadableDuration::secs(1);
-        conf.raft_store.abnormal_leader_missing_duration = ReadableDuration::secs(3);
-        conf.raft_store.max_leader_missing_duration = ReadableDuration::secs(5);
-        conf.rocksdb.writecf.target_file_size_base = ReadableSize::kb(16);
-        conf.rocksdb.writecf.write_buffer_size = ReadableSize::kb(96);
-        conf.rfengine.target_file_size = ReadableSize::mb(1);
-        conf.rfengine.batch_compression_threshold =
-            ReadableSize::kb(rand::thread_rng().gen_range(0..2));
-        conf.kvengine.compaction_tombs_count = 100;
-    };
-    let mut cluster = ServerCluster::new(nodes.clone(), update_conf_fn);
-    cluster.wait_region_replicated(&[], 3);
-    cluster.get_pd_client().disable_default_operator();
-    let region = cluster
-        .get_pd_client()
-        .get_all_regions()
-        .first()
-        .unwrap()
-        .clone();
-    let mut keys = vec![];
-    for i in 0..20 {
-        let key = i_to_key(i * 100);
-        keys.push(Key::from_raw(&key).into_encoded());
-    }
-    cluster
-        .get_pd_client()
-        .must_split_region(region, CheckPolicy::Usekey, keys);
-    let move_scheduler = cluster.new_scheduler();
-    for _ in 0..20 {
-        move_scheduler.move_random_region();
-    }
-    let handles = vec![
-        spawn_write(0, cluster.new_client()),
-        spawn_merge(cluster.new_scheduler(), true),
-        spawn_transfer(cluster.new_scheduler()),
-        spawn_move(cluster.new_scheduler(), Arc::new(RwLock::new(()))),
-    ];
-    let start_time = Instant::now();
-    let pd_client = cluster.get_pd_client();
-    while start_time.saturating_elapsed() < TIMEOUT {
-        let ts = block_on(pd_client.get_tso()).unwrap();
-        let mut rng = rand::thread_rng();
-        let node_idx = rng.gen_range(0..nodes.len());
-        let node_id = nodes[node_idx];
-        info!("stop node {}", node_id);
-        cluster.stop_node(node_id);
-        info!("finish stop node {}", node_id);
-        let sleep_sec = rng.gen_range(1..5);
-        sleep(Duration::from_secs(sleep_sec));
-        info!("start node {}", node_id);
-        cluster.start_node(node_id, update_conf_fn);
-        sleep(Duration::from_secs(10));
-        let _ = pd_client.set_gc_safe_point(ts.into_inner()).unwrap();
-    }
-    info!("stop node thread exit");
-    for handle in handles {
-        handle.join().unwrap();
-    }
-    let ok = try_wait(
-        || {
-            let data_stats = cluster.get_data_stats();
-            data_stats.check_data().is_ok()
-        },
-        10,
-    );
-    let data_stats = cluster.get_data_stats();
-    if !ok {
-        data_stats.check_data().unwrap();
-    }
-
-    cluster.wait_region_version_match();
-    data_stats
-        .check_buckets(pd_client.as_ref(), bucket_size_kb * 1024)
-        .unwrap();
-    let mut client = cluster.new_client();
-    client.verify_data_with_ref_store();
-    cluster.stop();
-    let total_write_count = WRITE_COUNTER.load(Ordering::SeqCst);
-    let total_merge_count = MERGE_COUNTER.load(Ordering::SeqCst);
-    let total_move_count = MOVE_COUNTER.load(Ordering::SeqCst);
-    let total_transfer_count = TRANSFER_COUNTER.load(Ordering::SeqCst);
-    let region_number = pd_client.get_regions_number();
-    info!(
-        "TEST SUCCEED: total_write_count {}, region number {}, merge count {}, move count {}, transfer count {}",
-        total_write_count, region_number, total_merge_count, total_move_count, total_transfer_count,
-    );
-}
-
-pub(crate) fn spawn_write(idx: usize, mut client: ClusterClient) -> JoinHandle<()> {
-    std::thread::spawn(move || {
-        // Make sure each write thread don't conflict with others.
-        let begin = idx * 2000;
-        let end = begin + 2000 - 10;
-        let start_time = Instant::now();
-        let mut rng = rand::thread_rng();
-        while start_time.saturating_elapsed() < TIMEOUT {
-            let i = rng.gen_range(begin..end);
-            if rng.gen_ratio(2, 3) {
-                client.put_kv(i..(i + 10), i_to_key, i_to_val);
-            } else {
-                client.del_kv(i..(i + 10), i_to_key);
-            }
-            WRITE_COUNTER.fetch_add(10, Ordering::SeqCst);
-        }
-        info!("write thread {} exit", idx);
-    })
-}
-
 pub(crate) fn spawn_move(scheduler: Scheduler, two_node_down: Arc<RwLock<()>>) -> JoinHandle<()> {
     std::thread::spawn(move || {
         let start_time = Instant::now();
@@ -622,13 +495,6 @@ pub(crate) fn i_to_key(i: usize) -> Vec<u8> {
     format!("xkey{:08}", i).into_bytes()
 }

-pub(crate) fn i_to_val(i: usize) -> Vec<u8> {
-    let mut rng = rand::thread_rng();
-    let mut buf = vec![0; i % 512 + 1];
-    rng.fill_bytes(&mut buf);
-    buf
-}
-
 pub(crate) fn generate_keyspace_key(keyspace_id: u32) -> impl Fn(usize) -> Vec<u8> {
     move |i: usize| -> Vec<u8> {
         let mut key = ApiV2::get_txn_keyspace_prefix(keyspace_id);
@@ -665,3 +531,48 @@ pub(crate) fn env_switch(env_key: &str) -> bool {
         .unwrap_or(1)
         > 0
 }
+
+pub(crate) fn verify_cluster_stats(cluster: &ServerCluster, bucket_size: u64, timeout: Duration) {
+    let mut success_shards = HashSet::new();
+    try_wait_result(
+        || {
+            let stats = cluster.get_data_stats_ext(Some(&success_shards));
+            stats.check_data().map_err(|(errs, this_success_shards)| {
+                success_shards.extend(this_success_shards);
+                (errs, stats)
+            })
+        },
+        timeout.as_secs() as usize,
+    )
+    .unwrap_or_else(|(err, stats)| {
+        stats.log_all();
+        panic!("check_data failed: {:?}", err);
+    });
+
+    let mut starts_from_encoded_key = vec![];
+    try_wait_result(
+        || {
+            let data_stats = cluster.get_data_stats();
+            let res = data_stats.check_buckets(
+                cluster.get_pd_client_ext().as_ref(),
+                bucket_size,
+                &starts_from_encoded_key,
+            );
+            if let Err((reason, region)) = &res {
+                warn!(
+                    "check_buckets failed, region {:?}, reason {:?}",
+                    region, reason
+                );
+                if let Some(region) = region {
+                    starts_from_encoded_key = region.start_key.clone();
+                }
+            }
+            res.map_err(|err| (err, data_stats))
+        },
+        timeout.as_secs() as usize,
+    )
+    .unwrap_or_else(|(err, stats)| {
+        stats.log_all();
+        panic!("check_buckets failed: {:?}", err);
+    });
+}
diff --git a/tests/random/test_all.rs b/tests/random/test_all.rs
index 37df34f1ead..50e308d9f9d 100644
--- a/tests/random/test_all.rs
+++ b/tests/random/test_all.rs
@@ -19,8 +19,7 @@ use pd_client::PdClient;
 use rand::prelude::*;
 use security::SecurityConfig;
 use test_cloud_server::{
-    client::ClusterClientOptions, oss::prepare_dfs, tidb::TidbCluster, try_wait_result,
-    ServerCluster,
+    client::ClusterClientOptions, oss::prepare_dfs, tidb::TidbCluster, ServerCluster,
 };
 use test_pd_client::{PdClientExt, PdWrapper};
 use tikv_util::{
@@ -446,21 +445,7 @@ async fn verify_cluster(cluster: &mut ServerCluster) -> usize /* records count i
     // Check statistics.
     // Check after verify data, to ensure that PD heartbeat have updated region
     // stats.
-    let data_stats = try_wait_result(
-        || {
-            let stats = cluster.get_data_stats();
-            match stats.check_data() {
-                Ok(()) => Ok(stats),
-                Err(err) => Err((err, stats)),
-            }
-        },
-        20,
-    )
-    .expect("check_data failed");
-    cluster.wait_region_version_match();
-    data_stats
-        .check_buckets(cluster.get_pd_client_ext().as_ref(), REGION_BUCKET_SIZE.0)
-        .expect("check_buckets failed");
+    verify_cluster_stats(cluster, REGION_BUCKET_SIZE.0, Duration::from_secs(30));

     check_br();
     check_load_data();
diff --git a/tests/random/test_tidb.rs b/tests/random/test_tidb.rs
index 2e8e725915a..454d5ec4b8d 100644
--- a/tests/random/test_tidb.rs
+++ b/tests/random/test_tidb.rs
@@ -14,9 +14,7 @@ use pd_client::{
 };
 use rand::prelude::*;
 use security::SecurityConfig;
-use test_cloud_server::{
-    oss::prepare_dfs, tidb::*, tpc::*, try_wait_async, try_wait_result, ServerCluster,
-};
+use test_cloud_server::{oss::prepare_dfs, tidb::*, tpc::*, try_wait_async, ServerCluster};
 use test_pd_client::PdWrapper;
 use tikv::config::TikvConfig;
 use tikv_util::{
@@ -481,32 +479,7 @@ async fn verify_cluster(cluster: &mut ServerCluster, tpc_switch_on: bool, jepsen
     // Check statistics.
     // Check after verify data, to ensure that PD heartbeat have updated region
     // stats.
-    try_wait_result(
-        || {
-            let stats = cluster.get_data_stats();
-            stats.check_data().map_err(|err| (err, stats))
-        },
-        20,
-    )
-    .expect("check_data failed");
-    cluster.wait_region_version_match();
-    try_wait_result(
-        || {
-            // There are still region changes after all schedulers & operators stopped, as
-            // tikv-server can also initiate region splits. So during retry, get cluster
-            // stats again.
-            // TODO: retry from last failed region.
-            let data_stats = cluster.get_data_stats();
-            let res = data_stats
-                .check_buckets(cluster.get_pd_client_ext().as_ref(), REGION_BUCKET_SIZE.0);
-            if res.is_err() {
-                warn!("check_buckets failed, err {:?}", res);
-            }
-            res.map_err(|err| (err, data_stats))
-        },
-        10,
-    )
-    .expect("check_buckets failed");
+    verify_cluster_stats(cluster, REGION_BUCKET_SIZE.0, Duration::from_secs(60));

     if tpc_switch_on {
         check_tpc();