From c1995c647b42c85bdd6b4444015b0d85e45c9b87 Mon Sep 17 00:00:00 2001 From: Nikita Date: Wed, 5 Jan 2022 10:15:21 +0300 Subject: [PATCH] fix(rpc): recreate dead and uncleaned subscriptions (#22281) --- rpc/src/rpc_subscription_tracker.rs | 53 +++++++++++++++++++---------- 1 file changed, 35 insertions(+), 18 deletions(-) diff --git a/rpc/src/rpc_subscription_tracker.rs b/rpc/src/rpc_subscription_tracker.rs index bc202591d24f93..c49ff6fd942a24 100644 --- a/rpc/src/rpc_subscription_tracker.rs +++ b/rpc/src/rpc_subscription_tracker.rs @@ -180,9 +180,10 @@ pub struct SignatureSubscriptionParams { #[derive(Clone)] pub struct SubscriptionControl(Arc); +pub struct WeakSubscriptionTokenRef(Weak, SubscriptionId); struct SubscriptionControlInner { - subscriptions: DashMap>, + subscriptions: DashMap, next_id: AtomicU64, max_active_subscriptions: usize, sender: crossbeam_channel::Sender, @@ -216,33 +217,44 @@ impl SubscriptionControl { self.0.subscriptions.len() ); let count = self.0.subscriptions.len(); - match self.0.subscriptions.entry(params) { - DashEntry::Occupied(entry) => Ok(SubscriptionToken( - entry - .get() - .upgrade() - .expect("dead subscription encountered in SubscriptionControl"), + let create_token_and_weak_ref = |id, params| { + let token = SubscriptionToken( + Arc::new(SubscriptionTokenInner { + control: Arc::clone(&self.0), + params, + id, + }), self.0.counter.create_token(), - )), + ); + let weak_ref = WeakSubscriptionTokenRef(Arc::downgrade(&token.0), token.0.id); + (token, weak_ref) + }; + + match self.0.subscriptions.entry(params) { + DashEntry::Occupied(mut entry) => match entry.get().0.upgrade() { + Some(token_ref) => Ok(SubscriptionToken(token_ref, self.0.counter.create_token())), + // This means the last Arc for this Weak pointer entered the drop just before us, + // but could not remove the entry since we are holding the write lock. + // See `Drop` implementation for `SubscriptionTokenInner` for further info. + None => { + let (token, weak_ref) = + create_token_and_weak_ref(entry.get().1, entry.key().clone()); + entry.insert(weak_ref); + Ok(token) + } + }, DashEntry::Vacant(entry) => { if count >= self.0.max_active_subscriptions { inc_new_counter_info!("rpc-subscription-refused-limit-reached", 1); return Err(Error::TooManySubscriptions); } let id = SubscriptionId::from(self.0.next_id.fetch_add(1, Ordering::AcqRel)); - let token = SubscriptionToken( - Arc::new(SubscriptionTokenInner { - control: Arc::clone(&self.0), - params: entry.key().clone(), - id, - }), - self.0.counter.create_token(), - ); + let (token, weak_ref) = create_token_and_weak_ref(id, entry.key().clone()); let _ = self .0 .sender .send(NotificationEntry::Subscribed(token.0.params.clone(), id).into()); - entry.insert(Arc::downgrade(&token.0)); + entry.insert(weak_ref); datapoint_info!( "rpc-subscription", ("total", self.0.subscriptions.len(), i64) @@ -529,7 +541,9 @@ impl Drop for SubscriptionTokenInner { DashEntry::Vacant(_) => { warn!("Subscriptions inconsistency (missing entry in by_params)"); } - DashEntry::Occupied(entry) => { + // Check the strong refs count to ensure no other thread recreated this subscription (not token) + // while we were acquiring the lock. + DashEntry::Occupied(entry) if entry.get().0.strong_count() == 0 => { let _ = self .control .sender @@ -540,6 +554,9 @@ impl Drop for SubscriptionTokenInner { ("total", self.control.subscriptions.len(), i64) ); } + // This branch handles the case in which this entry got recreated + // while we were waiting for the lock (inside the `DashMap::entry` method). + DashEntry::Occupied(_entry) /* if _entry.get().0.strong_count() > 0 */ => (), } } }