From 6132bc6d0e525417bad6380a120aa76cf6ac678c Mon Sep 17 00:00:00 2001 From: w41ter Date: Fri, 7 Jun 2024 20:48:26 +0800 Subject: [PATCH] Add cmd merge/split shard test --- src/server/src/engine/mod.rs | 3 +- src/server/src/node/mod.rs | 194 +++++++++++++- src/server/src/raftgroup/storage.rs | 243 ++++++++++++------ .../src/replica/eval/cmd_merge_shard.rs | 5 + .../src/replica/eval/cmd_split_shard.rs | 9 + src/server/src/replica/fsm/mod.rs | 14 +- 6 files changed, 378 insertions(+), 90 deletions(-) diff --git a/src/server/src/engine/mod.rs b/src/server/src/engine/mod.rs index d884097..04119c8 100644 --- a/src/server/src/engine/mod.rs +++ b/src/server/src/engine/mod.rs @@ -219,6 +219,7 @@ pub(crate) fn open_raft_engine(log_path: &Path) -> Result { use raft_engine::{Config, Engine}; let engine_dir = log_path.join("engine"); let snap_dir = log_path.join("snap"); + info!("open raft engine {}", engine_dir.display()); create_dir_all_if_not_exists(&engine_dir)?; create_dir_all_if_not_exists(&snap_dir)?; let engine_cfg = Config { @@ -339,7 +340,7 @@ mod tests { { let engine = open_raft_engine(dir.path()).unwrap(); let mut batch = LogBatch::default(); - batch.put(1, vec![1, 2, 3], vec![4, 5, 6]); + batch.put(1, vec![1, 2, 3], vec![4, 5, 6]).unwrap(); engine.write(&mut batch, true).unwrap(); } diff --git a/src/server/src/node/mod.rs b/src/server/src/node/mod.rs index a6db806..cd4ed51 100644 --- a/src/server/src/node/mod.rs +++ b/src/server/src/node/mod.rs @@ -736,6 +736,7 @@ mod tests { use super::*; use crate::constants::INITIAL_EPOCH; + use crate::RaftConfig; const TABLE_ID: u64 = 1; const NODE_ID: u64 = 2; @@ -745,7 +746,11 @@ mod tests { async fn create_node>(root_dir: P) -> Node { let root_dir = root_dir.as_ref().to_owned(); - let config = Config { root_dir, ..Default::default() }; + let config = Config { + root_dir, + raft: RaftConfig { tick_interval_ms: 10, ..Default::default() }, + ..Default::default() + }; let engines = Engines::open(&config.root_dir, &config.db).unwrap(); let transport_manager = TransportManager::new(vec![], engines.state()).await; @@ -765,6 +770,11 @@ mod tests { node.replica_table().find(GROUP_ID).unwrap() } + async fn create_first_replica_with_desc(node: &Node, group_desc: GroupDesc) -> Arc { + node.create_replica(REPLICA_ID, group_desc).await.unwrap(); + node.replica_table().find(GROUP_ID).unwrap() + } + async fn get_replica_state(node: Node, replica_id: u64) -> Option { node.state_engine() .replica_states() @@ -931,9 +941,16 @@ mod tests { } async fn execute_on_leader(replica: &Replica, request: Request) -> Response { + execute_on_leader_with_result(replica, request).await.unwrap() + } + + async fn execute_on_leader_with_result( + replica: &Replica, + request: Request, + ) -> Result { assert!(replica.on_leader(fn_name!(), false).await.unwrap().is_some()); let mut exec_ctx = ExecCtx::with_epoch(replica.epoch()); - replica.execute(&mut exec_ctx, &request).await.unwrap() + replica.execute(&mut exec_ctx, &request).await } async fn execute_read_request(replica: &Replica, key: &[u8]) -> Option { @@ -1193,4 +1210,177 @@ mod tests { matches!(value, Some(v) if v.content.as_ref().unwrap() == b"456" && v.version == second_commit_version) ); } + + #[sekas_macro::test] + async fn execute_split_shard_request() { + struct Test { + origin_shards: Vec, + split_shard: SplitShard, + expect_shards: Option>, + } + + let table_id = 1; + let tests = vec![ + // No such left shard exists. + Test { + origin_shards: vec![], + split_shard: SplitShard { old_shard_id: 0, new_shard_id: 1, split_key: vec![] }, + expect_shards: None, + }, + // split key out of range. + Test { + origin_shards: vec![ShardDesc::with_range(0, table_id, vec![b'a'], vec![b'b'])], + split_shard: SplitShard { old_shard_id: 0, new_shard_id: 1, split_key: vec![b'c'] }, + expect_shards: None, + }, + Test { + origin_shards: vec![ShardDesc::with_range(0, table_id, vec![b'a'], vec![b'b'])], + split_shard: SplitShard { old_shard_id: 0, new_shard_id: 1, split_key: vec![b'b'] }, + expect_shards: None, + }, + // Split into two shards + Test { + origin_shards: vec![ShardDesc::with_range(0, table_id, vec![b'a'], vec![b'c'])], + split_shard: SplitShard { old_shard_id: 0, new_shard_id: 1, split_key: vec![b'b'] }, + expect_shards: Some(vec![ + ShardDesc::with_range(0, table_id, vec![b'a'], vec![b'b']), + ShardDesc::with_range(1, table_id, vec![b'b'], vec![b'c']), + ]), + }, + Test { + origin_shards: vec![ + ShardDesc::with_range(0, table_id, vec![b'a'], vec![b'c']), + ShardDesc::with_range(2, table_id, vec![b'c'], vec![b'd']), + ], + split_shard: SplitShard { old_shard_id: 0, new_shard_id: 1, split_key: vec![b'b'] }, + expect_shards: Some(vec![ + ShardDesc::with_range(0, table_id, vec![b'a'], vec![b'b']), + ShardDesc::with_range(1, table_id, vec![b'b'], vec![b'c']), + ShardDesc::with_range(2, table_id, vec![b'c'], vec![b'd']), + ]), + }, + ]; + for (i, test) in tests.into_iter().enumerate() { + let dir = TempDir::new(fn_name!()).unwrap(); + let node = bootstrap_node(dir.path()).await; + info!("origin shards: {:?}", test.origin_shards); + let desc = GroupDesc { shards: test.origin_shards, ..group_descriptor() }; + info!("desc: {desc:?}"); + let replica = create_first_replica_with_desc(&node, desc.clone()).await; + + let request = Request::SplitShard(SplitShardRequest { + old_shard_id: test.split_shard.old_shard_id, + new_shard_id: test.split_shard.new_shard_id, + split_key: Some(test.split_shard.split_key), + }); + info!("before execute, descriptor: {:?}", replica.descriptor()); + let result = execute_on_leader_with_result(&replica, request).await; + if let Some(expect_shards) = test.expect_shards { + assert!( + matches!(result, Ok(Response::SplitShard(_))), + "{i} got result: {result:?}, desc: {:?}", + replica.descriptor() + ); + let desc = replica.descriptor(); + for expect_shard in expect_shards { + let got_shard = + desc.shard(expect_shard.id).expect("The expect shard not exists"); + assert_eq!(*got_shard, expect_shard); + } + } else { + assert!(result.is_err()); + } + } + } + + #[sekas_macro::test] + async fn execute_merge_shard_request() { + struct Test { + origin_shards: Vec, + merge_shard: MergeShard, + expect_shards: Option>, + } + + let table_id = 1; + let tests = vec![ + // Shard not found. + Test { + origin_shards: vec![ShardDesc::with_range(0, table_id, vec![], vec![])], + merge_shard: MergeShard { left_shard_id: 0, right_shard_id: 1 }, + expect_shards: None, + }, + Test { + origin_shards: vec![ShardDesc::with_range(1, table_id, vec![], vec![])], + merge_shard: MergeShard { left_shard_id: 0, right_shard_id: 1 }, + expect_shards: None, + }, + // Table is not equals. + Test { + origin_shards: vec![ + ShardDesc::with_range(0, table_id, vec![], vec![]), + ShardDesc::with_range(1, table_id + 1, vec![], vec![]), + ], + merge_shard: MergeShard { left_shard_id: 0, right_shard_id: 1 }, + expect_shards: None, + }, + // Range is not close to. + Test { + origin_shards: vec![ + ShardDesc::with_range(0, table_id, vec![b'a'], vec![b'b']), + ShardDesc::with_range(1, table_id, vec![b'c'], vec![b'd']), + ], + merge_shard: MergeShard { left_shard_id: 0, right_shard_id: 1 }, + expect_shards: None, + }, + // Merge two shards. + Test { + origin_shards: vec![ + ShardDesc::with_range(0, table_id, vec![b'a'], vec![b'b']), + ShardDesc::with_range(1, table_id, vec![b'b'], vec![b'c']), + ], + merge_shard: MergeShard { left_shard_id: 0, right_shard_id: 1 }, + expect_shards: Some(vec![ShardDesc::with_range( + 0, + table_id, + vec![b'a'], + vec![b'c'], + )]), + }, + Test { + origin_shards: vec![ + ShardDesc::with_range(0, table_id, vec![b'a'], vec![b'b']), + ShardDesc::with_range(1, table_id, vec![b'b'], vec![b'c']), + ShardDesc::with_range(2, table_id, vec![b'c'], vec![b'd']), + ], + merge_shard: MergeShard { left_shard_id: 0, right_shard_id: 1 }, + expect_shards: Some(vec![ + ShardDesc::with_range(0, table_id, vec![b'a'], vec![b'c']), + ShardDesc::with_range(2, table_id, vec![b'c'], vec![b'd']), + ]), + }, + ]; + for test in tests { + let dir = TempDir::new(fn_name!()).unwrap(); + let node = bootstrap_node(dir.path()).await; + let desc = GroupDesc { shards: test.origin_shards, ..group_descriptor() }; + let replica = create_first_replica_with_desc(&node, desc.clone()).await; + + let request = Request::MergeShard(MergeShardRequest { + left_shard_id: test.merge_shard.left_shard_id, + right_shard_id: test.merge_shard.right_shard_id, + }); + let result = execute_on_leader_with_result(&replica, request).await; + if let Some(expect_shards) = test.expect_shards { + assert!(matches!(result, Ok(Response::MergeShard(_))), "got result: {result:?}"); + let desc = replica.descriptor(); + for expect_shard in expect_shards { + let got_shard = + desc.shard(expect_shard.id).expect("The expect shard not exists"); + assert_eq!(*got_shard, expect_shard); + } + } else { + assert!(result.is_err()); + } + } + } } diff --git a/src/server/src/raftgroup/storage.rs b/src/server/src/raftgroup/storage.rs index 226131c..6ae77a0 100644 --- a/src/server/src/raftgroup/storage.rs +++ b/src/server/src/raftgroup/storage.rs @@ -570,10 +570,11 @@ mod tests { use log::info; use raft_engine::Config; - use sekas_runtime::*; + use sekas_rock::fn_name; use tempdir::TempDir; use super::*; + use crate::serverpb::v1::{AddShard, SyncOp}; fn mocked_entries(select_term: Option) -> Vec<(u64, u64)> { let entries = vec![ @@ -852,94 +853,85 @@ mod tests { ); } - #[test] - fn raft_storage_basic() { - let owner = ExecutorOwner::new(1); - owner.executor().block_on(async move { - raft_storage_inner().await; - }); + #[sekas_macro::test] + async fn raft_storage_basic() { + raft_storage_inner().await; } - #[test] - fn raft_storage_snapshot() { - let owner = ExecutorOwner::new(1); - owner.executor().block_on(async move { - raft_storage_apply_snapshot().await; - open_empty_raft_storage_after_applying_snapshot().await; - }); + #[sekas_macro::test] + async fn raft_storage_snapshot() { + raft_storage_apply_snapshot().await; + open_empty_raft_storage_after_applying_snapshot().await; } - #[test] - fn fetch_entries_from_both_engine_and_cache_should_be_continuously() { - let owner = ExecutorOwner::new(1); - owner.executor().block_on(async move { - let dir = TempDir::new("raft-storage").unwrap(); + #[sekas_macro::test] + async fn fetch_entries_from_both_engine_and_cache_should_be_continuously() { + let dir = TempDir::new(fn_name!()).unwrap(); - let cfg = Config { - dir: dir.path().join("db").to_str().unwrap().to_owned(), - ..Default::default() - }; - let engine = Arc::new(Engine::open(cfg).unwrap()); - - write_initial_state(&RaftConfig::default(), engine.as_ref(), 1, vec![], vec![]) - .await - .unwrap(); + let cfg = Config { + dir: dir.path().join("db").to_str().unwrap().to_owned(), + ..Default::default() + }; + let engine = Arc::new(Engine::open(cfg).unwrap()); - let snap_mgr = SnapManager::new(dir.path().join("snap")); - let mut storage = Storage::open( - &RaftConfig::default(), - 1, - 0, - ConfState::default(), - engine.clone(), - snap_mgr, - ) + write_initial_state(&RaftConfig::default(), engine.as_ref(), 1, vec![], vec![]) .await .unwrap(); - insert_entries(engine.clone(), &mut storage, mocked_entries(None)).await; - validate_term(&storage, mocked_entries(None)); - validate_entries(&storage, mocked_entries(None)); - let first_index = mocked_entries(None).first().unwrap().0; - let last_index = mocked_entries(None).last().unwrap().0; - validate_range(&storage, first_index, last_index); - - // 1. apply all entries in term 2. - storage.post_apply(mocked_entries(Some(2)).last().unwrap().0); - validate_term(&storage, mocked_entries(None)); - validate_entries(&storage, mocked_entries(None)); - validate_range(&storage, first_index, last_index); - - pub fn assert_no_missing_entries(entries: &[Entry]) { - let first_index = entries.first().map(|v| v.index); - let last_index = entries.last().map(|v| v.index); - if let (Some(first_index), Some(last_index)) = (first_index, last_index) { - let size = (last_index - first_index) as usize; - if entries.len() <= size { - panic!("some entries are missing"); - } + + let snap_mgr = SnapManager::new(dir.path().join("snap")); + let mut storage = Storage::open( + &RaftConfig::default(), + 1, + 0, + ConfState::default(), + engine.clone(), + snap_mgr, + ) + .await + .unwrap(); + insert_entries(engine.clone(), &mut storage, mocked_entries(None)).await; + validate_term(&storage, mocked_entries(None)); + validate_entries(&storage, mocked_entries(None)); + let first_index = mocked_entries(None).first().unwrap().0; + let last_index = mocked_entries(None).last().unwrap().0; + validate_range(&storage, first_index, last_index); + + // 1. apply all entries in term 2. + storage.post_apply(mocked_entries(Some(2)).last().unwrap().0); + validate_term(&storage, mocked_entries(None)); + validate_entries(&storage, mocked_entries(None)); + validate_range(&storage, first_index, last_index); + + pub fn assert_no_missing_entries(entries: &[Entry]) { + let first_index = entries.first().map(|v| v.index); + let last_index = entries.last().map(|v| v.index); + if let (Some(first_index), Some(last_index)) = (first_index, last_index) { + let size = (last_index - first_index) as usize; + if entries.len() <= size { + panic!("some entries are missing"); } } + } - let entries = ::entries( - &storage, - first_index, - last_index, - Some(1024 * 2), - GetEntriesContext::empty(false), - ) - .unwrap(); - assert_no_missing_entries(&entries); + let entries = ::entries( + &storage, + first_index, + last_index, + Some(1024 * 2), + GetEntriesContext::empty(false), + ) + .unwrap(); + assert_no_missing_entries(&entries); - let entries = ::entries( - &storage, - first_index, - last_index, - Some(1024 * 6), - GetEntriesContext::empty(false), - ) - .unwrap(); - assert_no_missing_entries(&entries); - }); + let entries = ::entries( + &storage, + first_index, + last_index, + Some(1024 * 6), + GetEntriesContext::empty(false), + ) + .unwrap(); + assert_no_missing_entries(&entries); } fn make_entries(entries: Vec<(u64, u64)>) -> Vec { @@ -1014,4 +1006,101 @@ mod tests { cache.fetch_entries_to(1, 2, 10, &mut entries); assert_eq!(entries, vec![e]); } + + #[sekas_macro::test] + async fn raft_storage_must_be_unique_instance() { + { + let dir = TempDir::new(fn_name!()).unwrap(); + + let cfg = Config { + dir: dir.path().join("db").to_str().unwrap().to_owned(), + ..Default::default() + }; + let engine = Arc::new(Engine::open(cfg).unwrap()); + + let eval_results = vec![EvalResult { + op: Some(Box::new(SyncOp { + add_shard: Some(AddShard { + shard: Some(ShardDesc::with_range(1, 1, vec![b'a'], vec![b'b'])), + }), + ..Default::default() + })), + ..Default::default() + }]; + + write_initial_state(&RaftConfig::default(), engine.as_ref(), 1, vec![], eval_results) + .await + .unwrap(); + + let snap_mgr = SnapManager::new(dir.path().join("snap")); + let _storage = Storage::open( + &RaftConfig::default(), + 1, + 0, + ConfState::default(), + engine.clone(), + snap_mgr, + ) + .await + .unwrap(); + } + + { + let dir = TempDir::new(fn_name!()).unwrap(); + + let cfg = Config { + dir: dir.path().join("db").to_str().unwrap().to_owned(), + ..Default::default() + }; + let engine = Arc::new(Engine::open(cfg).unwrap()); + + let eval_results = vec![EvalResult { + op: Some(Box::new(SyncOp { + add_shard: Some(AddShard { + shard: Some(ShardDesc::with_range(1, 1, vec![b'a'], vec![b'c'])), + }), + ..Default::default() + })), + ..Default::default() + }]; + + write_initial_state( + &RaftConfig::default(), + engine.as_ref(), + 1, + vec![], + eval_results.clone(), + ) + .await + .unwrap(); + + let snap_mgr = SnapManager::new(dir.path().join("snap")); + let storage = Storage::open( + &RaftConfig::default(), + 1, + 0, + ConfState::default(), + engine.clone(), + snap_mgr, + ) + .await + .unwrap(); + let std::ops::Range { start, end } = storage.range(); + let entries = ::entries( + &storage, + start, + end, + Some(1024 * 2), + GetEntriesContext::empty(false), + ) + .unwrap(); + let entry = &entries[0]; + let expect_bytes = eval_results[0].encode_to_vec(); + assert_eq!( + expect_bytes, entry.data, + "expect bytes: {expect_bytes:?}, entry data: {:?}", + entry.data + ); + } + } } diff --git a/src/server/src/replica/eval/cmd_merge_shard.rs b/src/server/src/replica/eval/cmd_merge_shard.rs index e8b4023..bd1ec60 100644 --- a/src/server/src/replica/eval/cmd_merge_shard.rs +++ b/src/server/src/replica/eval/cmd_merge_shard.rs @@ -11,6 +11,8 @@ // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. + +use log::debug; use sekas_api::server::v1::*; use crate::replica::{EvalResult, GroupEngine, MergeShard, SyncOp}; @@ -20,6 +22,9 @@ use crate::{Error, Result}; pub(crate) fn merge_shard(engine: &GroupEngine, req: &MergeShardRequest) -> Result { let left_shard_id = req.left_shard_id; let right_shard_id = req.right_shard_id; + + debug!("execute merge shard {right_shard_id} into {left_shard_id}",); + let left_shard = engine.shard_desc(left_shard_id)?; let right_shard = engine.shard_desc(right_shard_id)?; let Some(RangePartition { start: _, end: left_end }) = &left_shard.range else { diff --git a/src/server/src/replica/eval/cmd_split_shard.rs b/src/server/src/replica/eval/cmd_split_shard.rs index d334cca..e8612d3 100644 --- a/src/server/src/replica/eval/cmd_split_shard.rs +++ b/src/server/src/replica/eval/cmd_split_shard.rs @@ -12,6 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. +use log::debug; use sekas_api::server::v1::*; use crate::replica::{EvalResult, GroupEngine, SplitShard, SyncOp}; @@ -21,6 +22,14 @@ use crate::{Error, Result}; pub(crate) fn split_shard(engine: &GroupEngine, req: &SplitShardRequest) -> Result { let old_shard_id = req.old_shard_id; let new_shard_id = req.new_shard_id; + + debug!( + "execute split shard from {} to {}, has split key: {}", + old_shard_id, + new_shard_id, + req.split_key.is_some() + ); + let shard_desc = engine.shard_desc(old_shard_id)?; let split_key = match req.split_key.as_ref().cloned() { Some(split_key) => { diff --git a/src/server/src/replica/fsm/mod.rs b/src/server/src/replica/fsm/mod.rs index b7c4206..1dce9c4 100644 --- a/src/server/src/replica/fsm/mod.rs +++ b/src/server/src/replica/fsm/mod.rs @@ -988,11 +988,8 @@ mod tests { apply_split_shard(&mut desc, test.split_shard).unwrap(); assert_eq!(desc.epoch, SHARD_UPDATE_DELTA); for expect_shard in expect_shards { - let got_shard = desc - .shards - .iter() - .find(|shard| shard.id == expect_shard.id) - .expect("The expect shard not exists"); + let got_shard = + desc.shard(expect_shard.id).expect("The expect shard not exists"); assert_eq!(*got_shard, expect_shard); } } else { @@ -1075,11 +1072,8 @@ mod tests { apply_merge_shard(&mut desc, test.merge_shard).unwrap(); assert_eq!(desc.epoch, SHARD_UPDATE_DELTA); for expect_shard in expect_shards { - let got_shard = desc - .shards - .iter() - .find(|shard| shard.id == expect_shard.id) - .expect("The expect shard not exists"); + let got_shard = + desc.shard(expect_shard.id).expect("The expect shard not exists"); assert_eq!(*got_shard, expect_shard); } } else {