Skip to content

Commit

Permalink
Merge remote-tracking branch 'origin/dev' into fix-p2p-cancellation
Browse files Browse the repository at this point in the history
  • Loading branch information
shamardy committed Oct 3, 2024
2 parents ff54b83 + 1f3dffa commit 3f0a6bd
Show file tree
Hide file tree
Showing 14 changed files with 650 additions and 26 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

22 changes: 19 additions & 3 deletions mm2src/common/expirable_map.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,26 @@ pub struct ExpirableEntry<V> {
}

impl<V> ExpirableEntry<V> {
#[inline(always)]
pub fn new(v: V, exp: Duration) -> Self {
Self {
expires_at: Instant::now() + exp,
value: v,
}
}

#[inline(always)]
pub fn get_element(&self) -> &V { &self.value }

#[inline(always)]
pub fn update_value(&mut self, v: V) { self.value = v }

#[inline(always)]
pub fn update_expiration(&mut self, expires_at: Instant) { self.expires_at = expires_at }

/// Checks whether entry has longer ttl than the given one.
#[inline(always)]
pub fn has_longer_life_than(&self, min_ttl: Duration) -> bool { self.expires_at > Instant::now() + min_ttl }
}

impl<K: Eq + Hash + Copy, V> Default for ExpirableMap<K, V> {
Expand Down Expand Up @@ -71,9 +88,8 @@ impl<K: Eq + Hash + Copy, V> ExpirableMap<K, V> {
/// the old one will be returned.
pub fn insert(&mut self, k: K, v: V, exp: Duration) -> Option<V> {
self.clear_expired_entries();
let expires_at = Instant::now() + exp;
let entry = ExpirableEntry { expires_at, value: v };
self.expiries.insert(expires_at, k);
let entry = ExpirableEntry::new(v, exp);
self.expiries.insert(entry.expires_at, k);
self.map.insert(k, entry).map(|v| v.value)
}

Expand Down
1 change: 1 addition & 0 deletions mm2src/mm2_core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ lazy_static = "1.4"
mm2_err_handle = { path = "../mm2_err_handle" }
mm2_event_stream = { path = "../mm2_event_stream" }
mm2_metrics = { path = "../mm2_metrics" }
mm2-libp2p = { path = "../mm2_p2p", package = "mm2_p2p" }
primitives = { path = "../mm2_bitcoin/primitives" }
rand = { version = "0.7", features = ["std", "small_rng", "wasm-bindgen"] }
serde = "1"
Expand Down
15 changes: 10 additions & 5 deletions mm2src/mm2_core/src/mm_ctx.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,17 @@
use crate::data_asker::DataAsker;
#[cfg(feature = "track-ctx-pointer")]
use common::executor::Timer;
use common::executor::{abortable_queue::{AbortableQueue, WeakSpawner},
graceful_shutdown, AbortSettings, AbortableSystem, SpawnAbortable, SpawnFuture};
use common::log::{self, LogLevel, LogOnError, LogState};
use common::{cfg_native, cfg_wasm32, small_rng};
use common::{executor::{abortable_queue::{AbortableQueue, WeakSpawner},
graceful_shutdown, AbortSettings, AbortableSystem, SpawnAbortable, SpawnFuture},
expirable_map::ExpirableMap};
use futures::channel::oneshot;
use futures::lock::Mutex as AsyncMutex;
use gstuff::{try_s, Constructible, ERR, ERRL};
use lazy_static::lazy_static;
use mm2_event_stream::{controller::Controller, Event, EventStreamConfiguration};
use mm2_libp2p::PeerAddress;
use mm2_metrics::{MetricsArc, MetricsOps};
use primitives::hash::H160;
use rand::Rng;
Expand All @@ -20,8 +25,6 @@ use std::future::Future;
use std::ops::Deref;
use std::sync::{Arc, Mutex};

use crate::data_asker::DataAsker;

cfg_wasm32! {
use mm2_rpc::wasm_rpc::WasmRpcSender;
use crate::DbNamespaceId;
Expand All @@ -30,7 +33,6 @@ cfg_wasm32! {
cfg_native! {
use db_common::async_sql_conn::AsyncConnection;
use db_common::sqlite::rusqlite::Connection;
use futures::lock::Mutex as AsyncMutex;
use rustls::ServerName;
use mm2_metrics::prometheus;
use mm2_metrics::MmMetricsError;
Expand Down Expand Up @@ -142,6 +144,8 @@ pub struct MmCtx {
/// asynchronous handle for rusqlite connection.
#[cfg(not(target_arch = "wasm32"))]
pub async_sqlite_connection: Constructible<Arc<AsyncMutex<AsyncConnection>>>,
/// Links the RPC context to the P2P context to handle health check responses.
pub healthcheck_response_handler: AsyncMutex<ExpirableMap<PeerAddress, oneshot::Sender<()>>>,
}

impl MmCtx {
Expand Down Expand Up @@ -191,6 +195,7 @@ impl MmCtx {
nft_ctx: Mutex::new(None),
#[cfg(not(target_arch = "wasm32"))]
async_sqlite_connection: Constructible::default(),
healthcheck_response_handler: AsyncMutex::new(ExpirableMap::default()),
}
}

Expand Down
1 change: 1 addition & 0 deletions mm2src/mm2_main/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ bitcrypto = { path = "../mm2_bitcoin/crypto" }
blake2 = "0.10.6"
bytes = "0.4"
chain = { path = "../mm2_bitcoin/chain" }
chrono = "0.4"
cfg-if = "1.0"
coins = { path = "../coins" }
coins_activation = { path = "../coins_activation" }
Expand Down
Loading

0 comments on commit 3f0a6bd

Please sign in to comment.