Skip to content

Commit

Permalink
fix(master_kv): replace tokio lock with parking_lot for kv_ope_notify
Browse files Browse the repository at this point in the history
  • Loading branch information
ActivePeter committed Mar 6, 2024
1 parent 91346f7 commit 301afaa
Showing 1 changed file with 16 additions and 12 deletions.
28 changes: 16 additions & 12 deletions src/master/m_master_kv.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,8 @@ pub struct MasterKv {
rpc_handler: RPCHandler<proto::kv::KvRequests>,
view: MasterKvView,
kv_ope_id_allocator: AtomicU32,
kv_ope_notify: tokio::sync::RwLock<HashMap<u32, Arc<Notify>>>,
// kv_ope_notify: tokio::sync::RwLock<HashMap<u32, Arc<Notify>>>,
kv_ope_notify: RwLock<HashMap<u32, Arc<Notify>>>,
}

#[async_trait]
Expand All @@ -61,7 +62,8 @@ impl LogicalModule for MasterKv {
rpc_handler: RPCHandler::default(),
view: MasterKvView::new(args.logical_modules_ref.clone()),
kv_ope_id_allocator: AtomicU32::new(0),
kv_ope_notify: tokio::sync::RwLock::new(HashMap::new()),
// kv_ope_notify: tokio::sync::RwLock::new(HashMap::new()),
kv_ope_notify: RwLock::new(HashMap::new()),
}
}
async fn start(&self) -> WSResult<Vec<JoinHandleWrapper>> {
Expand Down Expand Up @@ -98,14 +100,17 @@ impl MasterKv {
responsor: RPCResponsor<KvRequests>,
) {
if reqs.prev_kv_opeid >= 0 {
let hold_nots = self.kv_ope_notify.read().await;
if let Some(not) = hold_nots
.get(&(reqs.prev_kv_opeid as u32))
.map(|v| v.clone())
{
let noted = not.notified();
drop(hold_nots);
noted.await;
let mut _hold_not_arc = None;
let noted = {
let hold_nots = self.kv_ope_notify.read();
hold_nots.get(&(reqs.prev_kv_opeid as u32)).map(|v| {
_hold_not_arc = Some(v.clone());
_hold_not_arc.as_ref().unwrap().notified()
})
};

if let Some(noted) = noted {
noted.await
}
}
let mut kv_responses = KvResponses { responses: vec![] };
Expand All @@ -127,7 +132,6 @@ impl MasterKv {
assert!(self
.kv_ope_notify
.write()
.await
.insert(kv_opeid.unwrap(), Notify::new().into())
.is_none());
}
Expand Down Expand Up @@ -155,7 +159,7 @@ impl MasterKv {
tracing::debug!("notify all waiting kv operations");
// notify all waiting kv operations
if let Some(opeid) = kv_opeid {
if let Some(notify) = self.kv_ope_notify.write().await.remove(&opeid) {
if let Some(notify) = self.kv_ope_notify.write().remove(&opeid) {
notify.notify_waiters();
} else {
panic!("fatal logical error, kv opeid:{} not found", opeid);
Expand Down

0 comments on commit 301afaa

Please sign in to comment.