Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(orders): fix cancel order race condition using time-based cache #2232

Merged
merged 26 commits into from
Oct 4, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
26 commits
Select commit Hold shift + click to select a range
6ad60e9
add recently cancelled orders time cache to orderbook
shamardy Sep 28, 2024
232019b
revert test_cancel_order changes
shamardy Sep 28, 2024
1c38da7
review fix: cherry-pick 62d26c7
mariocynicys Sep 4, 2024
da884fc
optimize `clear_expired_entries`
shamardy Sep 30, 2024
5762d13
provide a consistent interface for `ExpirableMap`
shamardy Sep 30, 2024
f2ef6d3
use Copy type parameter instead of Clone in `ExpirableMap`
shamardy Sep 30, 2024
87c53f8
make test_cancel_all_orders not rely on the order of messages
shamardy Oct 1, 2024
5591f89
Make `RECENTLY_CANCELLED_TIMEOUT` `Duration` Type
shamardy Oct 1, 2024
69fdfe9
use ctx as reference in `maker_order_cancelled_p2p_notify`
shamardy Oct 1, 2024
14f7f34
move `recently_cancelled` time cache to it's own mutex in `Ordermatch…
shamardy Oct 1, 2024
e0e5d41
Revert "move `recently_cancelled` time cache to it's own mutex in `Or…
shamardy Oct 2, 2024
cdfd809
move recently cancelled back to orderbook struct and handle it in a b…
shamardy Oct 2, 2024
860544f
review fix: clarify some comments
shamardy Oct 2, 2024
20fa7e5
add order to `recently_cancelled` even if it was in the `order_set`
shamardy Oct 2, 2024
d2675d2
review fix: add implementation details to `clear_expired_entries`
shamardy Oct 2, 2024
abfb252
review fix: check expiry on remove as well
shamardy Oct 2, 2024
5a839be
review fix: move implementation details to a comment inside the `clea…
shamardy Oct 2, 2024
598045d
revert `test_cancel_all_orders` to dev state and add `test_order_canc…
shamardy Oct 3, 2024
9faa804
fix clippy
shamardy Oct 3, 2024
158252c
review fix: remove return types from some functions that don't need them
shamardy Oct 3, 2024
ff54b83
review fix: fix comments in `test_order_cancellation_received_before_…
shamardy Oct 3, 2024
3f0a6bd
Merge remote-tracking branch 'origin/dev' into fix-p2p-cancellation
shamardy Oct 3, 2024
b1cae0f
reorder imports after merge
shamardy Oct 3, 2024
9dd4544
remove adex-cli CI workflow
shamardy Oct 4, 2024
d55efc3
update `mm2_libp2p` dependency name for consistency
shamardy Oct 4, 2024
1e6bbb8
Merge remote-tracking branch 'origin/dev' into fix-p2p-cancellation
shamardy Oct 4, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

5 changes: 2 additions & 3 deletions mm2src/common/expirable_map.rs
Original file line number Diff line number Diff line change
Expand Up @@ -88,9 +88,8 @@ impl<K: Eq + Hash + Copy, V> ExpirableMap<K, V> {
/// the old one will be returned.
pub fn insert(&mut self, k: K, v: V, exp: Duration) -> Option<V> {
self.clear_expired_entries();
shamardy marked this conversation as resolved.
Show resolved Hide resolved
let expires_at = Instant::now() + exp;
let entry = ExpirableEntry { expires_at, value: v };
self.expiries.insert(expires_at, k);
let entry = ExpirableEntry::new(v, exp);
self.expiries.insert(entry.expires_at, k);
self.map.insert(k, entry).map(|v| v.value)
}

Expand Down
1 change: 1 addition & 0 deletions mm2src/mm2_core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ lazy_static = "1.4"
mm2_err_handle = { path = "../mm2_err_handle" }
mm2_event_stream = { path = "../mm2_event_stream" }
mm2_metrics = { path = "../mm2_metrics" }
mm2-libp2p = { path = "../mm2_p2p", package = "mm2_p2p" }
shamardy marked this conversation as resolved.
Show resolved Hide resolved
primitives = { path = "../mm2_bitcoin/primitives" }
rand = { version = "0.7", features = ["std", "small_rng", "wasm-bindgen"] }
serde = "1"
Expand Down
6 changes: 3 additions & 3 deletions mm2src/mm2_core/src/mm_ctx.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
use crate::data_asker::DataAsker;
#[cfg(feature = "track-ctx-pointer")]
use common::executor::Timer;
use common::log::{self, LogLevel, LogOnError, LogState};
Expand All @@ -10,6 +11,7 @@ use futures::lock::Mutex as AsyncMutex;
use gstuff::{try_s, Constructible, ERR, ERRL};
use lazy_static::lazy_static;
use mm2_event_stream::{controller::Controller, Event, EventStreamConfiguration};
use mm2_libp2p::PeerAddress;
use mm2_metrics::{MetricsArc, MetricsOps};
use primitives::hash::H160;
use rand::Rng;
Expand All @@ -23,8 +25,6 @@ use std::future::Future;
use std::ops::Deref;
use std::sync::{Arc, Mutex};

use crate::data_asker::DataAsker;

cfg_wasm32! {
use mm2_rpc::wasm_rpc::WasmRpcSender;
use crate::DbNamespaceId;
Expand Down Expand Up @@ -145,7 +145,7 @@ pub struct MmCtx {
#[cfg(not(target_arch = "wasm32"))]
pub async_sqlite_connection: Constructible<Arc<AsyncMutex<AsyncConnection>>>,
/// Links the RPC context to the P2P context to handle health check responses.
pub healthcheck_response_handler: AsyncMutex<ExpirableMap<String, oneshot::Sender<()>>>,
pub healthcheck_response_handler: AsyncMutex<ExpirableMap<PeerAddress, oneshot::Sender<()>>>,
}

impl MmCtx {
Expand Down
77 changes: 5 additions & 72 deletions mm2src/mm2_main/src/lp_healthcheck.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,12 +9,11 @@ use instant::{Duration, Instant};
use lazy_static::lazy_static;
use mm2_core::mm_ctx::MmArc;
use mm2_err_handle::prelude::MmError;
use mm2_libp2p::{decode_message, encode_message, pub_sub_topic, Libp2pPublic, TopicPrefix};
use mm2_libp2p::{decode_message, encode_message, pub_sub_topic, Libp2pPublic, PeerAddress, TopicPrefix};
use mm2_net::p2p::P2PContext;
use ser_error_derive::SerializeErrorType;
use serde::{Deserialize, Serialize};
use std::convert::TryFrom;
use std::str::FromStr;
use std::sync::Mutex;

use crate::lp_network::broadcast_p2p_msg;
Expand All @@ -37,71 +36,6 @@ pub(crate) struct HealthcheckMessage {
data: HealthcheckData,
}

/// Wrapper of `libp2p::PeerId` with trait additional implementations.
///
/// TODO: This should be used as a replacement of `libp2p::PeerId` in the entire project.
#[derive(Clone, Copy, Debug, Display, PartialEq)]
pub struct PeerAddress(mm2_libp2p::PeerId);

impl From<mm2_libp2p::PeerId> for PeerAddress {
fn from(value: mm2_libp2p::PeerId) -> Self { Self(value) }
}

impl From<PeerAddress> for mm2_libp2p::PeerId {
fn from(value: PeerAddress) -> Self { value.0 }
}

impl Serialize for PeerAddress {
fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
where
S: serde::Serializer,
{
serializer.serialize_str(&self.0.to_string())
}
}

impl<'de> Deserialize<'de> for PeerAddress {
fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
where
D: serde::Deserializer<'de>,
{
struct PeerAddressVisitor;

impl<'de> serde::de::Visitor<'de> for PeerAddressVisitor {
type Value = PeerAddress;

fn expecting(&self, formatter: &mut std::fmt::Formatter) -> std::fmt::Result {
formatter.write_str("a string representation of peer id.")
}

fn visit_str<E>(self, value: &str) -> Result<PeerAddress, E>
where
E: serde::de::Error,
{
if value.len() > 100 {
return Err(serde::de::Error::invalid_length(
value.len(),
&"peer id cannot exceed 100 characters.",
));
}

Ok(mm2_libp2p::PeerId::from_str(value)
.map_err(serde::de::Error::custom)?
.into())
}

fn visit_string<E>(self, value: String) -> Result<PeerAddress, E>
where
E: serde::de::Error,
{
self.visit_str(&value)
}
}

deserializer.deserialize_str(PeerAddressVisitor)
}
}

#[derive(Debug, Display)]
enum SignValidationError {
#[display(
Expand Down Expand Up @@ -331,8 +265,7 @@ pub async fn peer_connection_healthcheck_rpc(

{
let mut book = ctx.healthcheck_response_handler.lock().await;
book.clear_expired_entries();
book.insert(target_peer_address.to_string(), tx, address_record_exp);
book.insert(target_peer_address, tx, address_record_exp);
}

broadcast_p2p_msg(
Expand Down Expand Up @@ -395,7 +328,7 @@ pub(crate) async fn process_p2p_healthcheck_message(ctx: &MmArc, message: mm2_li
} else {
// The requested peer is healthy; signal the response channel.
let mut response_handler = ctx.healthcheck_response_handler.lock().await;
if let Some(tx) = response_handler.remove(&sender_peer.to_string()) {
if let Some(tx) = response_handler.remove(&sender_peer) {
if tx.send(()).is_err() {
log::error!("Result channel isn't present for peer '{sender_peer}'.");
};
Expand All @@ -408,13 +341,13 @@ pub(crate) async fn process_p2p_healthcheck_message(ctx: &MmArc, message: mm2_li

#[cfg(any(test, target_arch = "wasm32"))]
mod tests {
use std::mem::discriminant;

use super::*;
use common::cross_test;
use crypto::CryptoCtx;
use mm2_libp2p::behaviours::atomicdex::generate_ed25519_keypair;
use mm2_test_helpers::for_tests::mm_ctx_with_iguana;
use std::mem::discriminant;
use std::str::FromStr;

common::cfg_wasm32! {
use wasm_bindgen_test::*;
Expand Down
65 changes: 65 additions & 0 deletions mm2src/mm2_p2p/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,14 @@ mod network;
mod relay_address;
mod swarm_runtime;

use derive_more::Display;
use lazy_static::lazy_static;
use secp256k1::{Message as SecpMessage, PublicKey as Secp256k1Pubkey, Secp256k1, SecretKey, SignOnly, Signature,
VerifyOnly};
use serde::{de, Deserialize, Serialize, Serializer};
use sha2::digest::Update;
use sha2::{Digest, Sha256};
use std::str::FromStr;

pub use crate::swarm_runtime::SwarmRuntime;

Expand Down Expand Up @@ -43,6 +45,69 @@ lazy_static! {
static ref SECP_SIGN: Secp256k1<SignOnly> = Secp256k1::signing_only();
}

/// Wrapper of `libp2p::PeerId` with trait additional implementations.
///
/// TODO: This should be used as a replacement of `libp2p::PeerId` in the entire project.
#[derive(Clone, Copy, Debug, Display, Eq, Hash, PartialEq)]
pub struct PeerAddress(PeerId);

impl From<PeerId> for PeerAddress {
fn from(value: PeerId) -> Self { Self(value) }
}

impl From<PeerAddress> for PeerId {
fn from(value: PeerAddress) -> Self { value.0 }
}

impl Serialize for PeerAddress {
fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
where
S: Serializer,
{
serializer.serialize_str(&self.0.to_string())
}
}

impl<'de> Deserialize<'de> for PeerAddress {
fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
where
D: serde::Deserializer<'de>,
{
struct PeerAddressVisitor;

impl<'de> serde::de::Visitor<'de> for PeerAddressVisitor {
type Value = PeerAddress;

fn expecting(&self, formatter: &mut std::fmt::Formatter) -> std::fmt::Result {
formatter.write_str("a string representation of peer id.")
}

fn visit_str<E>(self, value: &str) -> Result<PeerAddress, E>
where
E: serde::de::Error,
{
if value.len() > 100 {
return Err(serde::de::Error::invalid_length(
value.len(),
&"peer id cannot exceed 100 characters.",
));
}

Ok(PeerId::from_str(value).map_err(de::Error::custom)?.into())
}

fn visit_string<E>(self, value: String) -> Result<PeerAddress, E>
where
E: de::Error,
{
self.visit_str(&value)
}
}

deserializer.deserialize_str(PeerAddressVisitor)
}
}

#[derive(Clone, Copy, Debug)]
pub enum NetworkInfo {
/// The in-memory network.
Expand Down
Loading
You are viewing a condensed version of this merge commit. You can view the full changes here.