diff --git a/Cargo.lock b/Cargo.lock index 431959e5c5..08a985285b 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4404,6 +4404,7 @@ dependencies = [ "mm2_err_handle", "mm2_event_stream", "mm2_metrics", + "mm2_p2p", "mm2_rpc", "primitives", "rand 0.7.3", diff --git a/mm2src/coins/eth/web3_transport/websocket_transport.rs b/mm2src/coins/eth/web3_transport/websocket_transport.rs index d17ef99d7e..fd1220e92e 100644 --- a/mm2src/coins/eth/web3_transport/websocket_transport.rs +++ b/mm2src/coins/eth/web3_transport/websocket_transport.rs @@ -97,12 +97,7 @@ impl WebsocketTransport { } } - async fn handle_keepalive( - &self, - wsocket: &mut WebSocketStream, - response_notifiers: &mut ExpirableMap>>, - expires_at: Option, - ) -> OuterAction { + async fn handle_keepalive(&self, wsocket: &mut WebSocketStream, expires_at: Option) -> OuterAction { const SIMPLE_REQUEST: &str = r#"{"jsonrpc":"2.0","method":"net_version","params":[],"id": 0 }"#; if let Some(expires_at) = expires_at { @@ -112,10 +107,7 @@ impl WebsocketTransport { } } - // Drop expired response notifier channels - response_notifiers.clear_expired_entries(); - - let mut should_continue = Default::default(); + let mut should_continue = false; for _ in 0..MAX_ATTEMPTS { match wsocket .send(tokio_tungstenite_wasm::Message::Text(SIMPLE_REQUEST.to_string())) @@ -206,9 +198,6 @@ impl WebsocketTransport { } if let Some(id) = inc_event.get("id") { - // just to ensure we don't have outdated entries - response_notifiers.clear_expired_entries(); - let request_id = id.as_u64().unwrap_or_default() as usize; if let Some(notifier) = response_notifiers.remove(&request_id) { @@ -279,7 +268,7 @@ impl WebsocketTransport { loop { futures_util::select! { _ = keepalive_interval.next().fuse() => { - match self.handle_keepalive(&mut wsocket, &mut response_notifiers, expires_at).await { + match self.handle_keepalive(&mut wsocket, expires_at).await { OuterAction::None => {}, OuterAction::Continue => continue, OuterAction::Break => break, diff --git a/mm2src/common/expirable_map.rs b/mm2src/common/expirable_map.rs index e9fe7f0b4f..996e2edfae 100644 --- a/mm2src/common/expirable_map.rs +++ b/mm2src/common/expirable_map.rs @@ -5,7 +5,7 @@ use instant::{Duration, Instant}; use rustc_hash::FxHashMap; -use std::hash::Hash; +use std::{collections::BTreeMap, hash::Hash}; #[derive(Clone, Debug)] pub struct ExpirableEntry { @@ -36,45 +36,83 @@ impl ExpirableEntry { pub fn has_longer_life_than(&self, min_ttl: Duration) -> bool { self.expires_at > Instant::now() + min_ttl } } -impl Default for ExpirableMap { +impl Default for ExpirableMap { fn default() -> Self { Self::new() } } /// A map that allows associating values with keys and expiring entries. -/// It is important to note that this implementation does not automatically -/// remove any entries; it is the caller's responsibility to invoke `clear_expired_entries` -/// at specified intervals. +/// It is important to note that this implementation does not have a background worker to +/// automatically clear expired entries. Outdated entries are only removed when the control flow +/// is handed back to the map mutably (i.e. some mutable method of the map is invoked). /// /// WARNING: This is designed for performance-oriented use-cases utilizing `FxHashMap` /// under the hood and is not suitable for cryptographic purposes. #[derive(Clone, Debug)] -pub struct ExpirableMap(FxHashMap>); +pub struct ExpirableMap { + map: FxHashMap>, + /// A sorted inverse map from expiration times to keys to speed up expired entries clearing. + expiries: BTreeMap, +} -impl ExpirableMap { +impl ExpirableMap { /// Creates a new empty `ExpirableMap` #[inline] - pub fn new() -> Self { Self(FxHashMap::default()) } + pub fn new() -> Self { + Self { + map: FxHashMap::default(), + expiries: BTreeMap::new(), + } + } + + /// Returns the associated value if present and not expired. + #[inline] + pub fn get(&self, k: &K) -> Option<&V> { + self.map + .get(k) + .filter(|v| v.expires_at > Instant::now()) + .map(|v| &v.value) + } - /// Returns the associated value if present. + /// Removes a key-value pair from the map and returns the associated value if present and not expired. #[inline] - pub fn get(&mut self, k: &K) -> Option<&V> { self.0.get(k).map(|v| &v.value) } + pub fn remove(&mut self, k: &K) -> Option { + self.map.remove(k).filter(|v| v.expires_at > Instant::now()).map(|v| { + self.expiries.remove(&v.expires_at); + v.value + }) + } /// Inserts a key-value pair with an expiration duration. /// /// If a value already exists for the given key, it will be updated and then /// the old one will be returned. pub fn insert(&mut self, k: K, v: V, exp: Duration) -> Option { + self.clear_expired_entries(); let entry = ExpirableEntry::new(v, exp); - - self.0.insert(k, entry).map(|v| v.value) + self.expiries.insert(entry.expires_at, k); + self.map.insert(k, entry).map(|v| v.value) } /// Removes expired entries from the map. - pub fn clear_expired_entries(&mut self) { self.0.retain(|_k, v| Instant::now() < v.expires_at); } - - /// Removes a key-value pair from the map and returns the associated value if present. - #[inline] - pub fn remove(&mut self, k: &K) -> Option { self.0.remove(k).map(|v| v.value) } + /// + /// Iterates through the `expiries` in order, removing entries that have expired. + /// Stops at the first non-expired entry, leveraging the sorted nature of `BTreeMap`. + fn clear_expired_entries(&mut self) { + let now = Instant::now(); + + // `pop_first()` is used here as it efficiently removes expired entries. + // `first_key_value()` was considered as it wouldn't need re-insertion for + // non-expired entries, but it would require an extra remove operation for + // each expired entry. `pop_first()` needs only one re-insertion per call, + // which is an acceptable trade-off compared to multiple remove operations. + while let Some((exp, key)) = self.expiries.pop_first() { + if exp > now { + self.expiries.insert(exp, key); + break; + } + self.map.remove(&key); + } + } } #[cfg(any(test, target_arch = "wasm32"))] @@ -94,8 +132,8 @@ mod tests { let exp = Duration::from_secs(1); // Insert 2 entries with 1 sec expiration time - expirable_map.insert("key1".to_string(), value.to_string(), exp); - expirable_map.insert("key2".to_string(), value.to_string(), exp); + expirable_map.insert("key1", value, exp); + expirable_map.insert("key2", value, exp); // Wait for entries to expire Timer::sleep(2.).await; @@ -104,14 +142,14 @@ mod tests { expirable_map.clear_expired_entries(); // We waited for 2 seconds, so we shouldn't have any entry accessible - assert_eq!(expirable_map.0.len(), 0); + assert_eq!(expirable_map.map.len(), 0); // Insert 5 entries - expirable_map.insert("key1".to_string(), value.to_string(), Duration::from_secs(5)); - expirable_map.insert("key2".to_string(), value.to_string(), Duration::from_secs(4)); - expirable_map.insert("key3".to_string(), value.to_string(), Duration::from_secs(7)); - expirable_map.insert("key4".to_string(), value.to_string(), Duration::from_secs(2)); - expirable_map.insert("key5".to_string(), value.to_string(), Duration::from_millis(3750)); + expirable_map.insert("key1", value, Duration::from_secs(5)); + expirable_map.insert("key2", value, Duration::from_secs(4)); + expirable_map.insert("key3", value, Duration::from_secs(7)); + expirable_map.insert("key4", value, Duration::from_secs(2)); + expirable_map.insert("key5", value, Duration::from_millis(3750)); // Wait 2 seconds to expire some entries Timer::sleep(2.).await; @@ -120,6 +158,6 @@ mod tests { expirable_map.clear_expired_entries(); // We waited for 2 seconds, only one entry should expire - assert_eq!(expirable_map.0.len(), 4); + assert_eq!(expirable_map.map.len(), 4); }); } diff --git a/mm2src/mm2_core/Cargo.toml b/mm2src/mm2_core/Cargo.toml index 943dcac280..a9d308be9a 100644 --- a/mm2src/mm2_core/Cargo.toml +++ b/mm2src/mm2_core/Cargo.toml @@ -20,6 +20,7 @@ lazy_static = "1.4" mm2_err_handle = { path = "../mm2_err_handle" } mm2_event_stream = { path = "../mm2_event_stream" } mm2_metrics = { path = "../mm2_metrics" } +mm2_libp2p = { path = "../mm2_p2p", package = "mm2_p2p" } primitives = { path = "../mm2_bitcoin/primitives" } rand = { version = "0.7", features = ["std", "small_rng", "wasm-bindgen"] } serde = "1" diff --git a/mm2src/mm2_core/src/data_asker.rs b/mm2src/mm2_core/src/data_asker.rs index 5d01310630..7f32f93365 100644 --- a/mm2src/mm2_core/src/data_asker.rs +++ b/mm2src/mm2_core/src/data_asker.rs @@ -113,8 +113,6 @@ pub async fn send_asked_data_rpc( asked_data: SendAskedDataRequest, ) -> Result> { let mut awaiting_asks = ctx.data_asker.awaiting_asks.lock().await; - awaiting_asks.clear_expired_entries(); - match awaiting_asks.remove(&asked_data.data_id) { Some(sender) => { sender.send(asked_data.data).map_to_mm(|_| { diff --git a/mm2src/mm2_core/src/mm_ctx.rs b/mm2src/mm2_core/src/mm_ctx.rs index 6e060d91c3..3f56970f3c 100644 --- a/mm2src/mm2_core/src/mm_ctx.rs +++ b/mm2src/mm2_core/src/mm_ctx.rs @@ -10,6 +10,7 @@ use futures::lock::Mutex as AsyncMutex; use gstuff::{try_s, Constructible, ERR, ERRL}; use lazy_static::lazy_static; use mm2_event_stream::{controller::Controller, Event, EventStreamConfiguration}; +use mm2_libp2p::PeerAddress; use mm2_metrics::{MetricsArc, MetricsOps}; use primitives::hash::H160; use rand::Rng; @@ -145,7 +146,7 @@ pub struct MmCtx { #[cfg(not(target_arch = "wasm32"))] pub async_sqlite_connection: Constructible>>, /// Links the RPC context to the P2P context to handle health check responses. - pub healthcheck_response_handler: AsyncMutex>>, + pub healthcheck_response_handler: AsyncMutex>>, } impl MmCtx { diff --git a/mm2src/mm2_main/Cargo.toml b/mm2src/mm2_main/Cargo.toml index 53154efa05..07091e2631 100644 --- a/mm2src/mm2_main/Cargo.toml +++ b/mm2src/mm2_main/Cargo.toml @@ -65,7 +65,7 @@ mm2_err_handle = { path = "../mm2_err_handle" } mm2_event_stream = { path = "../mm2_event_stream" } mm2_gui_storage = { path = "../mm2_gui_storage" } mm2_io = { path = "../mm2_io" } -mm2-libp2p = { path = "../mm2_p2p", package = "mm2_p2p" } +mm2_libp2p = { path = "../mm2_p2p", package = "mm2_p2p" } mm2_metrics = { path = "../mm2_metrics" } mm2_net = { path = "../mm2_net", features = ["event-stream", "p2p"] } mm2_number = { path = "../mm2_number" } diff --git a/mm2src/mm2_main/src/lp_healthcheck.rs b/mm2src/mm2_main/src/lp_healthcheck.rs index 722bc21402..849f478d5f 100644 --- a/mm2src/mm2_main/src/lp_healthcheck.rs +++ b/mm2src/mm2_main/src/lp_healthcheck.rs @@ -9,12 +9,11 @@ use instant::{Duration, Instant}; use lazy_static::lazy_static; use mm2_core::mm_ctx::MmArc; use mm2_err_handle::prelude::MmError; -use mm2_libp2p::{decode_message, encode_message, pub_sub_topic, Libp2pPublic, TopicPrefix}; +use mm2_libp2p::{decode_message, encode_message, pub_sub_topic, Libp2pPublic, PeerAddress, TopicPrefix}; use mm2_net::p2p::P2PContext; use ser_error_derive::SerializeErrorType; use serde::{Deserialize, Serialize}; use std::convert::TryFrom; -use std::str::FromStr; use std::sync::Mutex; use crate::lp_network::broadcast_p2p_msg; @@ -37,71 +36,6 @@ pub(crate) struct HealthcheckMessage { data: HealthcheckData, } -/// Wrapper of `libp2p::PeerId` with trait additional implementations. -/// -/// TODO: This should be used as a replacement of `libp2p::PeerId` in the entire project. -#[derive(Clone, Copy, Debug, Display, PartialEq)] -pub struct PeerAddress(mm2_libp2p::PeerId); - -impl From for PeerAddress { - fn from(value: mm2_libp2p::PeerId) -> Self { Self(value) } -} - -impl From for mm2_libp2p::PeerId { - fn from(value: PeerAddress) -> Self { value.0 } -} - -impl Serialize for PeerAddress { - fn serialize(&self, serializer: S) -> Result - where - S: serde::Serializer, - { - serializer.serialize_str(&self.0.to_string()) - } -} - -impl<'de> Deserialize<'de> for PeerAddress { - fn deserialize(deserializer: D) -> Result - where - D: serde::Deserializer<'de>, - { - struct PeerAddressVisitor; - - impl<'de> serde::de::Visitor<'de> for PeerAddressVisitor { - type Value = PeerAddress; - - fn expecting(&self, formatter: &mut std::fmt::Formatter) -> std::fmt::Result { - formatter.write_str("a string representation of peer id.") - } - - fn visit_str(self, value: &str) -> Result - where - E: serde::de::Error, - { - if value.len() > 100 { - return Err(serde::de::Error::invalid_length( - value.len(), - &"peer id cannot exceed 100 characters.", - )); - } - - Ok(mm2_libp2p::PeerId::from_str(value) - .map_err(serde::de::Error::custom)? - .into()) - } - - fn visit_string(self, value: String) -> Result - where - E: serde::de::Error, - { - self.visit_str(&value) - } - } - - deserializer.deserialize_str(PeerAddressVisitor) - } -} - #[derive(Debug, Display)] enum SignValidationError { #[display( @@ -331,8 +265,7 @@ pub async fn peer_connection_healthcheck_rpc( { let mut book = ctx.healthcheck_response_handler.lock().await; - book.clear_expired_entries(); - book.insert(target_peer_address.to_string(), tx, address_record_exp); + book.insert(target_peer_address, tx, address_record_exp); } broadcast_p2p_msg( @@ -395,7 +328,7 @@ pub(crate) async fn process_p2p_healthcheck_message(ctx: &MmArc, message: mm2_li } else { // The requested peer is healthy; signal the response channel. let mut response_handler = ctx.healthcheck_response_handler.lock().await; - if let Some(tx) = response_handler.remove(&sender_peer.to_string()) { + if let Some(tx) = response_handler.remove(&sender_peer) { if tx.send(()).is_err() { log::error!("Result channel isn't present for peer '{sender_peer}'."); }; @@ -409,6 +342,7 @@ pub(crate) async fn process_p2p_healthcheck_message(ctx: &MmArc, message: mm2_li #[cfg(any(test, target_arch = "wasm32"))] mod tests { use std::mem::discriminant; + use std::str::FromStr; use super::*; use common::cross_test; diff --git a/mm2src/mm2_main/src/lp_ordermatch.rs b/mm2src/mm2_main/src/lp_ordermatch.rs index 9829fed75a..dc9b85082b 100644 --- a/mm2src/mm2_main/src/lp_ordermatch.rs +++ b/mm2src/mm2_main/src/lp_ordermatch.rs @@ -122,6 +122,7 @@ const TAKER_ORDER_TIMEOUT: u64 = 30; const ORDER_MATCH_TIMEOUT: u64 = 30; const ORDERBOOK_REQUESTING_TIMEOUT: u64 = MIN_ORDER_KEEP_ALIVE_INTERVAL * 2; const MAX_ORDERS_NUMBER_IN_ORDERBOOK_RESPONSE: usize = 1000; +const RECENTLY_CANCELLED_TIMEOUT: Duration = Duration::from_secs(120); #[cfg(not(test))] const TRIE_STATE_HISTORY_TIMEOUT: u64 = 14400; #[cfg(test)] @@ -331,6 +332,12 @@ async fn process_orders_keep_alive( Ok(()) } +#[inline] +fn process_maker_order_created(ctx: &MmArc, from_pubkey: String, created_msg: new_protocol::MakerOrderCreated) { + let order: OrderbookItem = (created_msg, from_pubkey).into(); + insert_or_update_order(ctx, order); +} + fn process_maker_order_updated( ctx: MmArc, from_pubkey: String, @@ -350,6 +357,22 @@ fn process_maker_order_updated( Ok(()) } +fn process_maker_order_cancelled(ctx: &MmArc, from_pubkey: String, cancelled_msg: new_protocol::MakerOrderCancelled) { + let uuid = Uuid::from(cancelled_msg.uuid); + let ordermatch_ctx = OrdermatchContext::from_ctx(ctx).expect("from_ctx failed"); + let mut orderbook = ordermatch_ctx.orderbook.lock(); + // Add the order to the recently cancelled list to ignore it if a new order with the same uuid + // is received within the `RECENTLY_CANCELLED_TIMEOUT` timeframe. + // We do this even if the order is in the order_set, because it could have been added through + // means other than the order creation message. + orderbook.recently_cancelled.insert(uuid, from_pubkey.clone()); + if let Some(order) = orderbook.order_set.get(&uuid) { + if order.pubkey == from_pubkey { + orderbook.remove_order_trie_update(uuid); + } + } +} + // fn verify_pubkey_orderbook(orderbook: &GetOrderbookPubkeyItem) -> Result<(), String> { // let keys: Vec<(_, _)> = orderbook // .orders @@ -456,16 +479,6 @@ fn insert_or_update_my_order(ctx: &MmArc, item: OrderbookItem, my_order: &MakerO } } -fn delete_order(ctx: &MmArc, pubkey: &str, uuid: Uuid) { - let ordermatch_ctx = OrdermatchContext::from_ctx(ctx).expect("from_ctx failed"); - let mut orderbook = ordermatch_ctx.orderbook.lock(); - if let Some(order) = orderbook.order_set.get(&uuid) { - if order.pubkey == pubkey { - orderbook.remove_order_trie_update(uuid); - } - } -} - fn delete_my_order(ctx: &MmArc, uuid: Uuid, p2p_privkey: Option) { let ordermatch_ctx: Arc = OrdermatchContext::from_ctx(ctx).expect("from_ctx failed"); let mut orderbook = ordermatch_ctx.orderbook.lock(); @@ -548,8 +561,7 @@ pub async fn process_msg(ctx: MmArc, from_peer: String, msg: &[u8], i_am_relay: log::debug!("received ordermatch message {:?}", message); match message { new_protocol::OrdermatchMessage::MakerOrderCreated(created_msg) => { - let order: OrderbookItem = (created_msg, hex::encode(pubkey.to_bytes().as_slice())).into(); - insert_or_update_order(&ctx, order); + process_maker_order_created(&ctx, pubkey.to_hex(), created_msg); Ok(()) }, new_protocol::OrdermatchMessage::PubkeyKeepAlive(keep_alive) => { @@ -576,7 +588,7 @@ pub async fn process_msg(ctx: MmArc, from_peer: String, msg: &[u8], i_am_relay: Ok(()) }, new_protocol::OrdermatchMessage::MakerOrderCancelled(cancelled_msg) => { - delete_order(&ctx, &pubkey.to_hex(), cancelled_msg.uuid.into()); + process_maker_order_cancelled(&ctx, pubkey.to_hex(), cancelled_msg); Ok(()) }, new_protocol::OrdermatchMessage::MakerOrderUpdated(updated_msg) => { @@ -1083,15 +1095,15 @@ fn maker_order_updated_p2p_notify( broadcast_p2p_msg(&ctx, topic, encoded_msg, peer_id); } -fn maker_order_cancelled_p2p_notify(ctx: MmArc, order: &MakerOrder) { +fn maker_order_cancelled_p2p_notify(ctx: &MmArc, order: &MakerOrder) { let message = new_protocol::OrdermatchMessage::MakerOrderCancelled(new_protocol::MakerOrderCancelled { uuid: order.uuid.into(), timestamp: now_sec(), pair_trie_root: H64::default(), }); - delete_my_order(&ctx, order.uuid, order.p2p_privkey); + delete_my_order(ctx, order.uuid, order.p2p_privkey); log::debug!("maker_order_cancelled_p2p_notify called, message {:?}", message); - broadcast_ordermatch_message(&ctx, order.orderbook_topic(), message, order.p2p_keypair()); + broadcast_ordermatch_message(ctx, order.orderbook_topic(), message, order.p2p_keypair()); } pub struct BalanceUpdateOrdermatchHandler { @@ -1141,7 +1153,7 @@ impl BalanceTradeFeeUpdatedHandler for BalanceUpdateOrdermatchHandler { // This checks that the order hasn't been removed by another process if removed_order_mutex.is_some() { // cancel the order - maker_order_cancelled_p2p_notify(ctx.clone(), &order); + maker_order_cancelled_p2p_notify(&ctx, &order); delete_my_maker_order( ctx.clone(), order.clone(), @@ -2477,7 +2489,6 @@ fn collect_orderbook_metrics(ctx: &MmArc, orderbook: &Orderbook) { mm_gauge!(ctx.metrics, "orderbook.memory_db", memory_db_size as f64); } -#[derive(Default)] struct Orderbook { /// A map from (base, rel). ordered: HashMap<(String, String), BTreeSet>, @@ -2490,12 +2501,34 @@ struct Orderbook { order_set: HashMap, /// a map of orderbook states of known maker pubkeys pubkeys_state: HashMap, + /// The `TimeCache` of recently canceled orders, mapping `Uuid` to the maker pubkey as `String`, + /// used to avoid order recreation in case of out-of-order p2p messages, + /// e.g., when receiving the order cancellation message before the order is created. + /// Entries are kept for `RECENTLY_CANCELLED_TIMEOUT` seconds. + recently_cancelled: TimeCache, topics_subscribed_to: HashMap, /// MemoryDB instance to store Patricia Tries data memory_db: MemoryDB, my_p2p_pubkeys: HashSet, } +impl Default for Orderbook { + fn default() -> Self { + Orderbook { + ordered: HashMap::default(), + pairs_existing_for_base: HashMap::default(), + pairs_existing_for_rel: HashMap::default(), + unordered: HashMap::default(), + order_set: HashMap::default(), + pubkeys_state: HashMap::default(), + recently_cancelled: TimeCache::new(RECENTLY_CANCELLED_TIMEOUT), + topics_subscribed_to: HashMap::default(), + memory_db: MemoryDB::default(), + my_p2p_pubkeys: HashSet::default(), + } + } +} + fn hashed_null_node() -> TrieHash { ::hashed_null_node() } impl Orderbook { @@ -2512,6 +2545,12 @@ impl Orderbook { fn find_order_by_uuid(&self, uuid: &Uuid) -> Option { self.order_set.get(uuid).cloned() } fn insert_or_update_order_update_trie(&mut self, order: OrderbookItem) { + // Ignore the order if it was recently cancelled + if self.recently_cancelled.get(&order.uuid) == Some(&order.pubkey) { + warn!("Maker order {} was recently cancelled, ignoring", order.uuid); + return; + } + let zero = BigRational::from_integer(0.into()); if order.max_volume <= zero || order.price <= zero || order.min_volume < zero { self.remove_order_trie_update(order.uuid); @@ -3314,6 +3353,7 @@ pub async fn lp_ordermatch_loop(ctx: MmArc) { // This checks that the order hasn't been removed by another process if let Some(order_mutex) = removed_order_mutex { let order = order_mutex.lock().await; + maker_order_cancelled_p2p_notify(&ctx, &order); delete_my_maker_order( ctx.clone(), order.clone(), @@ -3430,7 +3470,7 @@ async fn check_balance_for_maker_orders(ctx: MmArc, ordermatch_ctx: &OrdermatchC let removed_order_mutex = ordermatch_ctx.maker_orders_ctx.lock().remove_order(&uuid); // This checks that the order hasn't been removed by another process if removed_order_mutex.is_some() { - maker_order_cancelled_p2p_notify(ctx.clone(), &order); + maker_order_cancelled_p2p_notify(&ctx, &order); delete_my_maker_order(ctx.clone(), order.clone(), reason) .compat() .await @@ -4739,7 +4779,7 @@ async fn cancel_previous_maker_orders( let removed_order_mutex = ordermatch_ctx.maker_orders_ctx.lock().remove_order(&uuid); // This checks that the uuid, &order.base hasn't been removed by another process if removed_order_mutex.is_some() { - maker_order_cancelled_p2p_notify(ctx.clone(), &order); + maker_order_cancelled_p2p_notify(ctx, &order); delete_my_maker_order(ctx.clone(), order.clone(), MakerOrderCancellationReason::Cancelled) .compat() .await @@ -5141,7 +5181,7 @@ pub async fn cancel_order(ctx: MmArc, req: CancelOrderReq) -> Result Result> let removed_order_mutex = ordermatch_ctx.maker_orders_ctx.lock().remove_order(&order.uuid); // This checks that the order hasn't been removed by another process if removed_order_mutex.is_some() { - maker_order_cancelled_p2p_notify(ctx.clone(), &order); + maker_order_cancelled_p2p_notify(&ctx, &order); delete_my_maker_order(ctx, order.clone(), MakerOrderCancellationReason::Cancelled) .compat() .await @@ -5539,7 +5579,7 @@ pub async fn cancel_orders_by(ctx: &MmArc, cancel_by: CancelBy) -> Result<(Vec = Secp256k1::signing_only(); } +/// Wrapper of `libp2p::PeerId` with trait additional implementations. +/// +/// TODO: This should be used as a replacement of `libp2p::PeerId` in the entire project. +#[derive(Clone, Copy, Debug, Display, Eq, Hash, PartialEq)] +pub struct PeerAddress(PeerId); + +impl From for PeerAddress { + fn from(value: PeerId) -> Self { Self(value) } +} + +impl From for PeerId { + fn from(value: PeerAddress) -> Self { value.0 } +} + +impl Serialize for PeerAddress { + fn serialize(&self, serializer: S) -> Result + where + S: Serializer, + { + serializer.serialize_str(&self.0.to_string()) + } +} + +impl<'de> Deserialize<'de> for PeerAddress { + fn deserialize(deserializer: D) -> Result + where + D: serde::Deserializer<'de>, + { + struct PeerAddressVisitor; + + impl<'de> serde::de::Visitor<'de> for PeerAddressVisitor { + type Value = PeerAddress; + + fn expecting(&self, formatter: &mut std::fmt::Formatter) -> std::fmt::Result { + formatter.write_str("a string representation of peer id.") + } + + fn visit_str(self, value: &str) -> Result + where + E: serde::de::Error, + { + if value.len() > 100 { + return Err(serde::de::Error::invalid_length( + value.len(), + &"peer id cannot exceed 100 characters.", + )); + } + + Ok(PeerId::from_str(value).map_err(de::Error::custom)?.into()) + } + + fn visit_string(self, value: String) -> Result + where + E: de::Error, + { + self.visit_str(&value) + } + } + + deserializer.deserialize_str(PeerAddressVisitor) + } +} + #[derive(Clone, Copy, Debug)] pub enum NetworkInfo { /// The in-memory network.