random-test: Improve stability of check cluster (tikv#1747)
Signed-off-by: Ping Yu <[email protected]>
pingyu authored Jul 30, 2024
1 parent 869b465 commit ab817b8
Showing 7 changed files with 162 additions and 276 deletions.
1 change: 1 addition & 0 deletions Cargo.lock


13 changes: 11 additions & 2 deletions components/kvengine/src/stats.rs
@@ -1,6 +1,6 @@
 // Copyright 2022 TiKV Project Authors. Licensed under Apache-2.0.

-use std::cmp;
+use std::{cmp, collections::HashSet};

 use bytes::Bytes;
@@ -62,9 +62,18 @@ impl EngineStats {

 impl super::Engine {
     pub fn get_all_shard_stats(&self) -> Vec<ShardStats> {
+        self.get_all_shard_stats_ext(None)
+    }
+
+    pub fn get_all_shard_stats_ext(&self, skip_shards: Option<&HashSet<IdVer>>) -> Vec<ShardStats> {
         self.get_all_shard_id_vers()
             .into_iter()
-            .filter_map(|id_ver| self.get_shard(id_ver.id).map(|shard| shard.get_stats()))
+            .filter_map(|id_ver| {
+                if skip_shards.is_some_and(|skip_shards| skip_shards.contains(&id_ver)) {
+                    return None;
+                }
+                self.get_shard(id_ver.id).map(|shard| shard.get_stats())
+            })
             .collect()
     }
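
The new `_ext` variant is backward compatible: passing `None` reproduces `get_all_shard_stats`. A minimal usage sketch (the `engine` binding and the verified-shard set are placeholders, not part of the diff; it assumes `kvengine` exposes `Engine`, `IdVer`, and `ShardStats` at the crate root, as the `impl super::Engine` above and the imports in cluster.rs suggest):

use std::collections::HashSet;

use kvengine::{Engine, IdVer, ShardStats};

// Sketch: collect stats only for shards we have not verified yet.
// `verified` would be filled from an earlier, successful check pass.
fn unverified_shard_stats(engine: &Engine, verified: &HashSet<IdVer>) -> Vec<ShardStats> {
    engine.get_all_shard_stats_ext(Some(verified))
}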

1 change: 1 addition & 0 deletions components/test_cloud_server/Cargo.toml
@@ -5,6 +5,7 @@ edition = "2021"
 # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

 [dependencies]
+anyhow = "1.0"
 api_version = { path = "../api_version" }
 bstr = "0.2.17"
 bytes = "1"
182 changes: 94 additions & 88 deletions components/test_cloud_server/src/cluster.rs
@@ -1,7 +1,7 @@
 // Copyright 2022 TiKV Project Authors. Licensed under Apache-2.0.

 use std::{
-    collections::HashMap,
+    collections::{HashMap, HashSet},
     path::{Path, PathBuf},
     sync::{
         atomic::{AtomicU16, Ordering::Relaxed},
@@ -11,6 +11,7 @@ use std::{
     time::Duration,
 };

+use anyhow::bail;
 use cloud_server::TikvServer;
 use cloud_worker::CloudWorker;
 use dashmap::DashMap;
@@ -19,6 +20,7 @@ use grpcio::{Channel, ChannelBuilder, EnvBuilder, Environment};
 use kvengine::{dfs::Dfs, ShardStats};
 use kvproto::{
     kvrpcpb::{Mutation, Op},
+    metapb,
     metapb::PeerRole,
     raft_cmdpb::{RaftCmdRequest, RaftCmdResponse, RaftRequestHeader},
 };
@@ -501,11 +503,18 @@ impl ServerCluster {
     }

     pub fn get_data_stats(&self) -> ClusterDataStats {
+        self.get_data_stats_ext(None)
+    }
+
+    pub fn get_data_stats_ext(
+        &self,
+        skip_shards: Option<&HashSet<kvengine::IdVer>>,
+    ) -> ClusterDataStats {
         let mut stats = ClusterDataStats::default();
         for server in self.servers.values() {
             let store_id = server.get_store_id();
             let kv_engine = server.get_kv_engine();
-            stats.add(store_id, kv_engine.get_all_shard_stats());
+            stats.add(store_id, kv_engine.get_all_shard_stats_ext(skip_shards));
         }
         stats
     }
@@ -535,24 +544,6 @@ impl ServerCluster {
             .collect()
     }

-    // Wait shard version match between PD & kvengine.
-    pub fn wait_region_version_match(&self) {
-        let pd_client = self.pd_client.as_ref();
-        let ok = try_wait(
-            || {
-                let data_stats = self.get_data_stats();
-                data_stats.check_region_version_match(pd_client).is_ok()
-            },
-            10,
-        );
-        if !ok {
-            let data_stats = self.get_data_stats();
-            data_stats
-                .check_region_version_match(pd_client)
-                .expect("check_region_version_match failed");
-        }
-    }
-
     pub fn pd_endpoints(&self) -> &[String] {
         self.pd.endpoints().unwrap()
     }
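
For context, the deleted method relied on a `try_wait` polling helper. A hypothetical reconstruction of that pattern follows; the real helper lives elsewhere in the test harness, and the one-second interval is an assumption inferred from the call site above:

use std::{thread, time::Duration};

// Hypothetical sketch of the harness's `try_wait`: poll `f` up to
// `secs` times, one second apart, reporting whether it ever succeeded.
fn try_wait(f: impl Fn() -> bool, secs: usize) -> bool {
    for _ in 0..secs {
        if f() {
            return true;
        }
        thread::sleep(Duration::from_secs(1));
    }
    false
}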
@@ -926,13 +917,37 @@ impl ClusterDataStats {
         }
     }

-    pub fn check_data(&self) -> Result<(), String> {
+    pub fn check_data(
+        &self,
+    ) -> Result<
+        (),
+        (
+            Vec<String>,              // error reasons
+            HashSet<kvengine::IdVer>, // success_shards
+        ),
+    > {
+        let mut success_shards = HashSet::with_capacity(self.regions.len());
+        let mut errs = vec![];
         for stats in self.regions.values() {
             let map_err_fn = |e| format!("err {} stats: {:?}", e, stats);
-            stats.check_consistency().map_err(map_err_fn)?;
-            stats.check_healthy().map_err(map_err_fn)?;
+            if let Err(err) = stats.check_consistency() {
+                errs.push(map_err_fn(err));
+                continue;
+            }
+            let leader = match stats.check_healthy() {
+                Ok(leader) => leader,
+                Err(err) => {
+                    errs.push(map_err_fn(err));
+                    continue;
+                }
+            };
+            success_shards.insert(kvengine::IdVer::new(leader.id, leader.ver));
         }
-        Ok(())
+        if errs.is_empty() {
+            Ok(())
+        } else {
+            Err((errs, success_shards))
+        }
     }
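
`check_data` no longer fails fast on the first bad region: it collects every error and also reports which shards already passed, so a caller can skip them on the next round via `get_data_stats_ext`. A minimal retry-loop sketch (the retry bound, sleep, and logging are assumptions, not part of this commit):

use std::{collections::HashSet, thread, time::Duration};

// Sketch: re-check only the shards that have not passed yet.
fn wait_data_checked(cluster: &ServerCluster) {
    let mut checked: HashSet<kvengine::IdVer> = HashSet::new();
    for _ in 0..10 {
        match cluster.get_data_stats_ext(Some(&checked)).check_data() {
            Ok(()) => return,
            Err((errs, success_shards)) => {
                // Shards that passed this round are skipped next round.
                checked.extend(success_shards);
                eprintln!("check_data: {} regions still failing", errs.len());
            }
        }
        thread::sleep(Duration::from_secs(1));
    }
    panic!("cluster data check did not stabilize");
}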

     pub fn check_leader(&self) -> Result<(), String> {
@@ -953,61 +968,58 @@ impl ClusterDataStats {
         }
     }

+    pub fn log_all(&self) {
+        self.iter_shard_stats(|store_id, shard_stats| {
+            info!("shard_stats: {}:{:?}", store_id, shard_stats);
+            false
+        });
+    }
+
     fn get_region_shard_stats(&self, region_id: u64) -> Option<&ShardStats> {
         self.regions
             .get(&region_id)
             .map(|stats| stats.shard_stats.values().next().unwrap())
     }

-    pub fn check_region_version_match(&self, pd_client: &dyn PdClientExt) -> Result<(), String> {
-        let regions = pd_client.get_all_regions();
-        check_regions_boundary(&[], &[], true, &regions)
-            .map_err(|e| format!("check_regions_boundary failed: {:?}", e))?;
-        for region in &regions {
-            let region_id = region.get_id();
-            let region_shard_stats = self.get_region_shard_stats(region_id).ok_or_else(|| {
-                format!(
-                    "region not found in cluster, region:{:?}, cluster:{:?}",
-                    region, self.regions
-                )
-            })?;
-            if region_shard_stats.total_size == 0 {
-                continue;
-            }
-
-            let region_pd_version = region.get_region_epoch().get_version();
-            let region_shard_version = region_shard_stats.ver;
-            if region_pd_version != region_shard_version {
-                return Err(format!(
-                    "region {} version not match, pd: {}, shard: {}",
-                    region_id, region_pd_version, region_shard_version
-                ));
-            }
-        }
-        Ok(())
-    }
-
     pub fn check_buckets(
         &self,
         pd_client: &dyn PdClientExt,
         bucket_size: u64,
-    ) -> Result<(), String> {
-        let regions = pd_client.get_all_regions();
-        check_regions_boundary(&[], &[], true, &regions)
-            .map_err(|e| format!("check_regions_boundary failed: {:?}", e))?;
-        for region in &regions {
+        starts_from_encoded_key: &[u8],
+    ) -> Result<(), (String /* reason */, Option<metapb::Region>)> {
+        let regions = pd_client.get_all_regions(); // TODO: get regions from `starts_from_encoded_key`.
+        check_regions_boundary(starts_from_encoded_key, &[], true, &regions)
+            .map_err(|e| (format!("check_regions_boundary failed: {:?}", e), None))?;
+        for region in regions {
+            if !region.end_key.is_empty() && region.end_key.as_slice() <= starts_from_encoded_key {
+                continue;
+            }
+
             let region_id = region.get_id();
             let region_shard_stats = self.get_region_shard_stats(region_id).ok_or_else(|| {
-                format!(
-                    "region not found in cluster, region:{:?}, cluster:{:?}",
-                    region, self.regions
+                (
+                    "region not found in cluster".to_owned(),
+                    Some(region.clone()),
                 )
             })?;
             let shard_level_size: u64 =
                 region_shard_stats.total_size - region_shard_stats.mem_table_size;
             if shard_level_size == 0 {
                 continue;
             }
+
+            let region_pd_version = region.get_region_epoch().get_version();
+            let region_shard_version = region_shard_stats.ver;
+            if region_pd_version != region_shard_version {
+                return Err((
+                    format!(
+                        "version not match, pd: {}, shard: {}",
+                        region_pd_version, region_shard_version
+                    ),
+                    Some(region),
+                ));
+            }
+
             if let Some(buckets) = pd_client.get_buckets(region_id) {
                 for i in 1..buckets.meta.keys.len() {
                     let prev_key = &buckets.meta.keys[i - 1];
@@ -1020,21 +1032,22 @@ impl ClusterDataStats {
                 let actual_bucket_count = buckets.count() as u64;
                 let ratio = expected_bucket_count as f64 / actual_bucket_count as f64;
                 if !(0.3..=3.0).contains(&ratio) {
-                    return Err(format!(
-                        "region {} buckets {:?}, shard_level_size {}, expected {}, actual {}, region {:?}, shard stats {:?}",
-                        region_id,
-                        buckets,
-                        shard_level_size,
-                        expected_bucket_count,
-                        actual_bucket_count,
-                        region,
-                        region_shard_stats,
+                    return Err((
+                        format!(
+                            "buckets {:?}, shard_level_size {}, expected {}, actual {}, shard stats {:?}",
+                            buckets,
+                            shard_level_size,
+                            expected_bucket_count,
+                            actual_bucket_count,
+                            region_shard_stats,
+                        ),
+                        Some(region),
                     ));
                 };
             } else {
-                return Err(format!(
-                    "region {} no buckets, shard_level_size {}, region {:?}",
-                    region_id, shard_level_size, region,
+                return Err((
+                    format!("no buckets, shard_level_size {}", shard_level_size),
+                    Some(region),
                 ));
             }
         }
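
`check_buckets` now takes a resume key and returns the offending region, so a caller can continue past a region that has not settled instead of restarting the whole scan. A hedged sketch of that loop (the retry bound, sleep, and the choice to resume at the failing region's start key are assumptions):

// Sketch: resume the bucket check from the last failing region.
fn wait_buckets_checked(cluster: &ServerCluster, pd_client: &dyn PdClientExt, bucket_size: u64) {
    let mut start_key: Vec<u8> = vec![];
    for _ in 0..10 {
        let stats = cluster.get_data_stats();
        match stats.check_buckets(pd_client, bucket_size, &start_key) {
            Ok(()) => return,
            Err((reason, Some(region))) => {
                eprintln!("region {} not ready: {}", region.get_id(), reason);
                // Next round starts at the failing region rather than from scratch.
                start_key = region.get_start_key().to_vec();
            }
            Err((reason, None)) => panic!("check_buckets failed: {}", reason),
        }
        std::thread::sleep(std::time::Duration::from_secs(1));
    }
    panic!("buckets did not stabilize");
}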
@@ -1058,7 +1071,7 @@ impl RegionShardStats {
         }
     }

-    fn check_consistency(&self) -> Result<(), String> {
+    fn check_consistency(&self) -> anyhow::Result<()> {
         if self.shard_stats.len() <= 1 {
             return Ok(());
         }
@@ -1074,7 +1087,7 @@ impl RegionShardStats {
                 || stats.l0_table_count != first_stats.l0_table_count
                 || stats.ver != first_stats.ver
             {
-                let err_msg = format!(
+                bail!(
                     "inconsistent stats, first: {}:{}:{}: {:?}, current: {}:{}:{}: {:?}",
                     first_id,
                     first_stats.id,
@@ -1085,37 +1098,30 @@ impl RegionShardStats {
                     stats.ver,
                     stats
                 );
-                return Err(err_msg);
             }
         }
         Ok(())
     }

-    fn get_leader_stats(&self) -> Result<&ShardStats, String> {
+    fn get_leader_stats(&self) -> anyhow::Result<&ShardStats> {
         let item = self.shard_stats.values().find(|stats| stats.active);
         if item.is_none() {
-            return Err("no leader".into());
+            bail!("no leader");
         }
         Ok(item.unwrap())
     }

-    fn check_healthy(&self) -> Result<(), String> {
+    fn check_healthy(&self) -> anyhow::Result<&ShardStats> {
         let stats = self.get_leader_stats()?;
         if stats.mem_table_count > 1 {
-            return Err(format!(
-                "mem table count {} too large",
-                stats.mem_table_count
-            ));
+            bail!("mem table count {} too large", stats.mem_table_count);
         }
         if !stats.flushed {
-            return Err("not initial flushed".into());
+            bail!("not initial flushed");
        }
         if stats.compaction_score > 2.0 {
-            return Err(format!(
-                "compaction score too large: {}",
-                stats.compaction_score
-            ));
+            bail!("compaction score too large: {}", stats.compaction_score);
         }
-        Ok(())
+        Ok(stats)
     }
 }
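
Moving these helpers from `Result<_, String>` to `anyhow::Result` is what enables the one-expression `bail!` form, and `check_data`'s `map_err_fn` shows the interop: `anyhow::Error` implements `Display`, so the old string errors are recovered with `format!`. A standalone illustration (not from the diff):

use anyhow::bail;

fn check_score(score: f64) -> anyhow::Result<()> {
    if score > 2.0 {
        // bail!(..) is shorthand for: return Err(anyhow::anyhow!(..));
        bail!("compaction score too large: {}", score);
    }
    Ok(())
}

// Callers that still want the old String-based errors just format the error.
fn check_score_compat(score: f64) -> Result<(), String> {
    check_score(score).map_err(|e| format!("err {}", e))
}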
(Diffs for the remaining 3 changed files are not shown.)
