Skip to content
This repository has been archived by the owner on Nov 15, 2023. It is now read-only.

Automatically unsubscribe storage listeners when they're dropped (RCP node memory leak fix) #10454

Merged
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
229 changes: 174 additions & 55 deletions client/api/src/notifications.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,22 +20,29 @@

use std::{
collections::{HashMap, HashSet},
sync::Arc,
pin::Pin,
sync::{Arc, Weak},
task::Poll,
};

use fnv::{FnvHashMap, FnvHashSet};
use futures::Stream;
use parking_lot::Mutex;
use prometheus_endpoint::{register, CounterVec, Opts, Registry, U64};
use sc_utils::mpsc::{tracing_unbounded, TracingUnboundedReceiver, TracingUnboundedSender};
use sp_core::storage::{StorageData, StorageKey};
use sp_core::{
hexdisplay::HexDisplay,
storage::{StorageData, StorageKey},
};
use sp_runtime::traits::Block as BlockT;

/// Storage change set
#[derive(Debug)]
pub struct StorageChangeSet {
changes: Arc<Vec<(StorageKey, Option<StorageData>)>>,
child_changes: Arc<Vec<(StorageKey, Vec<(StorageKey, Option<StorageData>)>)>>,
filter: Option<HashSet<StorageKey>>,
child_filters: Option<HashMap<StorageKey, Option<HashSet<StorageKey>>>>,
filter: Keys,
child_filters: ChildKeys,
}

