Skip to content

Commit

Permalink
refactor expirable map
Browse files Browse the repository at this point in the history
This refactors expirable map to make it so that deleting expired entries
is cheap (not looping over the whole map, but looping only over the
known expired entries).

This increases the memory requirements though because the key of type
`K` is now cloned an extra time to be stored in the inverse map.

A solution for the potentially expensive cloning is to wrap the key in
an Arc, but, looking at the usage of expirable map, it looks like most
(actually all) of the times, the keys are integer IDs, which are cheaply
cloned/copied. So leaving this as a direct clone for now until the need
arises.
  • Loading branch information
mariocynicys committed Sep 4, 2024
1 parent 48f5ddd commit 62d26c7
Show file tree
Hide file tree
Showing 4 changed files with 52 additions and 39 deletions.
2 changes: 1 addition & 1 deletion mm2src/coins/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ mm2_event_stream = { path = "../mm2_event_stream" }
mm2_git = { path = "../mm2_git" }
mm2_io = { path = "../mm2_io" }
mm2_metrics = { path = "../mm2_metrics" }
mm2_net = { path = "../mm2_net" }
mm2_net = { path = "../mm2_net", features = ["p2p"] }
mm2_number = { path = "../mm2_number"}
mm2_rpc = { path = "../mm2_rpc" }
mm2_state_machine = { path = "../mm2_state_machine" }
Expand Down
17 changes: 3 additions & 14 deletions mm2src/coins/eth/web3_transport/websocket_transport.rs
Original file line number Diff line number Diff line change
Expand Up @@ -97,12 +97,7 @@ impl WebsocketTransport {
}
}

