Skip to content

Commit

Permalink
Get estimated split key
Browse files Browse the repository at this point in the history
  • Loading branch information
w41ter committed Jun 5, 2024
1 parent 045bb1f commit 3a7bc1d
Show file tree
Hide file tree
Showing 3 changed files with 175 additions and 5 deletions.
90 changes: 90 additions & 0 deletions src/server/src/engine/group.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ use std::time::{Duration, Instant};
use log::{info, warn};
use prost::Message;
use sekas_api::server::v1::*;
use sekas_rock::lexical;
use sekas_schema::shard;

use super::RawDb;
Expand Down Expand Up @@ -435,6 +436,30 @@ impl GroupEngine {
}
}

/// Estimate the split keys of the target shard.
pub fn estimate_split_key(&self, shard_id: u64) -> Result<Option<Vec<u8>>> {
let shard_desc = self.shard_desc(shard_id)?;
let RangePartition { start, end } = shard_desc.range.ok_or_else(|| {
Error::InvalidData(format!("the range field of shard {shard_id} is not set"))
})?;
let start = keys::raw(shard_desc.table_id, &start);
let end = if end.is_empty() {
lexical::lexical_next_boundary(&keys::raw(shard_desc.table_id, &end))
} else {
keys::raw(shard_desc.table_id, &end)
};

let estimated_split_keys =
self.raw_db.estimate_split_keys_in_range(&self.cf_handle(), &start, &end)?;
if estimated_split_keys.is_empty() {
return Ok(None);
}
let num_split_keys = estimated_split_keys.len();
let split_point = num_split_keys / 2;
Ok(Some(estimated_split_keys[split_point].clone()))
}

/// return the desc of the specified shard.
#[inline]
pub fn shard_desc(&self, shard_id: u64) -> Result<ShardDesc> {
self.core
Expand Down Expand Up @@ -1506,4 +1531,69 @@ mod tests {
assert_eq!(value_set.values, case, "idx = {idx}");
}
}

#[sekas_macro::test]
async fn estimate_split_key_of_all_range() {
let dir = TempDir::new(fn_name!()).unwrap();
let (group_id, shard_id) = (1, 1);
let engine = create_engine(group_id, shard_id, dir.path().join("1").as_path()).await;

let split_key = engine.estimate_split_key(shard_id).unwrap();
assert!(split_key.is_none());

let mut wb = WriteBatch::default();
let n = 5000;
for i in 0..n {
engine
.put(
&mut wb,
shard_id,
format!("key-{i:03}").as_bytes(),
format!("value-{i}").as_bytes(),
i,
)
.unwrap();
}
engine.commit(wb, WriteStates::default(), false).unwrap();
engine.raw_db.flush_cf(&engine.cf_handle()).unwrap();

let split_key = engine.estimate_split_key(shard_id).unwrap();
assert!(split_key.is_some());
}

#[sekas_macro::test]
async fn estimate_split_key_in_range() {
let dir = TempDir::new(fn_name!()).unwrap();
let (group_id, shard_id) = (1, 1);
let engine = create_engine_with_range(
group_id,
shard_id,
b"a".to_vec(),
b"b".to_vec(),
dir.path().join("1").as_path(),
)
.await;

let split_key = engine.estimate_split_key(shard_id).unwrap();
assert!(split_key.is_none());

let mut wb = WriteBatch::default();
let n = 5000;
for i in 0..n {
engine
.put(
&mut wb,
shard_id,
format!("a-key-{i:03}").as_bytes(),
format!("value-{i}").as_bytes(),
i,
)
.unwrap();
}
engine.commit(wb, WriteStates::default(), false).unwrap();
engine.raw_db.flush_cf(&engine.cf_handle()).unwrap();

let split_key = engine.estimate_split_key(shard_id).unwrap();
assert!(split_key.is_some());
}
}
81 changes: 80 additions & 1 deletion src/server/src/engine/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ mod options;
mod properties;
mod state;

use std::collections::BTreeSet;
use std::path::{Path, PathBuf};
use std::sync::Arc;

Expand All @@ -35,7 +36,7 @@ const LAYOUT_DATA: &str = "db";
const LAYOUT_LOG: &str = "log";
const LAYOUT_SNAP: &str = "snap";

type DbResult<T> = Result<T, rocksdb::Error>;
type DbResult<T, E = rocksdb::Error> = Result<T, E>;