impl StorageChangeSet {
Expand Down Expand Up @@ -74,15 +81,60 @@ impl StorageChangeSet {
}

/// Type that implements `futures::Stream` of storage change events.
pub type StorageEventStream<H> = TracingUnboundedReceiver<(H, StorageChangeSet)>;
pub struct StorageEventStream<H> {
rx: TracingUnboundedReceiver<(H, StorageChangeSet)>,
storage_notifications: Weak<Mutex<StorageNotificationsImpl<H>>>,
was_triggered: bool,
id: u64,
}

impl<H> Stream for StorageEventStream<H> {
type Item = <TracingUnboundedReceiver<(H, StorageChangeSet)> as Stream>::Item;
fn poll_next(
mut self: Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
) -> Poll<Option<Self::Item>> {
let result = Stream::poll_next(Pin::new(&mut self.rx), cx);
if result.is_ready() {
self.was_triggered = true;
}
result
}
}

impl<H> Drop for StorageEventStream<H> {
fn drop(&mut self) {
if let Some(storage_notifications) = self.storage_notifications.upgrade() {
if let Some((keys, child_keys)) =
storage_notifications.lock().remove_subscriber(self.id)
{
if !self.was_triggered {
log::trace!(
target: "storage_notifications",
"Listener was never triggered: id={}, keys={:?}, child_keys={:?}",
self.id,
PrintKeys(&keys),
PrintChildKeys(&child_keys),
);
}
}
}
}
}

type SubscriberId = u64;

type SubscribersGauge = CounterVec<U64>;

/// Manages storage listeners.
#[derive(Debug)]
pub struct StorageNotifications<Block: BlockT> {
pub struct StorageNotifications<Block: BlockT>(Arc<Mutex<StorageNotificationsImpl<Block::Hash>>>);

type Keys = Option<HashSet<StorageKey>>;
type ChildKeys = Option<HashMap<StorageKey, Option<HashSet<StorageKey>>>>;

#[derive(Debug)]
struct StorageNotificationsImpl<Hash> {
metrics: Option<SubscribersGauge>,
next_id: SubscriberId,
wildcard_listeners: FnvHashSet<SubscriberId>,
Expand All @@ -93,15 +145,17 @@ pub struct StorageNotifications<Block: BlockT> {
>,
sinks: FnvHashMap<
SubscriberId,
(
TracingUnboundedSender<(Block::Hash, StorageChangeSet)>,
Option<HashSet<StorageKey>>,
Option<HashMap<StorageKey, Option<HashSet<StorageKey>>>>,
),
(TracingUnboundedSender<(Hash, StorageChangeSet)>, Keys, ChildKeys),
>,
}

impl<Block: BlockT> Default for StorageNotifications<Block> {
fn default() -> Self {
Self(Default::default())
}
}

impl<Hash> Default for StorageNotificationsImpl<Hash> {
fn default() -> Self {
Self {
metrics: Default::default(),
Expand All @@ -114,10 +168,68 @@ impl<Block: BlockT> Default for StorageNotifications<Block> {
}
}

struct PrintKeys<'a>(&'a Keys);
impl<'a> std::fmt::Debug for PrintKeys<'a> {
fn fmt(&self, fmt: &mut std::fmt::Formatter) -> std::fmt::Result {
if let Some(keys) = self.0 {
fmt.debug_list().entries(keys.iter().map(HexDisplay::from)).finish()
} else {
write!(fmt, "None")
}
}
}

struct PrintChildKeys<'a>(&'a ChildKeys);
impl<'a> std::fmt::Debug for PrintChildKeys<'a> {
fn fmt(&self, fmt: &mut std::fmt::Formatter) -> std::fmt::Result {
if let Some(map) = self.0 {
fmt.debug_map()
.entries(map.iter().map(|(key, values)| (HexDisplay::from(key), PrintKeys(values))))
.finish()
} else {
write!(fmt, "None")
}
}
}

impl<Block: BlockT> StorageNotifications<Block> {
/// Initialize a new StorageNotifications
/// optionally pass a prometheus registry to send subscriber metrics to
pub fn new(prometheus_registry: Option<Registry>) -> Self {
StorageNotifications(Arc::new(Mutex::new(StorageNotificationsImpl::new(
prometheus_registry,
))))
}

/// Trigger notification to all listeners.
///
/// Note the changes are going to be filtered by listener's filter key.
/// In fact no event might be sent if clients are not interested in the changes.
pub fn trigger(
&mut self,
hash: &Block::Hash,
changeset: impl Iterator<Item = (Vec<u8>, Option<Vec<u8>>)>,
child_changeset: impl Iterator<
Item = (Vec<u8>, impl Iterator<Item = (Vec<u8>, Option<Vec<u8>>)>),
>,
) {
self.0.lock().trigger(hash, changeset, child_changeset);
}

/// Start listening for particular storage keys.
pub fn listen(
&mut self,
filter_keys: Option<&[StorageKey]>,
filter_child_keys: Option<&[(StorageKey, Option<Vec<StorageKey>>)]>,
) -> StorageEventStream<Block::Hash> {
let (id, rx) = self.0.lock().listen(filter_keys, filter_child_keys);
let storage_notifications = Arc::downgrade(&self.0);
StorageEventStream { rx, storage_notifications, was_triggered: false, id }
}
}

impl<Hash> StorageNotificationsImpl<Hash> {
fn new(prometheus_registry: Option<Registry>) -> Self {
let metrics = prometheus_registry.and_then(|r| {
CounterVec::new(
Opts::new(
Expand All @@ -130,7 +242,7 @@ impl<Block: BlockT> StorageNotifications<Block> {
.ok()
});

StorageNotifications {
StorageNotificationsImpl {
metrics,
next_id: Default::default(),
wildcard_listeners: Default::default(),
Expand All @@ -139,18 +251,16 @@ impl<Block: BlockT> StorageNotifications<Block> {
sinks: Default::default(),
}
}
/// Trigger notification to all listeners.
bkchr marked this conversation as resolved.
Show resolved Hide resolved
///
/// Note the changes are going to be filtered by listener's filter key.
/// In fact no event might be sent if clients are not interested in the changes.
pub fn trigger(
fn trigger(
&mut self,
hash: &Block::Hash,
hash: &Hash,
changeset: impl Iterator<Item = (Vec<u8>, Option<Vec<u8>>)>,
child_changeset: impl Iterator<
Item = (Vec<u8>, impl Iterator<Item = (Vec<u8>, Option<Vec<u8>>)>),
>,
) {
) where
Hash: Clone,
{
let has_wildcard = !self.wildcard_listeners.is_empty();

// early exit if no listeners
Expand Down Expand Up @@ -244,7 +354,7 @@ impl<Block: BlockT> StorageNotifications<Block> {

fn remove_subscriber_from(
subscriber: &SubscriberId,
filters: &Option<HashSet<StorageKey>>,
filters: &Keys,
listeners: &mut HashMap<StorageKey, FnvHashSet<SubscriberId>>,
wildcards: &mut FnvHashSet<SubscriberId>,
) {
Expand All @@ -269,42 +379,43 @@ impl<Block: BlockT> StorageNotifications<Block> {
}
}

fn remove_subscriber(&mut self, subscriber: SubscriberId) {
if let Some((_, filters, child_filters)) = self.sinks.remove(&subscriber) {
Self::remove_subscriber_from(
&subscriber,
&filters,
&mut self.listeners,
&mut self.wildcard_listeners,
);
if let Some(child_filters) = child_filters.as_ref() {
for (c_key, filters) in child_filters {
if let Some((listeners, wildcards)) = self.child_listeners.get_mut(&c_key) {
Self::remove_subscriber_from(
&subscriber,
&filters,
&mut *listeners,
&mut *wildcards,
);

if listeners.is_empty() && wildcards.is_empty() {
self.child_listeners.remove(&c_key);
}
fn remove_subscriber(&mut self, subscriber: SubscriberId) -> Option<(Keys, ChildKeys)> {
let (_, filters, child_filters) = self.sinks.remove(&subscriber)?;
Self::remove_subscriber_from(
&subscriber,
&filters,
&mut self.listeners,
&mut self.wildcard_listeners,
);
if let Some(child_filters) = child_filters.as_ref() {
for (c_key, filters) in child_filters {
if let Some((listeners, wildcards)) = self.child_listeners.get_mut(&c_key) {
Self::remove_subscriber_from(
&subscriber,
&filters,
&mut *listeners,
&mut *wildcards,
);

if listeners.is_empty() && wildcards.is_empty() {
self.child_listeners.remove(&c_key);
}
}
}
if let Some(m) = self.metrics.as_ref() {
m.with_label_values(&[&"removed"]).inc();
}
}
if let Some(m) = self.metrics.as_ref() {
m.with_label_values(&[&"removed"]).inc();
}

Some((filters, child_filters))
}

fn listen_from(
current_id: SubscriberId,
filter_keys: &Option<impl AsRef<[StorageKey]>>,
listeners: &mut HashMap<StorageKey, FnvHashSet<SubscriberId>>,
wildcards: &mut FnvHashSet<SubscriberId>,
) -> Option<HashSet<StorageKey>> {
) -> Keys {
match filter_keys {
None => {
wildcards.insert(current_id);
Expand All @@ -325,12 +436,11 @@ impl<Block: BlockT> StorageNotifications<Block> {
}
}

/// Start listening for particular storage keys.
pub fn listen(
fn listen(
&mut self,
filter_keys: Option<&[StorageKey]>,
filter_child_keys: Option<&[(StorageKey, Option<Vec<StorageKey>>)]>,
) -> StorageEventStream<Block::Hash> {
) -> (u64, TracingUnboundedReceiver<(Hash, StorageChangeSet)>) {
self.next_id += 1;
let current_id = self.next_id;

Expand Down Expand Up @@ -364,7 +474,7 @@ impl<Block: BlockT> StorageNotifications<Block> {
m.with_label_values(&[&"added"]).inc();
}

rx
(current_id, rx)
}
}

Expand Down Expand Up @@ -517,9 +627,9 @@ mod tests {
let _recv3 = futures::executor::block_on_stream(notifications.listen(None, None));
let _recv4 =
futures::executor::block_on_stream(notifications.listen(None, Some(&child_filter)));
assert_eq!(notifications.listeners.len(), 2);
assert_eq!(notifications.wildcard_listeners.len(), 2);
assert_eq!(notifications.child_listeners.len(), 1);
assert_eq!(notifications.0.lock().listeners.len(), 2);
assert_eq!(notifications.0.lock().wildcard_listeners.len(), 2);
assert_eq!(notifications.0.lock().child_listeners.len(), 1);
}

// when
Expand All @@ -528,9 +638,18 @@ mod tests {
notifications.trigger(&Hash::from_low_u64_be(1), changeset.into_iter(), c_changeset);

// then
assert_eq!(notifications.listeners.len(), 0);
assert_eq!(notifications.wildcard_listeners.len(), 0);
assert_eq!(notifications.child_listeners.len(), 0);
assert_eq!(notifications.0.lock().listeners.len(), 0);
assert_eq!(notifications.0.lock().wildcard_listeners.len(), 0);
assert_eq!(notifications.0.lock().child_listeners.len(), 0);
}

#[test]
fn should_cleanup_subscriber_if_stream_is_dropped() {
let mut notifications = StorageNotifications::<Block>::default();
let stream = notifications.listen(None, None);
assert_eq!(notifications.0.lock().sinks.len(), 1);
std::mem::drop(stream);
assert_eq!(notifications.0.lock().sinks.len(), 0);
}

#[test]
Expand Down