diff --git a/Cargo.lock b/Cargo.lock index 431959e5c5..08a985285b 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4404,6 +4404,7 @@ dependencies = [ "mm2_err_handle", "mm2_event_stream", "mm2_metrics", + "mm2_p2p", "mm2_rpc", "primitives", "rand 0.7.3", diff --git a/mm2src/common/expirable_map.rs b/mm2src/common/expirable_map.rs index 4c2e6c80bd..996e2edfae 100644 --- a/mm2src/common/expirable_map.rs +++ b/mm2src/common/expirable_map.rs @@ -14,9 +14,26 @@ pub struct ExpirableEntry { } impl ExpirableEntry { + #[inline(always)] + pub fn new(v: V, exp: Duration) -> Self { + Self { + expires_at: Instant::now() + exp, + value: v, + } + } + + #[inline(always)] pub fn get_element(&self) -> &V { &self.value } + #[inline(always)] + pub fn update_value(&mut self, v: V) { self.value = v } + + #[inline(always)] pub fn update_expiration(&mut self, expires_at: Instant) { self.expires_at = expires_at } + + /// Checks whether entry has longer ttl than the given one. + #[inline(always)] + pub fn has_longer_life_than(&self, min_ttl: Duration) -> bool { self.expires_at > Instant::now() + min_ttl } } impl Default for ExpirableMap { @@ -71,9 +88,8 @@ impl ExpirableMap { /// the old one will be returned. pub fn insert(&mut self, k: K, v: V, exp: Duration) -> Option { self.clear_expired_entries(); - let expires_at = Instant::now() + exp; - let entry = ExpirableEntry { expires_at, value: v }; - self.expiries.insert(expires_at, k); + let entry = ExpirableEntry::new(v, exp); + self.expiries.insert(entry.expires_at, k); self.map.insert(k, entry).map(|v| v.value) } diff --git a/mm2src/mm2_core/Cargo.toml b/mm2src/mm2_core/Cargo.toml index 943dcac280..b22c2df7f1 100644 --- a/mm2src/mm2_core/Cargo.toml +++ b/mm2src/mm2_core/Cargo.toml @@ -20,6 +20,7 @@ lazy_static = "1.4" mm2_err_handle = { path = "../mm2_err_handle" } mm2_event_stream = { path = "../mm2_event_stream" } mm2_metrics = { path = "../mm2_metrics" } +mm2-libp2p = { path = "../mm2_p2p", package = "mm2_p2p" } primitives = { path = "../mm2_bitcoin/primitives" } rand = { version = "0.7", features = ["std", "small_rng", "wasm-bindgen"] } serde = "1" diff --git a/mm2src/mm2_core/src/mm_ctx.rs b/mm2src/mm2_core/src/mm_ctx.rs index c61b4e1aba..63f4e65981 100644 --- a/mm2src/mm2_core/src/mm_ctx.rs +++ b/mm2src/mm2_core/src/mm_ctx.rs @@ -1,12 +1,17 @@ +use crate::data_asker::DataAsker; #[cfg(feature = "track-ctx-pointer")] use common::executor::Timer; -use common::executor::{abortable_queue::{AbortableQueue, WeakSpawner}, - graceful_shutdown, AbortSettings, AbortableSystem, SpawnAbortable, SpawnFuture}; use common::log::{self, LogLevel, LogOnError, LogState}; use common::{cfg_native, cfg_wasm32, small_rng}; +use common::{executor::{abortable_queue::{AbortableQueue, WeakSpawner}, + graceful_shutdown, AbortSettings, AbortableSystem, SpawnAbortable, SpawnFuture}, + expirable_map::ExpirableMap}; +use futures::channel::oneshot; +use futures::lock::Mutex as AsyncMutex; use gstuff::{try_s, Constructible, ERR, ERRL}; use lazy_static::lazy_static; use mm2_event_stream::{controller::Controller, Event, EventStreamConfiguration}; +use mm2_libp2p::PeerAddress; use mm2_metrics::{MetricsArc, MetricsOps}; use primitives::hash::H160; use rand::Rng; @@ -20,8 +25,6 @@ use std::future::Future; use std::ops::Deref; use std::sync::{Arc, Mutex}; -use crate::data_asker::DataAsker; - cfg_wasm32! { use mm2_rpc::wasm_rpc::WasmRpcSender; use crate::DbNamespaceId; @@ -30,7 +33,6 @@ cfg_wasm32! { cfg_native! { use db_common::async_sql_conn::AsyncConnection; use db_common::sqlite::rusqlite::Connection; - use futures::lock::Mutex as AsyncMutex; use rustls::ServerName; use mm2_metrics::prometheus; use mm2_metrics::MmMetricsError; @@ -142,6 +144,8 @@ pub struct MmCtx { /// asynchronous handle for rusqlite connection. #[cfg(not(target_arch = "wasm32"))] pub async_sqlite_connection: Constructible>>, + /// Links the RPC context to the P2P context to handle health check responses. + pub healthcheck_response_handler: AsyncMutex>>, } impl MmCtx { @@ -191,6 +195,7 @@ impl MmCtx { nft_ctx: Mutex::new(None), #[cfg(not(target_arch = "wasm32"))] async_sqlite_connection: Constructible::default(), + healthcheck_response_handler: AsyncMutex::new(ExpirableMap::default()), } } diff --git a/mm2src/mm2_main/Cargo.toml b/mm2src/mm2_main/Cargo.toml index 8f84ebb90a..53154efa05 100644 --- a/mm2src/mm2_main/Cargo.toml +++ b/mm2src/mm2_main/Cargo.toml @@ -32,6 +32,7 @@ bitcrypto = { path = "../mm2_bitcoin/crypto" } blake2 = "0.10.6" bytes = "0.4" chain = { path = "../mm2_bitcoin/chain" } +chrono = "0.4" cfg-if = "1.0" coins = { path = "../coins" } coins_activation = { path = "../coins_activation" } diff --git a/mm2src/mm2_main/src/lp_healthcheck.rs b/mm2src/mm2_main/src/lp_healthcheck.rs new file mode 100644 index 0000000000..dae65a8fed --- /dev/null +++ b/mm2src/mm2_main/src/lp_healthcheck.rs @@ -0,0 +1,446 @@ +use async_std::prelude::FutureExt; +use chrono::Utc; +use common::executor::SpawnFuture; +use common::expirable_map::ExpirableEntry; +use common::{log, HttpStatusCode, StatusCode}; +use derive_more::Display; +use futures::channel::oneshot::{self, Receiver, Sender}; +use instant::{Duration, Instant}; +use lazy_static::lazy_static; +use mm2_core::mm_ctx::MmArc; +use mm2_err_handle::prelude::MmError; +use mm2_libp2p::{decode_message, encode_message, pub_sub_topic, Libp2pPublic, PeerAddress, TopicPrefix}; +use mm2_net::p2p::P2PContext; +use ser_error_derive::SerializeErrorType; +use serde::{Deserialize, Serialize}; +use std::convert::TryFrom; +use std::sync::Mutex; + +use crate::lp_network::broadcast_p2p_msg; + +pub(crate) const PEER_HEALTHCHECK_PREFIX: TopicPrefix = "hcheck"; + +const fn healthcheck_message_exp_secs() -> u64 { + #[cfg(test)] + return 3; + + #[cfg(not(test))] + 10 +} + +#[derive(Clone, Debug, Deserialize, Serialize)] +#[cfg_attr(any(test, target_arch = "wasm32"), derive(PartialEq))] +pub(crate) struct HealthcheckMessage { + #[serde(deserialize_with = "deserialize_bytes")] + signature: Vec, + data: HealthcheckData, +} + +#[derive(Debug, Display)] +enum SignValidationError { + #[display( + fmt = "Healthcheck message is expired. Current time in UTC: {now_secs}, healthcheck `expires_at` in UTC: {expires_at_secs}" + )] + Expired { now_secs: u64, expires_at_secs: u64 }, + #[display( + fmt = "Healthcheck message have too high expiration time. Max allowed expiration seconds: {max_allowed_expiration_secs}, received message expiration seconds: {remaining_expiration_secs}" + )] + LifetimeOverflow { + max_allowed_expiration_secs: u64, + remaining_expiration_secs: u64, + }, + #[display(fmt = "Public key is not valid.")] + InvalidPublicKey, + #[display(fmt = "Signature integrity doesn't match with the public key.")] + FakeSignature, + #[display(fmt = "Process failed unexpectedly due to this reason: {reason}")] + Internal { reason: String }, +} + +impl HealthcheckMessage { + pub(crate) fn generate_message(ctx: &MmArc, is_a_reply: bool) -> Result { + let p2p_ctx = P2PContext::fetch_from_mm_arc(ctx); + let keypair = p2p_ctx.keypair(); + let sender_public_key = keypair.public().encode_protobuf(); + + let data = HealthcheckData { + sender_public_key, + expires_at_secs: u64::try_from(Utc::now().timestamp()).map_err(|e| e.to_string())? + + healthcheck_message_exp_secs(), + is_a_reply, + }; + + let signature = try_s!(keypair.sign(&try_s!(data.encode()))); + + Ok(Self { signature, data }) + } + + fn generate_or_use_cached_message(ctx: &MmArc) -> Result { + const MIN_DURATION_FOR_REUSABLE_MSG: Duration = Duration::from_secs(5); + + lazy_static! { + static ref RECENTLY_GENERATED_MESSAGE: Mutex> = + Mutex::new(ExpirableEntry::new( + // Using dummy values in order to initialize `HealthcheckMessage` context. + HealthcheckMessage { + signature: vec![], + data: HealthcheckData { + sender_public_key: vec![], + expires_at_secs: 0, + is_a_reply: false, + }, + }, + Duration::from_secs(0) + )); + } + + // If recently generated message has longer life than `MIN_DURATION_FOR_REUSABLE_MSG`, we can reuse it to + // reduce the message generation overhead under high pressure. + let mut mutexed_msg = RECENTLY_GENERATED_MESSAGE.lock().unwrap(); + + if mutexed_msg.has_longer_life_than(MIN_DURATION_FOR_REUSABLE_MSG) { + Ok(mutexed_msg.get_element().clone()) + } else { + let new_msg = HealthcheckMessage::generate_message(ctx, true)?; + + mutexed_msg.update_value(new_msg.clone()); + mutexed_msg.update_expiration(Instant::now() + Duration::from_secs(healthcheck_message_exp_secs())); + + Ok(new_msg) + } + } + + fn is_received_message_valid(&self) -> Result { + let now_secs = u64::try_from(Utc::now().timestamp()) + .map_err(|e| SignValidationError::Internal { reason: e.to_string() })?; + + let remaining_expiration_secs = self.data.expires_at_secs - now_secs; + + if remaining_expiration_secs == 0 { + return Err(SignValidationError::Expired { + now_secs, + expires_at_secs: self.data.expires_at_secs, + }); + } else if remaining_expiration_secs > healthcheck_message_exp_secs() { + return Err(SignValidationError::LifetimeOverflow { + max_allowed_expiration_secs: healthcheck_message_exp_secs(), + remaining_expiration_secs, + }); + } + + let Ok(public_key) = Libp2pPublic::try_decode_protobuf(&self.data.sender_public_key) else { + log::debug!("Couldn't decode public key from the healthcheck message."); + + return Err(SignValidationError::InvalidPublicKey); + }; + + let encoded_message = self + .data + .encode() + .map_err(|e| SignValidationError::Internal { reason: e.to_string() })?; + + if public_key.verify(&encoded_message, &self.signature) { + Ok(public_key.to_peer_id().into()) + } else { + Err(SignValidationError::FakeSignature) + } + } + + #[inline] + pub(crate) fn encode(&self) -> Result, rmp_serde::encode::Error> { encode_message(self) } + + #[inline] + pub(crate) fn decode(bytes: &[u8]) -> Result { decode_message(bytes) } + + #[inline] + pub(crate) fn should_reply(&self) -> bool { !self.data.is_a_reply } +} + +#[derive(Clone, Debug, Deserialize, Serialize)] +#[cfg_attr(any(test, target_arch = "wasm32"), derive(PartialEq))] +struct HealthcheckData { + #[serde(deserialize_with = "deserialize_bytes")] + sender_public_key: Vec, + expires_at_secs: u64, + is_a_reply: bool, +} + +impl HealthcheckData { + #[inline] + fn encode(&self) -> Result, rmp_serde::encode::Error> { encode_message(self) } +} + +#[inline] +pub fn peer_healthcheck_topic(peer_address: &PeerAddress) -> String { + pub_sub_topic(PEER_HEALTHCHECK_PREFIX, &peer_address.to_string()) +} + +#[derive(Deserialize)] +pub struct RequestPayload { + peer_address: PeerAddress, +} + +fn deserialize_bytes<'de, D>(deserializer: D) -> Result, D::Error> +where + D: serde::Deserializer<'de>, +{ + struct ByteVisitor; + + impl<'de> serde::de::Visitor<'de> for ByteVisitor { + type Value = Vec; + + fn expecting(&self, formatter: &mut std::fmt::Formatter) -> std::fmt::Result { + formatter.write_str("a non-empty byte array up to 512 bytes") + } + + fn visit_seq(self, mut seq: A) -> Result, A::Error> + where + A: serde::de::SeqAccess<'de>, + { + let mut buffer = vec![]; + while let Some(byte) = seq.next_element()? { + if buffer.len() >= 512 { + return Err(serde::de::Error::invalid_length( + buffer.len(), + &"longest possible length allowed for this field is 512 bytes (with RSA algorithm).", + )); + } + + buffer.push(byte); + } + + if buffer.is_empty() { + return Err(serde::de::Error::custom("Can't be empty.")); + } + + Ok(buffer) + } + } + + deserializer.deserialize_seq(ByteVisitor) +} + +#[derive(Debug, Display, Serialize, SerializeErrorType)] +#[serde(tag = "error_type", content = "error_data")] +pub enum HealthcheckRpcError { + MessageGenerationFailed { reason: String }, + MessageEncodingFailed { reason: String }, + Internal { reason: String }, +} + +impl HttpStatusCode for HealthcheckRpcError { + fn status_code(&self) -> common::StatusCode { + match self { + HealthcheckRpcError::MessageGenerationFailed { .. } + | HealthcheckRpcError::Internal { .. } + | HealthcheckRpcError::MessageEncodingFailed { .. } => StatusCode::INTERNAL_SERVER_ERROR, + } + } +} + +pub async fn peer_connection_healthcheck_rpc( + ctx: MmArc, + req: RequestPayload, +) -> Result> { + // When things go awry, we want records to clear themselves to keep the memory clean of unused data. + // This is unrelated to the timeout logic. + let address_record_exp = Duration::from_secs(healthcheck_message_exp_secs()); + + let target_peer_address = req.peer_address; + + let p2p_ctx = P2PContext::fetch_from_mm_arc(&ctx); + if target_peer_address == p2p_ctx.peer_id().into() { + // That's us, so return true. + return Ok(true); + } + + let message = HealthcheckMessage::generate_message(&ctx, false) + .map_err(|reason| HealthcheckRpcError::MessageGenerationFailed { reason })?; + + let encoded_message = message + .encode() + .map_err(|e| HealthcheckRpcError::MessageEncodingFailed { reason: e.to_string() })?; + + let (tx, rx): (Sender<()>, Receiver<()>) = oneshot::channel(); + + { + let mut book = ctx.healthcheck_response_handler.lock().await; + book.insert(target_peer_address, tx, address_record_exp); + } + + broadcast_p2p_msg( + &ctx, + peer_healthcheck_topic(&target_peer_address), + encoded_message, + None, + ); + + let timeout_duration = Duration::from_secs(healthcheck_message_exp_secs()); + Ok(rx.timeout(timeout_duration).await == Ok(Ok(()))) +} + +pub(crate) async fn process_p2p_healthcheck_message(ctx: &MmArc, message: mm2_libp2p::GossipsubMessage) { + macro_rules! try_or_return { + ($exp:expr, $msg: expr) => { + match $exp { + Ok(t) => t, + Err(e) => { + log::error!("{}, error: {e:?}", $msg); + return; + }, + } + }; + } + + let data = try_or_return!( + HealthcheckMessage::decode(&message.data), + "Couldn't decode healthcheck message" + ); + + let ctx = ctx.clone(); + + // Pass the remaining work to another thread to free up this one as soon as possible, + // so KDF can handle a high amount of healthcheck messages more efficiently. + ctx.spawner().spawn(async move { + let sender_peer = match data.is_received_message_valid() { + Ok(t) => t, + Err(e) => { + log::error!("Received an invalid healthcheck message. Error: {e}"); + return; + }, + }; + + if data.should_reply() { + // Reply the message so they know we are healthy. + + let msg = try_or_return!( + HealthcheckMessage::generate_or_use_cached_message(&ctx), + "Couldn't generate the healthcheck message, this is very unusual!" + ); + + let encoded_msg = try_or_return!( + msg.encode(), + "Couldn't encode healthcheck message, this is very unusual!" + ); + + let topic = peer_healthcheck_topic(&sender_peer); + broadcast_p2p_msg(&ctx, topic, encoded_msg, None); + } else { + // The requested peer is healthy; signal the response channel. + let mut response_handler = ctx.healthcheck_response_handler.lock().await; + if let Some(tx) = response_handler.remove(&sender_peer) { + if tx.send(()).is_err() { + log::error!("Result channel isn't present for peer '{sender_peer}'."); + }; + } else { + log::info!("Peer '{sender_peer}' isn't recorded in the healthcheck response handler."); + }; + } + }); +} + +#[cfg(any(test, target_arch = "wasm32"))] +mod tests { + use super::*; + use common::cross_test; + use crypto::CryptoCtx; + use mm2_libp2p::behaviours::atomicdex::generate_ed25519_keypair; + use mm2_test_helpers::for_tests::mm_ctx_with_iguana; + use std::mem::discriminant; + use std::str::FromStr; + + common::cfg_wasm32! { + use wasm_bindgen_test::*; + wasm_bindgen_test::wasm_bindgen_test_configure!(run_in_browser); + } + + fn ctx() -> MmArc { + let ctx = mm_ctx_with_iguana(Some("dummy-value")); + let p2p_key = { + let crypto_ctx = CryptoCtx::from_ctx(&ctx).unwrap(); + let key = bitcrypto::sha256(crypto_ctx.mm2_internal_privkey_slice()); + key.take() + }; + + let (cmd_tx, _) = futures::channel::mpsc::channel(0); + + let p2p_context = P2PContext::new(cmd_tx, generate_ed25519_keypair(p2p_key)); + p2p_context.store_to_mm_arc(&ctx); + + ctx + } + + cross_test!(test_peer_address, { + #[derive(Deserialize, Serialize)] + struct PeerAddressTest { + peer_address: PeerAddress, + } + + let address_str = "12D3KooWEtuv7kmgGCC7oAQ31hB7AR5KkhT3eEWB2bP2roo3M7rY"; + let json_content = format!("{{\"peer_address\": \"{address_str}\"}}"); + let address_struct: PeerAddressTest = serde_json::from_str(&json_content).unwrap(); + + let actual_peer_id = mm2_libp2p::PeerId::from_str(address_str).unwrap(); + let deserialized_peer_id: mm2_libp2p::PeerId = address_struct.peer_address.into(); + + assert_eq!(deserialized_peer_id, actual_peer_id); + }); + + cross_test!(test_valid_message, { + let ctx = ctx(); + let message = HealthcheckMessage::generate_message(&ctx, false).unwrap(); + message.is_received_message_valid().unwrap(); + }); + + cross_test!(test_corrupted_messages, { + let ctx = ctx(); + + let mut message = HealthcheckMessage::generate_message(&ctx, false).unwrap(); + message.data.expires_at_secs += healthcheck_message_exp_secs() * 3; + assert_eq!( + discriminant(&message.is_received_message_valid().err().unwrap()), + discriminant(&SignValidationError::LifetimeOverflow { + max_allowed_expiration_secs: 0, + remaining_expiration_secs: 0 + }) + ); + + let mut message = HealthcheckMessage::generate_message(&ctx, false).unwrap(); + message.data.is_a_reply = !message.data.is_a_reply; + assert_eq!( + discriminant(&message.is_received_message_valid().err().unwrap()), + discriminant(&SignValidationError::FakeSignature) + ); + + let mut message = HealthcheckMessage::generate_message(&ctx, false).unwrap(); + message.data.sender_public_key.push(0); + assert_eq!( + discriminant(&message.is_received_message_valid().err().unwrap()), + discriminant(&SignValidationError::InvalidPublicKey) + ); + }); + + cross_test!(test_expired_message, { + let ctx = ctx(); + let message = HealthcheckMessage::generate_message(&ctx, false).unwrap(); + common::executor::Timer::sleep(3.).await; + assert_eq!( + discriminant(&message.is_received_message_valid().err().unwrap()), + discriminant(&SignValidationError::Expired { + now_secs: 0, + expires_at_secs: 0 + }) + ); + }); + + cross_test!(test_encode_decode, { + let ctx = ctx(); + let original = HealthcheckMessage::generate_message(&ctx, false).unwrap(); + + let encoded = original.encode().unwrap(); + assert!(!encoded.is_empty()); + + let decoded = HealthcheckMessage::decode(&encoded).unwrap(); + assert_eq!(original, decoded); + }); +} diff --git a/mm2src/mm2_main/src/lp_native_dex.rs b/mm2src/mm2_main/src/lp_native_dex.rs index bd875511b0..cd055132a5 100644 --- a/mm2src/mm2_main/src/lp_native_dex.rs +++ b/mm2src/mm2_main/src/lp_native_dex.rs @@ -36,7 +36,7 @@ use mm2_metrics::mm_gauge; use mm2_net::network_event::NetworkEvent; use mm2_net::p2p::P2PContext; use rpc_task::RpcTaskError; -use serde_json::{self as json}; +use serde_json as json; use std::convert::TryInto; use std::io; use std::path::PathBuf; @@ -47,8 +47,9 @@ use std::{fs, usize}; #[cfg(not(target_arch = "wasm32"))] use crate::database::init_and_migrate_sql_db; use crate::heartbeat_event::HeartbeatEvent; +use crate::lp_healthcheck::peer_healthcheck_topic; use crate::lp_message_service::{init_message_service, InitMessageServiceError}; -use crate::lp_network::{lp_network_ports, p2p_event_process_loop, NetIdError}; +use crate::lp_network::{lp_network_ports, p2p_event_process_loop, subscribe_to_topic, NetIdError}; use crate::lp_ordermatch::{broadcast_maker_orders_keep_alive_loop, clean_memory_loop, init_ordermatch_context, lp_ordermatch_loop, orders_kick_start, BalanceUpdateOrdermatchHandler, OrdermatchInitError}; use crate::lp_swap::{running_swaps_num, swap_kick_starts}; @@ -635,7 +636,8 @@ pub async fn init_p2p(ctx: MmArc) -> P2PResult<()> { ); }) .await; - let (cmd_tx, event_rx, _peer_id) = spawn_result?; + + let (cmd_tx, event_rx, peer_id) = spawn_result?; let p2p_context = P2PContext::new(cmd_tx, generate_ed25519_keypair(p2p_key)); p2p_context.store_to_mm_arc(&ctx); @@ -643,6 +645,9 @@ pub async fn init_p2p(ctx: MmArc) -> P2PResult<()> { let fut = p2p_event_process_loop(ctx.weak(), event_rx, i_am_seed); ctx.spawner().spawn(fut); + // Listen for health check messages. + subscribe_to_topic(&ctx, peer_healthcheck_topic(&peer_id.into())); + Ok(()) } diff --git a/mm2src/mm2_main/src/lp_network.rs b/mm2src/mm2_main/src/lp_network.rs index eb84f390d1..8e5195e93a 100644 --- a/mm2src/mm2_main/src/lp_network.rs +++ b/mm2src/mm2_main/src/lp_network.rs @@ -38,8 +38,7 @@ use mm2_net::p2p::P2PContext; use serde::de; use std::net::ToSocketAddrs; -use crate::lp_ordermatch; -use crate::{lp_stats, lp_swap}; +use crate::{lp_healthcheck, lp_ordermatch, lp_stats, lp_swap}; pub type P2PRequestResult = Result>; pub type P2PProcessResult = Result>; @@ -216,6 +215,9 @@ async fn process_p2p_message( } } }, + Some(lp_healthcheck::PEER_HEALTHCHECK_PREFIX) => { + lp_healthcheck::process_p2p_healthcheck_message(&ctx, message).await + }, None | Some(_) => (), } diff --git a/mm2src/mm2_main/src/mm2.rs b/mm2src/mm2_main/src/mm2.rs index 734e71becf..5224ed1b92 100644 --- a/mm2src/mm2_main/src/mm2.rs +++ b/mm2src/mm2_main/src/mm2.rs @@ -72,6 +72,7 @@ use mm2_err_handle::prelude::*; pub mod heartbeat_event; pub mod lp_dispatcher; +pub mod lp_healthcheck; pub mod lp_message_service; mod lp_native_dex; pub mod lp_network; diff --git a/mm2src/mm2_main/src/rpc/dispatcher/dispatcher.rs b/mm2src/mm2_main/src/rpc/dispatcher/dispatcher.rs index fec08d106f..5a480a0a2e 100644 --- a/mm2src/mm2_main/src/rpc/dispatcher/dispatcher.rs +++ b/mm2src/mm2_main/src/rpc/dispatcher/dispatcher.rs @@ -1,4 +1,5 @@ use super::{DispatcherError, DispatcherResult, PUBLIC_METHODS}; +use crate::lp_healthcheck::peer_connection_healthcheck_rpc; use crate::lp_native_dex::init_hw::{cancel_init_trezor, init_trezor, init_trezor_status, init_trezor_user_action}; #[cfg(target_arch = "wasm32")] use crate::lp_native_dex::init_metamask::{cancel_connect_metamask, connect_metamask, connect_metamask_status}; @@ -214,6 +215,7 @@ async fn dispatcher_v2(request: MmRpcRequest, ctx: MmArc) -> DispatcherResult handle_mmrpc(ctx, request, withdraw).await, "ibc_chains" => handle_mmrpc(ctx, request, ibc_chains).await, "ibc_transfer_channels" => handle_mmrpc(ctx, request, ibc_transfer_channels).await, + "peer_connection_healthcheck" => handle_mmrpc(ctx, request, peer_connection_healthcheck_rpc).await, "withdraw_nft" => handle_mmrpc(ctx, request, withdraw_nft).await, "start_eth_fee_estimator" => handle_mmrpc(ctx, request, start_eth_fee_estimator).await, "stop_eth_fee_estimator" => handle_mmrpc(ctx, request, stop_eth_fee_estimator).await, diff --git a/mm2src/mm2_main/tests/mm2_tests/mm2_tests_inner.rs b/mm2src/mm2_main/tests/mm2_tests/mm2_tests_inner.rs index 57a4709758..906af55887 100644 --- a/mm2src/mm2_main/tests/mm2_tests/mm2_tests_inner.rs +++ b/mm2src/mm2_main/tests/mm2_tests/mm2_tests_inner.rs @@ -35,7 +35,7 @@ use uuid::Uuid; cfg_native! { use common::block_on; - use mm2_test_helpers::for_tests::{get_passphrase, new_mm2_temp_folder_path}; + use mm2_test_helpers::for_tests::{get_passphrase, new_mm2_temp_folder_path, peer_connection_healthcheck}; use mm2_io::fs::slurp; use hyper::header::ACCESS_CONTROL_ALLOW_ORIGIN; } @@ -5963,6 +5963,46 @@ fn test_sign_raw_transaction_p2wpkh() { assert!(response["error"].as_str().unwrap().contains("Signing error")); } +#[test] +#[cfg(not(target_arch = "wasm32"))] +fn test_connection_healthcheck_rpc() { + const BOB_ADDRESS: &str = "12D3KooWEtuv7kmgGCC7oAQ31hB7AR5KkhT3eEWB2bP2roo3M7rY"; + const BOB_SEED: &str = "dummy-value-bob"; + + const ALICE_ADDRESS: &str = "12D3KooWHnoKd2Lr7BoxHCCeBhcnfAZsdiCdojbEMLE7DDSbMo1g"; + const ALICE_SEED: &str = "dummy-value-alice"; + + let bob_conf = Mm2TestConf::seednode(BOB_SEED, &json!([])); + let bob_mm = MarketMakerIt::start(bob_conf.conf, bob_conf.rpc_password, None).unwrap(); + + thread::sleep(Duration::from_secs(2)); + + let mut alice_conf = Mm2TestConf::seednode(ALICE_SEED, &json!([])); + alice_conf.conf["seednodes"] = json!([bob_mm.my_seed_addr()]); + alice_conf.conf["skip_startup_checks"] = json!(true); + let alice_mm = MarketMakerIt::start(alice_conf.conf, alice_conf.rpc_password, None).unwrap(); + + thread::sleep(Duration::from_secs(2)); + + // Self-address check for Bob + let response = block_on(peer_connection_healthcheck(&bob_mm, BOB_ADDRESS)); + assert_eq!(response["result"], json!(true)); + + // Check address of Alice + let response = block_on(peer_connection_healthcheck(&bob_mm, ALICE_ADDRESS)); + assert_eq!(response["result"], json!(true)); + + thread::sleep(Duration::from_secs(1)); + + // Self-address check for Alice + let response = block_on(peer_connection_healthcheck(&alice_mm, ALICE_ADDRESS)); + assert_eq!(response["result"], json!(true)); + + // Check address of Bob + let response = block_on(peer_connection_healthcheck(&alice_mm, BOB_ADDRESS)); + assert_eq!(response["result"], json!(true)); +} + #[cfg(all(feature = "run-device-tests", not(target_arch = "wasm32")))] mod trezor_tests { use coins::eth::{eth_coin_from_conf_and_request, gas_limit, EthCoin}; diff --git a/mm2src/mm2_p2p/src/lib.rs b/mm2src/mm2_p2p/src/lib.rs index 8e6e6db159..27e26e8f06 100644 --- a/mm2src/mm2_p2p/src/lib.rs +++ b/mm2src/mm2_p2p/src/lib.rs @@ -6,12 +6,14 @@ mod network; mod relay_address; mod swarm_runtime; +use derive_more::Display; use lazy_static::lazy_static; use secp256k1::{Message as SecpMessage, PublicKey as Secp256k1Pubkey, Secp256k1, SecretKey, SignOnly, Signature, VerifyOnly}; use serde::{de, Deserialize, Serialize, Serializer}; use sha2::digest::Update; use sha2::{Digest, Sha256}; +use std::str::FromStr; pub use crate::swarm_runtime::SwarmRuntime; @@ -43,6 +45,69 @@ lazy_static! { static ref SECP_SIGN: Secp256k1 = Secp256k1::signing_only(); } +/// Wrapper of `libp2p::PeerId` with trait additional implementations. +/// +/// TODO: This should be used as a replacement of `libp2p::PeerId` in the entire project. +#[derive(Clone, Copy, Debug, Display, Eq, Hash, PartialEq)] +pub struct PeerAddress(PeerId); + +impl From for PeerAddress { + fn from(value: PeerId) -> Self { Self(value) } +} + +impl From for PeerId { + fn from(value: PeerAddress) -> Self { value.0 } +} + +impl Serialize for PeerAddress { + fn serialize(&self, serializer: S) -> Result + where + S: Serializer, + { + serializer.serialize_str(&self.0.to_string()) + } +} + +impl<'de> Deserialize<'de> for PeerAddress { + fn deserialize(deserializer: D) -> Result + where + D: serde::Deserializer<'de>, + { + struct PeerAddressVisitor; + + impl<'de> serde::de::Visitor<'de> for PeerAddressVisitor { + type Value = PeerAddress; + + fn expecting(&self, formatter: &mut std::fmt::Formatter) -> std::fmt::Result { + formatter.write_str("a string representation of peer id.") + } + + fn visit_str(self, value: &str) -> Result + where + E: serde::de::Error, + { + if value.len() > 100 { + return Err(serde::de::Error::invalid_length( + value.len(), + &"peer id cannot exceed 100 characters.", + )); + } + + Ok(PeerId::from_str(value).map_err(de::Error::custom)?.into()) + } + + fn visit_string(self, value: String) -> Result + where + E: de::Error, + { + self.visit_str(&value) + } + } + + deserializer.deserialize_str(PeerAddressVisitor) + } +} + #[derive(Clone, Copy, Debug)] pub enum NetworkInfo { /// The in-memory network. diff --git a/mm2src/mm2_test_helpers/src/for_tests.rs b/mm2src/mm2_test_helpers/src/for_tests.rs index df5a488cf9..196112cc0b 100644 --- a/mm2src/mm2_test_helpers/src/for_tests.rs +++ b/mm2src/mm2_test_helpers/src/for_tests.rs @@ -868,9 +868,7 @@ pub fn nft_dev_conf() -> Json { }) } -fn set_chain_id(conf: &mut Json, chain_id: u64) { - conf["chain_id"] = json!(chain_id); -} +fn set_chain_id(conf: &mut Json, chain_id: u64) { conf["chain_id"] = json!(chain_id); } pub fn eth_sepolia_conf() -> Json { json!({ @@ -1896,6 +1894,30 @@ pub async fn enable_qrc20( json::from_str(&electrum.1).unwrap() } +pub async fn peer_connection_healthcheck(mm: &MarketMakerIt, peer_address: &str) -> Json { + let response = mm + .rpc(&json!({ + "userpass": mm.userpass, + "method": "peer_connection_healthcheck", + "mmrpc": "2.0", + "params": { + "peer_address": peer_address + } + })) + .await + .unwrap(); + + assert_eq!( + response.0, + StatusCode::OK, + "RPC «peer_connection_healthcheck» failed with {} {}", + response.0, + response.1 + ); + + json::from_str(&response.1).unwrap() +} + /// Reads passphrase and userpass from .env file pub fn from_env_file(env: Vec) -> (Option, Option) { use regex::bytes::Regex; @@ -2973,7 +2995,10 @@ pub async fn enable_tendermint( tx_history: bool, ) -> Json { let ibc_requests: Vec<_> = ibc_assets.iter().map(|ticker| json!({ "ticker": ticker })).collect(); - let nodes: Vec = rpc_urls.iter().map(|u| json!({"url": u, "komodo_proxy": false })).collect(); + let nodes: Vec = rpc_urls + .iter() + .map(|u| json!({"url": u, "komodo_proxy": false })) + .collect(); let request = json!({ "userpass": mm.userpass, @@ -3010,7 +3035,10 @@ pub async fn enable_tendermint_without_balance( tx_history: bool, ) -> Json { let ibc_requests: Vec<_> = ibc_assets.iter().map(|ticker| json!({ "ticker": ticker })).collect(); - let nodes: Vec = rpc_urls.iter().map(|u| json!({"url": u, "komodo_proxy": false })).collect(); + let nodes: Vec = rpc_urls + .iter() + .map(|u| json!({"url": u, "komodo_proxy": false })) + .collect(); let request = json!({ "userpass": mm.userpass, diff --git a/mm2src/proxy_signature/src/lib.rs b/mm2src/proxy_signature/src/lib.rs index f4ae2393a0..15cf596c89 100644 --- a/mm2src/proxy_signature/src/lib.rs +++ b/mm2src/proxy_signature/src/lib.rs @@ -2,6 +2,7 @@ use chrono::Utc; use http::Uri; use libp2p::identity::{Keypair, PublicKey, SigningError}; use serde::{Deserialize, Serialize}; +use std::convert::TryFrom; /// Represents a message and its corresponding signature. #[derive(Clone, Debug, Deserialize, Serialize, PartialEq)] @@ -69,8 +70,11 @@ impl RawMessage { impl ProxySign { /// Validates if the message is still valid based on its expiration time and signature verification. - pub fn is_valid_message(&self) -> bool { - if Utc::now().timestamp() > self.raw_message.expires_at { + pub fn is_valid_message(&self, max_message_exp_secs: u64) -> bool { + let now = Utc::now().timestamp(); + let remaining_expiration_seconds = u64::try_from(self.raw_message.expires_at - now).unwrap_or(0); + + if remaining_expiration_seconds == 0 || remaining_expiration_seconds > max_message_exp_secs { return false; } @@ -111,14 +115,14 @@ pub mod proxy_signature_tests { fn sign_and_verify() { let keypair = random_keypair(); let signed_proxy_message = RawMessage::sign(&keypair, &Uri::from_static("http://example.com"), 0, 5).unwrap(); - assert!(signed_proxy_message.is_valid_message()); + assert!(signed_proxy_message.is_valid_message(10)); } #[test] fn expired_signature() { let keypair = random_keypair(); let signed_proxy_message = RawMessage::sign(&keypair, &Uri::from_static("http://example.com"), 0, -1).unwrap(); - assert!(!signed_proxy_message.is_valid_message()); + assert!(!signed_proxy_message.is_valid_message(10)); } #[test] @@ -127,17 +131,24 @@ pub mod proxy_signature_tests { let mut signed_proxy_message = RawMessage::sign(&keypair, &Uri::from_static("http://example.com"), 0, 5).unwrap(); signed_proxy_message.raw_message.uri = "http://demo.com".to_string(); - assert!(!signed_proxy_message.is_valid_message()); + assert!(!signed_proxy_message.is_valid_message(10)); let mut signed_proxy_message = RawMessage::sign(&keypair, &Uri::from_static("http://example.com"), 0, 5).unwrap(); signed_proxy_message.raw_message.body_size += 1; - assert!(!signed_proxy_message.is_valid_message()); + assert!(!signed_proxy_message.is_valid_message(10)); let mut signed_proxy_message = RawMessage::sign(&keypair, &Uri::from_static("http://example.com"), 0, 5).unwrap(); signed_proxy_message.raw_message.expires_at += 1; - assert!(!signed_proxy_message.is_valid_message()); + assert!(!signed_proxy_message.is_valid_message(10)); + } + + #[test] + fn message_lifetime_overflow() { + let keypair = random_keypair(); + let signed_proxy_message = RawMessage::sign(&keypair, &Uri::from_static("http://example.com"), 0, 5).unwrap(); + assert!(!signed_proxy_message.is_valid_message(4)); } #[test]