pub(crate) struct RawDb {
pub options: rocksdb::Options,
Expand Down Expand Up @@ -110,6 +111,39 @@ impl RawDb {
) -> DbResult<()> {
self.db.ingest_external_file_cf_opts(cf, opts, paths)
}

/// Estimate the split keys in the target range.
#[inline]
pub fn estimate_split_keys_in_range(
&self,
cf: &impl rocksdb::AsColumnFamilyRef,
start: &[u8],
end: &[u8],
) -> DbResult<Vec<Vec<u8>>, crate::Error> {
use properties::{EstimatedSplitKeys, PROPERTY_SPLIT_KEYS};
use prost::Message;

let collection = if end.is_empty() {
self.db.get_properties_of_all_range(cf)?
} else {
self.db.get_properties_of_tables_in_range(cf, &[(start, end)])?

Check failure on line 129 in src/server/src/engine/mod.rs

View workflow job for this annotation

GitHub Actions / Run tests (ubuntu-latest)

type annotations needed

Check failure on line 129 in src/server/src/engine/mod.rs

View workflow job for this annotation

GitHub Actions / Run tests (macos-latest)

type annotations needed
};
let mut split_keys = BTreeSet::default();
for table in collection.tables {
let properties = table.user_collected_properties();
if let Some(value) = properties.get(PROPERTY_SPLIT_KEYS) {
let table_split_keys = EstimatedSplitKeys::decode(&**value).map_err(|err| {
crate::Error::InvalidData(format!("deserialize EstimatedSplitKeys: {err}"))
})?;
for key in table_split_keys.keys {
if start < key.as_slice() && (key.as_slice() < end || end.is_empty()) {
split_keys.insert(key);
}
}
}
}
Ok(split_keys.into_iter().collect::<Vec<_>>())
}
}

#[derive(Clone)]
Expand Down Expand Up @@ -257,6 +291,51 @@ mod tests {
}
}

#[test]
fn estimate_split_keys() {
let dir = TempDir::new(fn_name!()).unwrap();
let db = open_raw_db(&DbConfig::default(), dir.path()).unwrap();
db.create_cf("cf1").unwrap();
let cf_handle = db.cf_handle("cf1").unwrap();
let mut wb = rocksdb::WriteBatch::default();
let n = 5000;
for i in 0..n {
wb.put_cf(
&cf_handle,
format!("key-{i:03}").as_bytes(),
format!("value-{i}").as_bytes(),
);
}
let mut opt = rocksdb::WriteOptions::default();
opt.set_sync(false);
db.write_opt(wb, &opt).unwrap();
db.flush_cf(&cf_handle).unwrap();

// A sub range.
let split_keys = db
.estimate_split_keys_in_range(
&cf_handle,
format!("key-{:03}", 0).as_bytes(),
format!("key-{:03}", n - 100).as_bytes(),
)
.unwrap();
assert!(!split_keys.is_empty());

// A inf range
let split_keys = db.estimate_split_keys_in_range(&cf_handle, &[], &[]).unwrap();
assert!(!split_keys.is_empty());

// An empty range
let split_keys = db
.estimate_split_keys_in_range(
&cf_handle,
format!("key").as_bytes(),
format!("key-0000").as_bytes(),
)
.unwrap();
assert!(split_keys.is_empty());
}

#[test]
fn reopen_raft_engine() {
let dir = TempDir::new(fn_name!()).unwrap();
Expand Down
9 changes: 5 additions & 4 deletions src/server/src/engine/properties.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ use prost::Message;
use rocksdb::table_properties::*;
use serde::{Deserialize, Serialize};

const PROPERTY_SPLIT_KEYS: &[u8] = b"sekas-split-keys";
pub const PROPERTY_SPLIT_KEYS: &[u8] = b"sekas-split-keys";
const ESTIMATE_KEYS_INTERVALS: usize = 1024;
const ESTIMATE_SIZE_INTERVALS: usize = 16 << 20; // 16MB

Expand Down Expand Up @@ -75,9 +75,10 @@ impl TablePropertiesCollector for SplitKeyCollector {

self.num_keys += 1;
self.total_size += key.len() + value.len();
if self.last_estimate_keys + ESTIMATE_KEYS_INTERVALS >= self.num_keys
|| self.last_estimate_size + ESTIMATE_SIZE_INTERVALS >= self.total_size
if self.last_estimate_keys + ESTIMATE_KEYS_INTERVALS <= self.num_keys
|| self.last_estimate_size + ESTIMATE_SIZE_INTERVALS <= self.total_size
{
println!("collect key {key:?}");
self.last_estimate_keys = self.num_keys;
self.last_estimate_size = self.total_size;
self.keys.push(key.to_owned());
Expand All @@ -97,5 +98,5 @@ impl TablePropertiesCollector for SplitKeyCollector {
#[derive(Serialize, Deserialize, prost::Message)]
pub(crate) struct EstimatedSplitKeys {
#[prost(bytes = "vec", repeated, tag = "1")]
keys: Vec<Vec<u8>>,
pub keys: Vec<Vec<u8>>,
}

0 comments on commit 3a7bc1d

Please sign in to comment.