Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(orders): fix cancel order race condition using time-based cache #2232

Merged
merged 26 commits into from
Oct 4, 2024
Merged
Show file tree
Hide file tree
Changes from 19 commits
Commits
Show all changes
26 commits
Select commit Hold shift + click to select a range
6ad60e9
add recently cancelled orders time cache to orderbook
shamardy Sep 28, 2024
232019b
revert test_cancel_order changes
shamardy Sep 28, 2024
1c38da7
review fix: cherry-pick 62d26c7
mariocynicys Sep 4, 2024
da884fc
optimize `clear_expired_entries`
shamardy Sep 30, 2024
5762d13
provide a consistent interface for `ExpirableMap`
shamardy Sep 30, 2024
f2ef6d3
use Copy type parameter instead of Clone in `ExpirableMap`
shamardy Sep 30, 2024
87c53f8
make test_cancel_all_orders not rely on the order of messages
shamardy Oct 1, 2024
5591f89
Make `RECENTLY_CANCELLED_TIMEOUT` `Duration` Type
shamardy Oct 1, 2024
69fdfe9
use ctx as reference in `maker_order_cancelled_p2p_notify`
shamardy Oct 1, 2024
14f7f34
move `recently_cancelled` time cache to it's own mutex in `Ordermatch…
shamardy Oct 1, 2024
e0e5d41
Revert "move `recently_cancelled` time cache to it's own mutex in `Or…
shamardy Oct 2, 2024
cdfd809
move recently cancelled back to orderbook struct and handle it in a b…
shamardy Oct 2, 2024
860544f
review fix: clarify some comments
shamardy Oct 2, 2024
20fa7e5
add order to `recently_cancelled` even if it was in the `order_set`
shamardy Oct 2, 2024
d2675d2
review fix: add implementation details to `clear_expired_entries`
shamardy Oct 2, 2024
abfb252
review fix: check expiry on remove as well
shamardy Oct 2, 2024
5a839be
review fix: move implementation details to a comment inside the `clea…
shamardy Oct 2, 2024
598045d
revert `test_cancel_all_orders` to dev state and add `test_order_canc…
shamardy Oct 3, 2024
9faa804
fix clippy
shamardy Oct 3, 2024
158252c
review fix: remove return types from some functions that don't need them
shamardy Oct 3, 2024
ff54b83
review fix: fix comments in `test_order_cancellation_received_before_…
shamardy Oct 3, 2024
3f0a6bd
Merge remote-tracking branch 'origin/dev' into fix-p2p-cancellation
shamardy Oct 3, 2024
b1cae0f
reorder imports after merge
shamardy Oct 3, 2024
9dd4544
remove adex-cli CI workflow
shamardy Oct 4, 2024
d55efc3
update `mm2_libp2p` dependency name for consistency
shamardy Oct 4, 2024
1e6bbb8
Merge remote-tracking branch 'origin/dev' into fix-p2p-cancellation
shamardy Oct 4, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 3 additions & 14 deletions mm2src/coins/eth/web3_transport/websocket_transport.rs
Original file line number Diff line number Diff line change
Expand Up @@ -97,12 +97,7 @@ impl WebsocketTransport {
}
}

async fn handle_keepalive(
&self,
wsocket: &mut WebSocketStream,
response_notifiers: &mut ExpirableMap<usize, oneshot::Sender<Vec<u8>>>,
expires_at: Option<Instant>,
) -> OuterAction {
async fn handle_keepalive(&self, wsocket: &mut WebSocketStream, expires_at: Option<Instant>) -> OuterAction {
const SIMPLE_REQUEST: &str = r#"{"jsonrpc":"2.0","method":"net_version","params":[],"id": 0 }"#;

if let Some(expires_at) = expires_at {
Expand All @@ -112,10 +107,7 @@ impl WebsocketTransport {
}
}

// Drop expired response notifier channels
response_notifiers.clear_expired_entries();
shamardy marked this conversation as resolved.
Show resolved Hide resolved

let mut should_continue = Default::default();
let mut should_continue = false;
for _ in 0..MAX_ATTEMPTS {
match wsocket
.send(tokio_tungstenite_wasm::Message::Text(SIMPLE_REQUEST.to_string()))
Expand Down Expand Up @@ -206,9 +198,6 @@ impl WebsocketTransport {
}

if let Some(id) = inc_event.get("id") {
// just to ensure we don't have outdated entries
response_notifiers.clear_expired_entries();

let request_id = id.as_u64().unwrap_or_default() as usize;

if let Some(notifier) = response_notifiers.remove(&request_id) {
mariocynicys marked this conversation as resolved.
Show resolved Hide resolved
Expand Down Expand Up @@ -279,7 +268,7 @@ impl WebsocketTransport {
loop {
futures_util::select! {
_ = keepalive_interval.next().fuse() => {
match self.handle_keepalive(&mut wsocket, &mut response_notifiers, expires_at).await {
match self.handle_keepalive(&mut wsocket, expires_at).await {
OuterAction::None => {},
OuterAction::Continue => continue,
OuterAction::Break => break,
Expand Down
96 changes: 66 additions & 30 deletions mm2src/common/expirable_map.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@

use instant::{Duration, Instant};
use rustc_hash::FxHashMap;
use std::hash::Hash;
use std::{collections::BTreeMap, hash::Hash};

#[derive(Clone, Debug)]
pub struct ExpirableEntry<V> {
Expand All @@ -19,48 +19,84 @@ impl<V> ExpirableEntry<V> {
pub fn update_expiration(&mut self, expires_at: Instant) { self.expires_at = expires_at }
}

impl<K: Eq + Hash, V> Default for ExpirableMap<K, V> {
impl<K: Eq + Hash + Copy, V> Default for ExpirableMap<K, V> {
fn default() -> Self { Self::new() }
}

/// A map that allows associating values with keys and expiring entries.
/// It is important to note that this implementation does not automatically
/// remove any entries; it is the caller's responsibility to invoke `clear_expired_entries`
/// at specified intervals.
/// It is important to note that this implementation does not have a background worker to
/// automatically clear expired entries. Outdated entries are only removed when the control flow
/// is handed back to the map mutably (i.e. some mutable method of the map is invoked).
///
/// WARNING: This is designed for performance-oriented use-cases utilizing `FxHashMap`
/// under the hood and is not suitable for cryptographic purposes.
#[derive(Clone, Debug)]
pub struct ExpirableMap<K: Eq + Hash, V>(FxHashMap<K, ExpirableEntry<V>>);
pub struct ExpirableMap<K: Eq + Hash + Copy, V> {
map: FxHashMap<K, ExpirableEntry<V>>,
/// A sorted inverse map from expiration times to keys to speed up expired entries clearing.
expiries: BTreeMap<Instant, K>,
}

impl<K: Eq + Hash, V> ExpirableMap<K, V> {
impl<K: Eq + Hash + Copy, V> ExpirableMap<K, V> {
/// Creates a new empty `ExpirableMap`
#[inline]
pub fn new() -> Self { Self(FxHashMap::default()) }
pub fn new() -> Self {
Self {
map: FxHashMap::default(),
expiries: BTreeMap::new(),
}
}

/// Returns the associated value if present and not expired.
#[inline]
pub fn get(&self, k: &K) -> Option<&V> {
self.map
.get(k)
.filter(|v| v.expires_at > Instant::now())
.map(|v| &v.value)
}

/// Returns the associated value if present.
/// Removes a key-value pair from the map and returns the associated value if present and not expired.
#[inline]
pub fn get(&mut self, k: &K) -> Option<&V> { self.0.get(k).map(|v| &v.value) }
pub fn remove(&mut self, k: &K) -> Option<V> {
self.map.remove(k).filter(|v| v.expires_at > Instant::now()).map(|v| {
self.expiries.remove(&v.expires_at);
v.value
})
}

/// Inserts a key-value pair with an expiration duration.
///
/// If a value already exists for the given key, it will be updated and then
/// the old one will be returned.
pub fn insert(&mut self, k: K, v: V, exp: Duration) -> Option<V> {
let entry = ExpirableEntry {
expires_at: Instant::now() + exp,
value: v,
};

self.0.insert(k, entry).map(|v| v.value)
self.clear_expired_entries();
shamardy marked this conversation as resolved.
Show resolved Hide resolved
let expires_at = Instant::now() + exp;
let entry = ExpirableEntry { expires_at, value: v };
self.expiries.insert(expires_at, k);
self.map.insert(k, entry).map(|v| v.value)
}

/// Removes expired entries from the map.
pub fn clear_expired_entries(&mut self) { self.0.retain(|_k, v| Instant::now() < v.expires_at); }

/// Removes a key-value pair from the map and returns the associated value if present.
#[inline]
pub fn remove(&mut self, k: &K) -> Option<V> { self.0.remove(k).map(|v| v.value) }
///
/// Iterates through the `expiries` in order, removing entries that have expired.
/// Stops at the first non-expired entry, leveraging the sorted nature of `BTreeMap`.
fn clear_expired_entries(&mut self) {
let now = Instant::now();

// `pop_first()` is used here as it efficiently removes expired entries.
// `first_key_value()` was considered as it wouldn't need re-insertion for
// non-expired entries, but it would require an extra remove operation for
// each expired entry. `pop_first()` needs only one re-insertion per call,
// which is an acceptable trade-off compared to multiple remove operations.
while let Some((exp, key)) = self.expiries.pop_first() {
if exp > now {
self.expiries.insert(exp, key);
shamardy marked this conversation as resolved.
Show resolved Hide resolved
break;
}
self.map.remove(&key);
}
}
}

#[cfg(any(test, target_arch = "wasm32"))]
Expand All @@ -80,8 +116,8 @@ mod tests {
let exp = Duration::from_secs(1);

// Insert 2 entries with 1 sec expiration time
expirable_map.insert("key1".to_string(), value.to_string(), exp);
expirable_map.insert("key2".to_string(), value.to_string(), exp);
expirable_map.insert("key1", value, exp);
expirable_map.insert("key2", value, exp);

// Wait for entries to expire
Timer::sleep(2.).await;
Expand All @@ -90,14 +126,14 @@ mod tests {
expirable_map.clear_expired_entries();

// We waited for 2 seconds, so we shouldn't have any entry accessible
assert_eq!(expirable_map.0.len(), 0);
assert_eq!(expirable_map.map.len(), 0);

// Insert 5 entries
expirable_map.insert("key1".to_string(), value.to_string(), Duration::from_secs(5));
expirable_map.insert("key2".to_string(), value.to_string(), Duration::from_secs(4));
expirable_map.insert("key3".to_string(), value.to_string(), Duration::from_secs(7));
expirable_map.insert("key4".to_string(), value.to_string(), Duration::from_secs(2));
expirable_map.insert("key5".to_string(), value.to_string(), Duration::from_millis(3750));
expirable_map.insert("key1", value, Duration::from_secs(5));
expirable_map.insert("key2", value, Duration::from_secs(4));
expirable_map.insert("key3", value, Duration::from_secs(7));
expirable_map.insert("key4", value, Duration::from_secs(2));
expirable_map.insert("key5", value, Duration::from_millis(3750));

// Wait 2 seconds to expire some entries
Timer::sleep(2.).await;
Expand All @@ -106,6 +142,6 @@ mod tests {
expirable_map.clear_expired_entries();

// We waited for 2 seconds, only one entry should expire
assert_eq!(expirable_map.0.len(), 4);
assert_eq!(expirable_map.map.len(), 4);
});
}
2 changes: 0 additions & 2 deletions mm2src/mm2_core/src/data_asker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -113,8 +113,6 @@ pub async fn send_asked_data_rpc(
asked_data: SendAskedDataRequest,
) -> Result<bool, MmError<SendAskedDataError>> {
let mut awaiting_asks = ctx.data_asker.awaiting_asks.lock().await;
awaiting_asks.clear_expired_entries();

match awaiting_asks.remove(&asked_data.data_id) {
mariocynicys marked this conversation as resolved.
Show resolved Hide resolved
Some(sender) => {
sender.send(asked_data.data).map_to_mm(|_| {
Expand Down
Loading
Loading