diff --git a/src/server/src/engine/group.rs b/src/server/src/engine/group.rs
index f4baf67..39dbae4 100644
--- a/src/server/src/engine/group.rs
+++ b/src/server/src/engine/group.rs
@@ -22,6 +22,7 @@ use std::time::{Duration, Instant};
 use log::{info, warn};
 use prost::Message;
 use sekas_api::server::v1::*;
+use sekas_rock::lexical;
 use sekas_schema::shard;
 
 use super::RawDb;
@@ -435,6 +436,30 @@ impl GroupEngine {
         }
     }
 
+    /// Estimate the split key of the target shard.
+    pub fn estimate_split_key(&self, shard_id: u64) -> Result<Option<Vec<u8>>> {
+        let shard_desc = self.shard_desc(shard_id)?;
+        let RangePartition { start, end } = shard_desc.range.ok_or_else(|| {
+            Error::InvalidData(format!("the range field of shard {shard_id} is not set"))
+        })?;
+        let start = keys::raw(shard_desc.table_id, &start);
+        let end = if end.is_empty() {
+            lexical::lexical_next_boundary(&keys::raw(shard_desc.table_id, &end))
+        } else {
+            keys::raw(shard_desc.table_id, &end)
+        };
+
+        let estimated_split_keys =
+            self.raw_db.estimate_split_keys_in_range(&self.cf_handle(), &start, &end)?;
+        if estimated_split_keys.is_empty() {
+            return Ok(None);
+        }
+        let num_split_keys = estimated_split_keys.len();
+        let split_point = num_split_keys / 2;
+        Ok(Some(estimated_split_keys[split_point].clone()))
+    }
+
+    /// Returns the desc of the specified shard.
     #[inline]
     pub fn shard_desc(&self, shard_id: u64) -> Result<ShardDesc> {
         self.core
@@ -1506,4 +1531,69 @@ mod tests {
             assert_eq!(value_set.values, case, "idx = {idx}");
         }
     }
+
+    #[sekas_macro::test]
+    async fn estimate_split_key_of_all_range() {
+        let dir = TempDir::new(fn_name!()).unwrap();
+        let (group_id, shard_id) = (1, 1);
+        let engine = create_engine(group_id, shard_id, dir.path().join("1").as_path()).await;
+
+        let split_key = engine.estimate_split_key(shard_id).unwrap();
+        assert!(split_key.is_none());
+
+        let mut wb = WriteBatch::default();
+        let n = 5000;
+        for i in 0..n {
+            engine
+                .put(
+                    &mut wb,
+                    shard_id,
+                    format!("key-{i:03}").as_bytes(),
+                    format!("value-{i}").as_bytes(),
+                    i,
+                )
+                .unwrap();
+        }
+        engine.commit(wb, WriteStates::default(), false).unwrap();
+        engine.raw_db.flush_cf(&engine.cf_handle()).unwrap();
+
+        let split_key = engine.estimate_split_key(shard_id).unwrap();
+        assert!(split_key.is_some());
+    }
+
+    #[sekas_macro::test]
+    async fn estimate_split_key_in_range() {
+        let dir = TempDir::new(fn_name!()).unwrap();
+        let (group_id, shard_id) = (1, 1);
+        let engine = create_engine_with_range(
+            group_id,
+            shard_id,
+            b"a".to_vec(),
+            b"b".to_vec(),
+            dir.path().join("1").as_path(),
+        )
+        .await;
+
+        let split_key = engine.estimate_split_key(shard_id).unwrap();
+        assert!(split_key.is_none());
+
+        let mut wb = WriteBatch::default();
+        let n = 5000;
+        for i in 0..n {
+            engine
+                .put(
+                    &mut wb,
+                    shard_id,
+                    format!("a-key-{i:03}").as_bytes(),
+                    format!("value-{i}").as_bytes(),
+                    i,
+                )
+                .unwrap();
+        }
+        engine.commit(wb, WriteStates::default(), false).unwrap();
+        engine.raw_db.flush_cf(&engine.cf_handle()).unwrap();
+
+        let split_key = engine.estimate_split_key(shard_id).unwrap();
+        assert!(split_key.is_some());
+    }
 }
diff --git a/src/server/src/engine/mod.rs b/src/server/src/engine/mod.rs
index 6d19e16..a6d2e26 100644
--- a/src/server/src/engine/mod.rs
+++ b/src/server/src/engine/mod.rs
@@ -17,6 +17,7 @@ mod options;
 mod properties;
 mod state;
 
+use std::collections::BTreeSet;
 use std::path::{Path, PathBuf};
 use std::sync::Arc;
 
@@ -35,7 +36,7 @@ const LAYOUT_DATA: &str = "db";
 const LAYOUT_LOG: &str = "log";
 const LAYOUT_SNAP: &str = "snap";
 
-type DbResult<T> = Result<T, rocksdb::Error>;
+type DbResult<T, E = rocksdb::Error> = Result<T, E>;
 
 pub(crate) struct RawDb {
     pub options: rocksdb::Options,
@@ -110,6 +111,39 @@ impl RawDb {
     ) -> DbResult<()> {
         self.db.ingest_external_file_cf_opts(cf, opts, paths)
     }
+
+    /// Estimate the split keys in the target range.
+    #[inline]
+    pub fn estimate_split_keys_in_range(
+        &self,
+        cf: &impl rocksdb::AsColumnFamilyRef,
+        start: &[u8],
+        end: &[u8],
+    ) -> DbResult<Vec<Vec<u8>>, crate::Error> {
+        use properties::{EstimatedSplitKeys, PROPERTY_SPLIT_KEYS};
+        use prost::Message;
+
+        let collection = if end.is_empty() {
+            self.db.get_properties_of_all_range(cf)?
+        } else {
+            self.db.get_properties_of_tables_in_range(cf, &[(start, end)])?
+        };
+        let mut split_keys = BTreeSet::default();
+        for table in collection.tables {
+            let properties = table.user_collected_properties();
+            if let Some(value) = properties.get(PROPERTY_SPLIT_KEYS) {
+                let table_split_keys = EstimatedSplitKeys::decode(&**value).map_err(|err| {
+                    crate::Error::InvalidData(format!("deserialize EstimatedSplitKeys: {err}"))
+                })?;
+                for key in table_split_keys.keys {
+                    if start < key.as_slice() && (key.as_slice() < end || end.is_empty()) {
+                        split_keys.insert(key);
+                    }
+                }
+            }
+        }
+        Ok(split_keys.into_iter().collect::<Vec<_>>())
+    }
 }
 
 #[derive(Clone)]
@@ -257,6 +291,51 @@ mod tests {
         }
     }
 
+    #[test]
+    fn estimate_split_keys() {
+        let dir = TempDir::new(fn_name!()).unwrap();
+        let db = open_raw_db(&DbConfig::default(), dir.path()).unwrap();
+        db.create_cf("cf1").unwrap();
+        let cf_handle = db.cf_handle("cf1").unwrap();
+        let mut wb = rocksdb::WriteBatch::default();
+        let n = 5000;
+        for i in 0..n {
+            wb.put_cf(
+                &cf_handle,
+                format!("key-{i:03}").as_bytes(),
+                format!("value-{i}").as_bytes(),
+            );
+        }
+        let mut opt = rocksdb::WriteOptions::default();
+        opt.set_sync(false);
+        db.write_opt(wb, &opt).unwrap();
+        db.flush_cf(&cf_handle).unwrap();
+
+        // A sub range.
+        let split_keys = db
+            .estimate_split_keys_in_range(
+                &cf_handle,
+                format!("key-{:03}", 0).as_bytes(),
+                format!("key-{:03}", n - 100).as_bytes(),
+            )
+            .unwrap();
+        assert!(!split_keys.is_empty());
+
+        // An infinite range.
+        let split_keys = db.estimate_split_keys_in_range(&cf_handle, &[], &[]).unwrap();
+        assert!(!split_keys.is_empty());
+
+        // An empty range.
+        let split_keys = db
+            .estimate_split_keys_in_range(
+                &cf_handle,
+                "key".as_bytes(),
+                "key-0000".as_bytes(),
+            )
+            .unwrap();
+        assert!(split_keys.is_empty());
+    }
+
     #[test]
     fn reopen_raft_engine() {
         let dir = TempDir::new(fn_name!()).unwrap();
diff --git a/src/server/src/engine/properties.rs b/src/server/src/engine/properties.rs
index 01763e3..ffb762f 100644
--- a/src/server/src/engine/properties.rs
+++ b/src/server/src/engine/properties.rs
@@ -19,7 +19,7 @@ use prost::Message;
 use rocksdb::table_properties::*;
 use serde::{Deserialize, Serialize};
 
-const PROPERTY_SPLIT_KEYS: &[u8] = b"sekas-split-keys";
+pub const PROPERTY_SPLIT_KEYS: &[u8] = b"sekas-split-keys";
 const ESTIMATE_KEYS_INTERVALS: usize = 1024;
 const ESTIMATE_SIZE_INTERVALS: usize = 16 << 20; // 16MB
 
@@ -75,9 +75,9 @@ impl TablePropertiesCollector for SplitKeyCollector {
         self.num_keys += 1;
         self.total_size += key.len() + value.len();
 
-        if self.last_estimate_keys + ESTIMATE_KEYS_INTERVALS >= self.num_keys
-            || self.last_estimate_size + ESTIMATE_SIZE_INTERVALS >= self.total_size
+        if self.last_estimate_keys + ESTIMATE_KEYS_INTERVALS <= self.num_keys
+            || self.last_estimate_size + ESTIMATE_SIZE_INTERVALS <= self.total_size
         {
             self.last_estimate_keys = self.num_keys;
             self.last_estimate_size = self.total_size;
            self.keys.push(key.to_owned());
@@ -97,5 +97,5 @@
 #[derive(Serialize, Deserialize, prost::Message)]
 pub(crate) struct EstimatedSplitKeys {
     #[prost(bytes = "vec", repeated, tag = "1")]
-    keys: Vec<Vec<u8>>,
+    pub keys: Vec<Vec<u8>>,
 }
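
// A minimal usage sketch, not part of the diff above: how a split checker might
// consume the new `GroupEngine::estimate_split_key` API. The names
// `maybe_split_shard` and `propose_split` are hypothetical placeholders for
// whatever component actually issues the split request.
async fn maybe_split_shard(engine: &GroupEngine, shard_id: u64) -> Result<()> {
    // `estimate_split_key` returns `Ok(None)` until the shard has enough flushed
    // tables carrying collected split keys; otherwise it yields the middle
    // estimated key of the shard's range, which can be proposed as the split point.
    match engine.estimate_split_key(shard_id)? {
        Some(split_key) => propose_split(shard_id, split_key).await,
        None => Ok(()),
    }
}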