Skip to content

Commit

Permalink
Add cmd merge/split shard test
Browse files Browse the repository at this point in the history
  • Loading branch information
w41ter committed Jun 7, 2024
1 parent bb2afb1 commit 6132bc6
Show file tree
Hide file tree
Showing 6 changed files with 378 additions and 90 deletions.
3 changes: 2 additions & 1 deletion src/server/src/engine/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -219,6 +219,7 @@ pub(crate) fn open_raft_engine(log_path: &Path) -> Result<raft_engine::Engine> {
use raft_engine::{Config, Engine};
let engine_dir = log_path.join("engine");
let snap_dir = log_path.join("snap");
info!("open raft engine {}", engine_dir.display());
create_dir_all_if_not_exists(&engine_dir)?;
create_dir_all_if_not_exists(&snap_dir)?;
let engine_cfg = Config {
Expand Down Expand Up @@ -339,7 +340,7 @@ mod tests {
{
let engine = open_raft_engine(dir.path()).unwrap();
let mut batch = LogBatch::default();
batch.put(1, vec![1, 2, 3], vec![4, 5, 6]);
batch.put(1, vec![1, 2, 3], vec![4, 5, 6]).unwrap();
engine.write(&mut batch, true).unwrap();
}

Expand Down
194 changes: 192 additions & 2 deletions src/server/src/node/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -736,6 +736,7 @@ mod tests {

use super::*;
use crate::constants::INITIAL_EPOCH;
use crate::RaftConfig;

const TABLE_ID: u64 = 1;
const NODE_ID: u64 = 2;
Expand All @@ -745,7 +746,11 @@ mod tests {

async fn create_node<P: AsRef<Path>>(root_dir: P) -> Node {
let root_dir = root_dir.as_ref().to_owned();
let config = Config { root_dir, ..Default::default() };
let config = Config {
root_dir,
raft: RaftConfig { tick_interval_ms: 10, ..Default::default() },
..Default::default()
};

let engines = Engines::open(&config.root_dir, &config.db).unwrap();
let transport_manager = TransportManager::new(vec![], engines.state()).await;
Expand All @@ -765,6 +770,11 @@ mod tests {
node.replica_table().find(GROUP_ID).unwrap()
}

async fn create_first_replica_with_desc(node: &Node, group_desc: GroupDesc) -> Arc<Replica> {
node.create_replica(REPLICA_ID, group_desc).await.unwrap();
node.replica_table().find(GROUP_ID).unwrap()
}

async fn get_replica_state(node: Node, replica_id: u64) -> Option<ReplicaLocalState> {
node.state_engine()
.replica_states()
Expand Down Expand Up @@ -931,9 +941,16 @@ mod tests {
}

async fn execute_on_leader(replica: &Replica, request: Request) -> Response {
execute_on_leader_with_result(replica, request).await.unwrap()
}

async fn execute_on_leader_with_result(
replica: &Replica,
request: Request,
) -> Result<Response> {
assert!(replica.on_leader(fn_name!(), false).await.unwrap().is_some());
let mut exec_ctx = ExecCtx::with_epoch(replica.epoch());
replica.execute(&mut exec_ctx, &request).await.unwrap()
replica.execute(&mut exec_ctx, &request).await
}

async fn execute_read_request(replica: &Replica, key: &[u8]) -> Option<Value> {
Expand Down Expand Up @@ -1193,4 +1210,177 @@ mod tests {
matches!(value, Some(v) if v.content.as_ref().unwrap() == b"456" && v.version == second_commit_version)
);
}

#[sekas_macro::test]
async fn execute_split_shard_request() {
struct Test {
origin_shards: Vec<ShardDesc>,
split_shard: SplitShard,
expect_shards: Option<Vec<ShardDesc>>,
}

let table_id = 1;
let tests = vec![
// No such left shard exists.
Test {
origin_shards: vec![],
split_shard: SplitShard { old_shard_id: 0, new_shard_id: 1, split_key: vec![] },
expect_shards: None,
},
// split key out of range.
Test {
origin_shards: vec![ShardDesc::with_range(0, table_id, vec![b'a'], vec![b'b'])],
split_shard: SplitShard { old_shard_id: 0, new_shard_id: 1, split_key: vec![b'c'] },
expect_shards: None,
},
Test {
origin_shards: vec![ShardDesc::with_range(0, table_id, vec![b'a'], vec![b'b'])],
split_shard: SplitShard { old_shard_id: 0, new_shard_id: 1, split_key: vec![b'b'] },
expect_shards: None,
},
// Split into two shards
Test {
origin_shards: vec![ShardDesc::with_range(0, table_id, vec![b'a'], vec![b'c'])],
split_shard: SplitShard { old_shard_id: 0, new_shard_id: 1, split_key: vec![b'b'] },
expect_shards: Some(vec![
ShardDesc::with_range(0, table_id, vec![b'a'], vec![b'b']),
ShardDesc::with_range(1, table_id, vec![b'b'], vec![b'c']),
]),
},
Test {
origin_shards: vec![
ShardDesc::with_range(0, table_id, vec![b'a'], vec![b'c']),
ShardDesc::with_range(2, table_id, vec![b'c'], vec![b'd']),
],
split_shard: SplitShard { old_shard_id: 0, new_shard_id: 1, split_key: vec![b'b'] },
expect_shards: Some(vec![
ShardDesc::with_range(0, table_id, vec![b'a'], vec![b'b']),
ShardDesc::with_range(1, table_id, vec![b'b'], vec![b'c']),
ShardDesc::with_range(2, table_id, vec![b'c'], vec![b'd']),
]),
},
];
for (i, test) in tests.into_iter().enumerate() {
let dir = TempDir::new(fn_name!()).unwrap();
let node = bootstrap_node(dir.path()).await;
info!("origin shards: {:?}", test.origin_shards);
let desc = GroupDesc { shards: test.origin_shards, ..group_descriptor() };
info!("desc: {desc:?}");
let replica = create_first_replica_with_desc(&node, desc.clone()).await;

let request = Request::SplitShard(SplitShardRequest {
old_shard_id: test.split_shard.old_shard_id,
new_shard_id: test.split_shard.new_shard_id,
split_key: Some(test.split_shard.split_key),
});
info!("before execute, descriptor: {:?}", replica.descriptor());
let result = execute_on_leader_with_result(&replica, request).await;
if let Some(expect_shards) = test.expect_shards {
assert!(
matches!(result, Ok(Response::SplitShard(_))),
"{i} got result: {result:?}, desc: {:?}",
replica.descriptor()
);
let desc = replica.descriptor();
for expect_shard in expect_shards {
let got_shard =
desc.shard(expect_shard.id).expect("The expect shard not exists");
assert_eq!(*got_shard, expect_shard);
}
} else {
assert!(result.is_err());
}
}
}

#[sekas_macro::test]
async fn execute_merge_shard_request() {
struct Test {
origin_shards: Vec<ShardDesc>,
merge_shard: MergeShard,
expect_shards: Option<Vec<ShardDesc>>,
}

let table_id = 1;
let tests = vec![
// Shard not found.
Test {
origin_shards: vec![ShardDesc::with_range(0, table_id, vec![], vec![])],
merge_shard: MergeShard { left_shard_id: 0, right_shard_id: 1 },
expect_shards: None,
},
Test {
origin_shards: vec![ShardDesc::with_range(1, table_id, vec![], vec![])],
merge_shard: MergeShard { left_shard_id: 0, right_shard_id: 1 },
expect_shards: None,
},
// Table is not equals.
Test {
origin_shards: vec![
ShardDesc::with_range(0, table_id, vec![], vec![]),
ShardDesc::with_range(1, table_id + 1, vec![], vec![]),
],
merge_shard: MergeShard { left_shard_id: 0, right_shard_id: 1 },
expect_shards: None,
},
// Range is not close to.
Test {
origin_shards: vec![
ShardDesc::with_range(0, table_id, vec![b'a'], vec![b'b']),
ShardDesc::with_range(1, table_id, vec![b'c'], vec![b'd']),
],
merge_shard: MergeShard { left_shard_id: 0, right_shard_id: 1 },
expect_shards: None,
},
// Merge two shards.
Test {
origin_shards: vec![
ShardDesc::with_range(0, table_id, vec![b'a'], vec![b'b']),
ShardDesc::with_range(1, table_id, vec![b'b'], vec![b'c']),
],
merge_shard: MergeShard { left_shard_id: 0, right_shard_id: 1 },
expect_shards: Some(vec![ShardDesc::with_range(
0,
table_id,
vec![b'a'],
vec![b'c'],
)]),
},
Test {
origin_shards: vec![
ShardDesc::with_range(0, table_id, vec![b'a'], vec![b'b']),
ShardDesc::with_range(1, table_id, vec![b'b'], vec![b'c']),
ShardDesc::with_range(2, table_id, vec![b'c'], vec![b'd']),
],
merge_shard: MergeShard { left_shard_id: 0, right_shard_id: 1 },
expect_shards: Some(vec![
ShardDesc::with_range(0, table_id, vec![b'a'], vec![b'c']),
ShardDesc::with_range(2, table_id, vec![b'c'], vec![b'd']),
]),
},
];
for test in tests {
let dir = TempDir::new(fn_name!()).unwrap();
let node = bootstrap_node(dir.path()).await;
let desc = GroupDesc { shards: test.origin_shards, ..group_descriptor() };
let replica = create_first_replica_with_desc(&node, desc.clone()).await;

let request = Request::MergeShard(MergeShardRequest {
left_shard_id: test.merge_shard.left_shard_id,
right_shard_id: test.merge_shard.right_shard_id,
});
let result = execute_on_leader_with_result(&replica, request).await;
if let Some(expect_shards) = test.expect_shards {
assert!(matches!(result, Ok(Response::MergeShard(_))), "got result: {result:?}");
let desc = replica.descriptor();
for expect_shard in expect_shards {
let got_shard =
desc.shard(expect_shard.id).expect("The expect shard not exists");
assert_eq!(*got_shard, expect_shard);
}
} else {
assert!(result.is_err());
}
}
}
}
Loading

0 comments on commit 6132bc6

Please sign in to comment.