async fn handle_keepalive(
&self,
wsocket: &mut WebSocketStream,
response_notifiers: &mut ExpirableMap<usize, oneshot::Sender<Vec<u8>>>,
expires_at: Option<Instant>,
) -> OuterAction {
async fn handle_keepalive(&self, wsocket: &mut WebSocketStream, expires_at: Option<Instant>) -> OuterAction {
const SIMPLE_REQUEST: &str = r#"{"jsonrpc":"2.0","method":"net_version","params":[],"id": 0 }"#;

if let Some(expires_at) = expires_at {
Expand All @@ -112,10 +107,7 @@ impl WebsocketTransport {
}
}

// Drop expired response notifier channels
response_notifiers.clear_expired_entries();

let mut should_continue = Default::default();
let mut should_continue = false;
for _ in 0..MAX_ATTEMPTS {
match wsocket
.send(tokio_tungstenite_wasm::Message::Text(SIMPLE_REQUEST.to_string()))
Expand Down Expand Up @@ -206,9 +198,6 @@ impl WebsocketTransport {
}

if let Some(id) = inc_event.get("id") {
// just to ensure we don't have outdated entries
response_notifiers.clear_expired_entries();

let request_id = id.as_u64().unwrap_or_default() as usize;

if let Some(notifier) = response_notifiers.remove(&request_id) {
Expand Down Expand Up @@ -279,7 +268,7 @@ impl WebsocketTransport {
loop {
futures_util::select! {
_ = keepalive_interval.next().fuse() => {
match self.handle_keepalive(&mut wsocket, &mut response_notifiers, expires_at).await {
match self.handle_keepalive(&mut wsocket, expires_at).await {
OuterAction::None => {},
OuterAction::Continue => continue,
OuterAction::Break => break,
Expand Down
70 changes: 48 additions & 22 deletions mm2src/common/expirable_map.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@

use instant::{Duration, Instant};
use rustc_hash::FxHashMap;
use std::hash::Hash;
use std::{collections::BTreeMap, hash::Hash};

#[derive(Clone, Debug)]
pub struct ExpirableEntry<V> {
Expand All @@ -19,48 +19,74 @@ impl<V> ExpirableEntry<V> {
pub fn update_expiration(&mut self, expires_at: Instant) { self.expires_at = expires_at }
}

impl<K: Eq + Hash, V> Default for ExpirableMap<K, V> {
impl<K: Eq + Hash + Clone, V> Default for ExpirableMap<K, V> {
fn default() -> Self { Self::new() }
}

/// A map that allows associating values with keys and expiring entries.
/// It is important to note that this implementation does not automatically
/// remove any entries; it is the caller's responsibility to invoke `clear_expired_entries`
/// at specified intervals.
/// It is important to note that this implementation does not have a background worker to
/// automatically clear expired entries. Outdated entries are only removed when the control flow
/// is handed back to the map mutably (i.e. some mutable method of the map is invoked).
///
/// WARNING: This is designed for performance-oriented use-cases utilizing `FxHashMap`
/// under the hood and is not suitable for cryptographic purposes.
#[derive(Clone, Debug)]
pub struct ExpirableMap<K: Eq + Hash, V>(FxHashMap<K, ExpirableEntry<V>>);
pub struct ExpirableMap<K: Eq + Hash + Clone, V> {
map: FxHashMap<K, ExpirableEntry<V>>,
/// A sorted inverse map from expiration times to keys to speed up expired entries clearing.
expiries: BTreeMap<Instant, K>,
}

impl<K: Eq + Hash, V> ExpirableMap<K, V> {
impl<K: Eq + Hash + Clone, V> ExpirableMap<K, V> {
/// Creates a new empty `ExpirableMap`
#[inline]
pub fn new() -> Self { Self(FxHashMap::default()) }
pub fn new() -> Self {
Self {
map: FxHashMap::default(),
expiries: BTreeMap::new(),
}
}

/// Returns the associated value if present.
///
/// Note that if the entry is expired and wasn't cleared yet, it will still be returned.
/// Use `remove()` instead to avoid getting expired entries.
#[inline]
pub fn get(&self, k: &K) -> Option<&V> { self.map.get(k).map(|v| &v.value) }

/// Removes a key-value pair from the map and returns the associated value if present.
#[inline]
pub fn get(&mut self, k: &K) -> Option<&V> { self.0.get(k).map(|v| &v.value) }
pub fn remove(&mut self, k: &K) -> Option<V> {
self.clear_expired_entries();
let entry = self.map.remove(k)?;
self.expiries.remove(&entry.expires_at);
Some(entry.value)
}

/// Inserts a key-value pair with an expiration duration.
///
/// If a value already exists for the given key, it will be updated and then
/// the old one will be returned.
pub fn insert(&mut self, k: K, v: V, exp: Duration) -> Option<V> {
let entry = ExpirableEntry {
expires_at: Instant::now() + exp,
value: v,
};

self.0.insert(k, entry).map(|v| v.value)
self.clear_expired_entries();
let expires_at = Instant::now() + exp;
let entry = ExpirableEntry { expires_at, value: v };
self.expiries.insert(expires_at, k.clone());
self.map.insert(k, entry).map(|v| v.value)
}

/// Removes expired entries from the map.
pub fn clear_expired_entries(&mut self) { self.0.retain(|_k, v| Instant::now() < v.expires_at); }

/// Removes a key-value pair from the map and returns the associated value if present.
#[inline]
pub fn remove(&mut self, k: &K) -> Option<V> { self.0.remove(k).map(|v| v.value) }
fn clear_expired_entries(&mut self) {
let keys_to_remove: Vec<_> = self
.expiries
.range(..=Instant::now())
.map(|(exp, key)| (*exp, key.clone()))
.collect();
for (exp, key) in keys_to_remove {
self.map.remove(&key);
self.expiries.remove(&exp);
}
}
}

#[cfg(any(test, target_arch = "wasm32"))]
Expand Down Expand Up @@ -90,7 +116,7 @@ mod tests {
expirable_map.clear_expired_entries();

// We waited for 2 seconds, so we shouldn't have any entry accessible
assert_eq!(expirable_map.0.len(), 0);
assert_eq!(expirable_map.map.len(), 0);

// Insert 5 entries
expirable_map.insert("key1".to_string(), value.to_string(), Duration::from_secs(5));
Expand All @@ -106,6 +132,6 @@ mod tests {
expirable_map.clear_expired_entries();

// We waited for 2 seconds, only one entry should expire
assert_eq!(expirable_map.0.len(), 4);
assert_eq!(expirable_map.map.len(), 4);
});
}
2 changes: 0 additions & 2 deletions mm2src/mm2_core/src/data_asker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -113,8 +113,6 @@ pub async fn send_asked_data_rpc(
asked_data: SendAskedDataRequest,
) -> Result<bool, MmError<SendAskedDataError>> {
let mut awaiting_asks = ctx.data_asker.awaiting_asks.lock().await;
awaiting_asks.clear_expired_entries();

match awaiting_asks.remove(&asked_data.data_id) {
Some(sender) => {
sender.send(asked_data.data).map_to_mm(|_| {
Expand Down

0 comments on commit 62d26c7

Please sign in to comment.