From b972819ae865fe36fa6df65d24295b38994cda96 Mon Sep 17 00:00:00 2001 From: Rob Date: Mon, 18 Sep 2023 17:32:18 -0400 Subject: [PATCH 1/5] persist kademlia --- .../src/traits/networking/libp2p_network.rs | 43 +-- .../src/network/behaviours/dht/cache.rs | 308 ++++++++++++++++++ .../network/behaviours/{dht.rs => dht/mod.rs} | 43 ++- crates/libp2p-networking/src/network/mod.rs | 12 +- .../src/network/node/handle.rs | 169 +--------- crates/libp2p-networking/tests/common/mod.rs | 10 +- crates/libp2p-networking/tests/counter.rs | 31 +- 7 files changed, 384 insertions(+), 232 deletions(-) create mode 100644 crates/libp2p-networking/src/network/behaviours/dht/cache.rs rename crates/libp2p-networking/src/network/behaviours/{dht.rs => dht/mod.rs} (95%) diff --git a/crates/hotshot/src/traits/networking/libp2p_network.rs b/crates/hotshot/src/traits/networking/libp2p_network.rs index da8784e0a7..bcb933c7bf 100644 --- a/crates/hotshot/src/traits/networking/libp2p_network.rs +++ b/crates/hotshot/src/traits/networking/libp2p_network.rs @@ -5,14 +5,14 @@ use super::NetworkingMetrics; use crate::NodeImplementation; use async_compatibility_layer::{ - art::{async_block_on, async_sleep, async_spawn, async_timeout}, + art::{async_block_on, async_sleep, async_spawn}, channel::{unbounded, UnboundedReceiver, UnboundedSendError, UnboundedSender}, }; use async_lock::RwLock; use async_trait::async_trait; use bimap::BiHashMap; use bincode::Options; -use hotshot_constants::{KAD_DEFAULT_REPUB_INTERVAL_SEC, LOOK_AHEAD}; +use hotshot_constants::LOOK_AHEAD; use hotshot_task::{boxed_sync, BoxSyncFuture}; use hotshot_types::{ data::ViewNumber, @@ -82,7 +82,7 @@ struct Libp2pNetworkInner { /// this node's public key pk: K, /// handle to control the network - handle: Arc>, + handle: Arc>, /// map of known replica peer ids to public keys broadcast_recv: UnboundedReceiver, /// Sender for broadcast messages @@ -93,8 +93,6 @@ struct Libp2pNetworkInner { direct_recv: UnboundedReceiver, /// Sender for node lookup (relevant view number, key of node) (None for shutdown) node_lookup_send: UnboundedSender>, - /// Sender for shutdown of the peer cache's garbage collection task - cache_gc_shutdown_send: UnboundedSender<()>, /// this is really cheating to enable local tests /// hashset of (bootstrap_addr, peer_id) bootstrap_addrs: PeerInfoVec, @@ -287,7 +285,7 @@ impl Libp2pNetwork { ) -> Result, NetworkError> { assert!(bootstrap_addrs_len > 4, "Need at least 5 bootstrap nodes"); let network_handle = Arc::new( - NetworkNodeHandle::<(), K>::new(config, id) + NetworkNodeHandle::<()>::new(config, id) .await .map_err(Into::::into)?, ); @@ -318,7 +316,6 @@ impl Libp2pNetwork { let (direct_send, direct_recv) = unbounded(); let (broadcast_send, broadcast_recv) = unbounded(); let (node_lookup_send, node_lookup_recv) = unbounded(); - let (cache_gc_shutdown_send, cache_gc_shutdown_recv) = unbounded::<()>(); let result = Libp2pNetwork { inner: Arc::new(Libp2pNetworkInner { @@ -336,7 +333,6 @@ impl Libp2pNetwork { metrics: NetworkingMetrics::new(&*metrics), topic_map, node_lookup_send, - cache_gc_shutdown_send, // Start the latest view from 0. "Latest" refers to "most recent view we are polling for // proposals on". We need this because to have consensus info injected we need a working // network already. In the worst case, we send a few lookups we don't need. 
@@ -345,20 +341,15 @@ impl Libp2pNetwork { }; result.spawn_event_generator(direct_send, broadcast_send); - result.spawn_node_lookup(node_lookup_recv, cache_gc_shutdown_recv); + result.spawn_node_lookup(node_lookup_recv); result.spawn_connect(id); Ok(result) } /// Spawns task for looking up nodes pre-emptively - /// as well as garbage collecting the peer cache #[allow(clippy::cast_sign_loss, clippy::cast_precision_loss)] - fn spawn_node_lookup( - &self, - node_lookup_recv: UnboundedReceiver>, - cache_gc_shutdown_send: UnboundedReceiver<()>, - ) { + fn spawn_node_lookup(&self, node_lookup_recv: UnboundedReceiver>) { let handle = self.inner.handle.clone(); let dht_timeout = self.inner.dht_timeout; let latest_seen_view = self.inner.latest_seen_view.clone(); @@ -375,32 +366,13 @@ impl Libp2pNetwork { // only run if we are not too close to the next view number if latest_seen_view.load(Ordering::Relaxed) + THRESHOLD <= *view_number { - // look up node, caching if applicable + // look up if let Err(err) = handle_.lookup_node::(pk.clone(), dht_timeout).await { error!("Failed to perform lookup for key {:?}: {}", pk, err); }; } } }); - - // deals with garbage collecting the lookup queue - let handle_ = handle.clone(); - async_spawn(async move { - loop { - let ttl = handle_ - .config() - .ttl - .unwrap_or(Duration::from_secs(KAD_DEFAULT_REPUB_INTERVAL_SEC * 8)); - if async_timeout(ttl, cache_gc_shutdown_send.recv()) - .await - .is_err() - { - handle_.prune_peer_cache().await; - } else { - break; - } - } - }); } /// Initiates connection to the outside world @@ -573,7 +545,6 @@ impl ConnectedNetwork for Libp2p { let closure = async move { self.inner.node_lookup_send.send(None).await.unwrap(); - self.inner.cache_gc_shutdown_send.send(()).await.unwrap(); if self.inner.handle.is_killed() { error!("Called shut down when already shut down! 
Noop."); } else { diff --git a/crates/libp2p-networking/src/network/behaviours/dht/cache.rs b/crates/libp2p-networking/src/network/behaviours/dht/cache.rs new file mode 100644 index 0000000000..d87349f3d5 --- /dev/null +++ b/crates/libp2p-networking/src/network/behaviours/dht/cache.rs @@ -0,0 +1,308 @@ +use std::{ + collections::{BTreeMap, HashMap}, + sync::{ + atomic::{AtomicU32, Ordering}, + Arc, + }, + time::{Duration, SystemTime}, +}; + +use async_compatibility_layer::art::async_block_on; +use async_lock::RwLock; +use bincode::Options; +use dashmap::{mapref::one::Ref, DashMap}; +use hotshot_constants::KAD_DEFAULT_REPUB_INTERVAL_SEC; +use hotshot_utils::bincode::bincode_opts; +use snafu::{ResultExt, Snafu}; + +/// Error wrapper type for cache +#[derive(Debug, Snafu)] +#[snafu(visibility(pub))] +pub enum CacheError { + /// Failed to read or write from disk + Disk { + /// source of error + source: std::io::Error, + }, + + /// Failure to serialize the cache + Serialization { + /// source of error + source: Box, + }, + + /// Failure to deserialize the cache + Deserialization { + /// source of error + source: Box, + }, +} + +pub struct Config { + pub filename: String, + pub expiry: Duration, + pub max_disk_parity_delta: u32, +} + +impl Default for Config { + fn default() -> Self { + Self { + filename: "dht.cache".to_string(), + expiry: Duration::from_secs(KAD_DEFAULT_REPUB_INTERVAL_SEC * 16), + max_disk_parity_delta: 4, + } + } +} + +impl Default for Cache { + fn default() -> Self { + Self::new(Config::default()) + } +} + +pub struct Cache { + /// the cache's config + config: Config, + + /// the cache for records (key -> value) + cache: Arc, Vec>>, + /// the expiries for the dht cache, in order (expiry time -> key) + expiries: Arc>>>, + + /// number of inserts since the last save + disk_parity_delta: Arc, +} + +impl Cache { + pub fn new(config: Config) -> Self { + let cache = Self { + cache: Arc::new(DashMap::new()), + expiries: Arc::new(RwLock::new(BTreeMap::new())), + config, + disk_parity_delta: Arc::new(AtomicU32::new(0)), + }; + + // try loading from file + if let Err(err) = async_block_on(cache.load()) { + tracing::warn!("failed to load cache from file: {}", err); + }; + + cache + } + + pub async fn load(&self) -> Result<(), CacheError> { + let encoded = std::fs::read(self.config.filename.clone()).context(DiskSnafu)?; + + let cache: HashMap, Vec)> = bincode_opts() + .deserialize(&encoded) + .context(DeserializationSnafu)?; + + // inline prune and insert + let now = SystemTime::now(); + for (expiry, (key, value)) in cache { + if now < expiry { + self.cache.insert(key.clone(), value); + self.expiries.write().await.insert(expiry, key); + } + } + + Ok(()) + } + + pub async fn save(&self) -> Result<(), CacheError> { + self.prune().await; + + let mut cache_to_write = HashMap::new(); + let expiries = self.expiries.read().await; + for (expiry, key) in &*expiries { + let entry = self.cache.get(key).unwrap(); + cache_to_write.insert(expiry, (key, entry.value().clone())); + } + + let encoded = bincode_opts() + .serialize(&cache_to_write) + .context(SerializationSnafu)?; + + std::fs::write(self.config.filename.clone(), encoded).context(DiskSnafu)?; + + Ok(()) + } + + async fn prune(&self) { + let now = SystemTime::now(); + let mut expiries = self.expiries.write().await; + let mut removed: u32 = 0; + + while let Some((expires, key)) = expiries.pop_first() { + if now > expires { + self.cache.remove(&key); + removed += 1; + } else { + expiries.insert(expires, key); + break; + } + } + + // todo: test 
speed on this as opposed to inline + if removed > 0 { + self.disk_parity_delta.fetch_add(removed, Ordering::Relaxed); + } + } + + pub async fn get(&self, key: &Vec) -> Option, Vec>> { + // prune, save if necessary + self.prune().await; + self.save_if_necessary().await; + + // get + self.cache.get(key) + } + + pub async fn insert(&self, key: Vec, value: Vec) { + // insert into cache and expiries + self.cache.insert(key.clone(), value); + self.expiries + .write() + .await + .insert(SystemTime::now() + self.config.expiry, key); + + // save if reached max disk parity delta + self.disk_parity_delta.fetch_add(1, Ordering::Relaxed); + self.save_if_necessary().await; + } + + async fn save_if_necessary(&self) { + let cur_disk_parity_delta = self.disk_parity_delta.load(Ordering::Relaxed); + if cur_disk_parity_delta >= self.config.max_disk_parity_delta { + if let Err(err) = self.save().await { + tracing::warn!("failed to save cache to file: {}", err); + }; + } + } +} + +#[cfg(test)] +mod test { + + use super::*; + use async_compatibility_layer::art::async_sleep; + use libp2p_identity::PeerId; + use tracing::instrument; + + /// cache eviction test + #[cfg_attr( + async_executor_impl = "tokio", + tokio::test(flavor = "multi_thread", worker_threads = 2) + )] + #[cfg_attr(async_executor_impl = "async-std", async_std::test)] + #[instrument] + async fn test_dht_cache_eviction() { + async_compatibility_layer::logging::setup_logging(); + async_compatibility_layer::logging::setup_backtrace(); + + // cache with 1s eviction + let cache = Cache::new(Config { + filename: "test.cache".to_string(), + expiry: Duration::from_secs(1), + max_disk_parity_delta: 4, + }); + + let (key, value) = (PeerId::random(), PeerId::random()); + + // insert + cache.insert(key.to_bytes(), value.to_bytes()).await; + + // check that it is in the cache and expiries + assert_eq!( + cache.get(&key.to_bytes()).await.unwrap().value(), + &value.to_bytes() + ); + assert_eq!(cache.expiries.read().await.len(), 1); + + // sleep for 1s + async_sleep(Duration::from_secs(1)).await; + + // check that now is evicted + assert!(cache.get(&key.to_bytes()).await.is_none()); + + // check that the cache and expiries are empty + assert!(cache.expiries.read().await.is_empty()); + assert!(cache.cache.is_empty()); + } + + /// cache add test + #[cfg_attr( + async_executor_impl = "tokio", + tokio::test(flavor = "multi_thread", worker_threads = 2) + )] + #[cfg_attr(async_executor_impl = "async-std", async_std::test)] + #[instrument] + async fn test_dht_cache_save_load() { + let _ = std::fs::remove_file("test.cache"); + + let cache = Cache::new(Config { + filename: "test.cache".to_string(), + expiry: Duration::from_secs(600), + max_disk_parity_delta: 4, + }); + + // add 10 key-value pairs to the cache + for i in 0 as u8..10 as u8 { + let (key, value) = (vec![i; 1], vec![i + 1; 1]); + cache.insert(key, value).await; + } + + // save the cache + cache.save().await.unwrap(); + + // load the cache + let cache = Cache::new(Config { + filename: "test.cache".to_string(), + expiry: Duration::from_secs(600), + max_disk_parity_delta: 4, + }); + + // check that the cache has the 10 key-value pairs + for i in 0 as u8..10 as u8 { + let (key, value) = (vec![i; 1], vec![i + 1; 1]); + assert_eq!(cache.get(&key).await.unwrap().value(), &value); + } + + // delete the cache file + let _ = std::fs::remove_file("test.cache").unwrap(); + } + + #[cfg_attr( + async_executor_impl = "tokio", + tokio::test(flavor = "multi_thread", worker_threads = 2) + )] + #[cfg_attr(async_executor_impl = 
"async-std", async_std::test)] + #[instrument] + async fn test_dht_disk_parity() { + let _ = std::fs::remove_file("test.cache"); + + let cache = Cache::new(Config { + // tests run sequentially, shouldn't matter + filename: "test.cache".to_string(), + expiry: Duration::from_secs(600), + max_disk_parity_delta: 4, + }); + + // insert into cache + for i in 0..3 { + cache.insert(vec![i; 1], vec![i + 1; 1]).await; + } + + // check that file is not saved + assert!(!std::path::Path::new("test.cache").exists()); + + // insert into cache + cache.insert(vec![0; 1], vec![1; 1]).await; + + // check that file is saved + assert!(std::path::Path::new("test.cache").exists()); + + // delete the cache file + let _ = std::fs::remove_file("test.cache").unwrap(); + } +} diff --git a/crates/libp2p-networking/src/network/behaviours/dht.rs b/crates/libp2p-networking/src/network/behaviours/dht/mod.rs similarity index 95% rename from crates/libp2p-networking/src/network/behaviours/dht.rs rename to crates/libp2p-networking/src/network/behaviours/dht/mod.rs index 46adb05d0c..68f15c9aae 100644 --- a/crates/libp2p-networking/src/network/behaviours/dht.rs +++ b/crates/libp2p-networking/src/network/behaviours/dht/mod.rs @@ -5,6 +5,9 @@ use std::{ time::Duration, }; +mod cache; + +use async_compatibility_layer::art::async_block_on; use futures::channel::oneshot::Sender; use libp2p::{ kad::{ @@ -21,6 +24,8 @@ use tracing::{error, info, warn}; pub(crate) const NUM_REPLICATED_TO_TRUST: usize = 2; const MAX_DHT_QUERY_SIZE: usize = 5; +use self::cache::Cache; + use super::exponential_backoff::ExponentialBackoff; /// Behaviour wrapping libp2p's kademlia @@ -56,6 +61,8 @@ pub struct DHTBehaviour { pub peer_id: PeerId, /// replication factor pub replication_factor: NonZeroUsize, + /// kademlia cache + cache: Cache, } /// State of bootstrapping @@ -138,6 +145,7 @@ impl DHTBehaviour { }, in_progress_get_closest_peers: HashMap::default(), replication_factor, + cache: Cache::default(), } } @@ -223,17 +231,26 @@ impl DHTBehaviour { return; } - let qid = self.kadem.get_record(key.clone().into()); - let query = KadGetQuery { - backoff, - progress: DHTProgress::InProgress(qid), - notify: chan, - num_replicas: factor, - key, - retry_count: retry_count - 1, - records: HashMap::default(), - }; - self.in_progress_get_record_queries.insert(qid, query); + // check cache before making the request + if let Some(entry) = async_block_on(self.cache.get(&key)) { + // exists in cache + if chan.send(entry.value().clone()).is_err() { + warn!("Get DHT: channel closed before get record request result could be sent"); + } + } else { + // doesn't exist in cache, actually propagate request + let qid = self.kadem.get_record(key.clone().into()); + let query = KadGetQuery { + backoff, + progress: DHTProgress::InProgress(qid), + notify: chan, + num_replicas: factor, + key, + retry_count: retry_count - 1, + records: HashMap::default(), + }; + self.in_progress_get_record_queries.insert(qid, query); + } } /// update state based on recv-ed get query @@ -279,6 +296,10 @@ impl DHTBehaviour { .into_iter() .find(|(_, v)| *v >= NUM_REPLICATED_TO_TRUST) { + // insert into cache + async_block_on(self.cache.insert(key, r.clone())); + + // return value if notify.send(r).is_err() { warn!("Get DHT: channel closed before get record request result could be sent"); } diff --git a/crates/libp2p-networking/src/network/mod.rs b/crates/libp2p-networking/src/network/mod.rs index 54b3d89035..3ec1c07b73 100644 --- a/crates/libp2p-networking/src/network/mod.rs +++ 
b/crates/libp2p-networking/src/network/mod.rs @@ -34,7 +34,7 @@ use libp2p::{ use libp2p_identity::PeerId; use rand::seq::IteratorRandom; use serde::{Deserialize, Serialize}; -use std::{collections::HashSet, fmt::Debug, hash::Hash, str::FromStr, sync::Arc, time::Duration}; +use std::{collections::HashSet, fmt::Debug, str::FromStr, sync::Arc, time::Duration}; use tracing::{info, instrument}; #[cfg(async_executor_impl = "async-std")] @@ -229,12 +229,12 @@ pub async fn gen_transport( /// a single node, connects them to each other /// and waits for connections to propagate to all nodes. #[instrument] -pub async fn spin_up_swarm( +pub async fn spin_up_swarm( timeout_len: Duration, known_nodes: Vec<(Option, Multiaddr)>, config: NetworkNodeConfig, idx: usize, - handle: &Arc>, + handle: &Arc>, ) -> Result<(), NetworkNodeHandleError> { info!("known_nodes{:?}", known_nodes); handle.add_known_peers(known_nodes).await?; @@ -248,9 +248,9 @@ pub async fn spin_up_swarm( - handles: &[Arc>], +pub fn get_random_handle( + handles: &[Arc>], rng: &mut dyn rand::RngCore, -) -> Arc> { +) -> Arc> { handles.iter().choose(rng).unwrap().clone() } diff --git a/crates/libp2p-networking/src/network/node/handle.rs b/crates/libp2p-networking/src/network/node/handle.rs index 04961140c6..c4e6460666 100644 --- a/crates/libp2p-networking/src/network/node/handle.rs +++ b/crates/libp2p-networking/src/network/node/handle.rs @@ -11,25 +11,22 @@ use async_compatibility_layer::{ UnboundedReceiver, UnboundedRecvError, UnboundedSender, }, }; -use async_lock::{Mutex, RwLock}; +use async_lock::Mutex; use bincode::Options; -use dashmap::DashMap; use futures::{stream::FuturesOrdered, Future, FutureExt}; -use hotshot_constants::KAD_DEFAULT_REPUB_INTERVAL_SEC; use hotshot_utils::bincode::bincode_opts; use libp2p::{request_response::ResponseChannel, Multiaddr}; use libp2p_identity::PeerId; use serde::{Deserialize, Serialize}; use snafu::{ResultExt, Snafu}; use std::{ - collections::{BTreeMap, HashSet}, + collections::HashSet, fmt::Debug, - hash::Hash, sync::{ atomic::{AtomicBool, Ordering}, Arc, }, - time::{Duration, Instant, SystemTime}, + time::{Duration, Instant}, }; use tracing::{debug, info, instrument}; @@ -37,7 +34,7 @@ use tracing::{debug, info, instrument}; /// - A reference to the state /// - Controls for the swarm #[derive(Debug)] -pub struct NetworkNodeHandle { +pub struct NetworkNodeHandle { /// network configuration network_config: NetworkNodeConfig, /// the state of the replica @@ -51,10 +48,6 @@ pub struct NetworkNodeHandle { peer_id: PeerId, /// human readable id id: usize, - /// the cache for peers we've looked up - peer_cache: Arc>, - /// the expiries for the peer cache, in order - peer_cache_expiries: Arc>>, /// A list of webui listeners that are listening for changes on this node webui_listeners: Arc>>>, @@ -91,7 +84,7 @@ impl NetworkNodeReceiver { } } -impl NetworkNodeHandle { +impl NetworkNodeHandle { /// constructs a new node listening on `known_addr` #[instrument] pub async fn new(config: NetworkNodeConfig, id: usize) -> Result { @@ -126,8 +119,6 @@ impl NetworkNodeHa listen_addr, peer_id, id, - peer_cache: Arc::new(DashMap::new()), - peer_cache_expiries: Arc::new(RwLock::new(BTreeMap::new())), webui_listeners: Arc::default(), receiver: NetworkNodeReceiver { kill_switch, @@ -147,10 +138,9 @@ impl NetworkNodeHa #[allow(clippy::unused_async)] pub async fn spawn_handler(self: &Arc, cb: F) -> impl Future where - F: Fn(NetworkEvent, Arc>) -> RET + Sync + Send + 'static, + F: Fn(NetworkEvent, Arc>) -> RET + Sync + Send + 
'static, RET: Future> + Send + 'static, S: Send + 'static, - K: Send + Sync + 'static, { assert!( !self.receiver.receiver_spawned.swap(true, Ordering::Relaxed), @@ -269,7 +259,7 @@ impl NetworkNodeHa } } -impl NetworkNodeHandle { +impl NetworkNodeHandle { /// Print out the routing table used by kademlia /// NOTE: only for debugging purposes currently /// # Errors @@ -292,48 +282,16 @@ impl NetworkNodeHandle { r.await.map_err(|_| NetworkNodeHandleError::RecvError) } - /// Prunes the peer lookup cache, removing old entries - /// Should be 1:1 with kademlia expiries - pub async fn prune_peer_cache(&self) { - let now = SystemTime::now(); - let mut expiries = self.peer_cache_expiries.write().await; - - while let Some((expires, key)) = expiries.pop_first() { - if now > expires { - self.peer_cache.remove(&key); - } else { - expiries.insert(expires, key); - break; - } - } - } - /// Looks up a node's `PeerId` and attempts to validate routing - /// Will use cached `PeerId` if available /// # Errors /// if the peer was unable to be looked up (did not provide a response, DNE) - pub async fn lookup_node Deserialize<'a>>( + pub async fn lookup_node Deserialize<'a> + Serialize>( &self, - key: K, + key: V, dht_timeout: Duration, ) -> Result { - let pid = if let Some(record) = self.peer_cache.get(&key) { - // exists in cache. look up routing but skip kademlia - *record.value() - } else { - // does not exist in cache. look up in kademlia, store in cache - let pid = self.get_record_timeout::(&key, dht_timeout).await?; - self.peer_cache.insert(key.clone(), pid); - self.peer_cache_expiries.write().await.insert( - SystemTime::now() - + self - .network_config - .ttl - .unwrap_or(Duration::from_secs(KAD_DEFAULT_REPUB_INTERVAL_SEC * 16)), - key, - ); - pid - }; + // get record (from DHT) + let pid = self.get_record_timeout::(&key, dht_timeout).await?; // pid lookup for routing self.lookup_pid(pid).await?; @@ -669,7 +627,7 @@ impl NetworkNodeHandle { } } -impl NetworkNodeHandle { +impl NetworkNodeHandle { /// Get a clone of the internal state pub async fn state(&self) -> S { self.state.cloned().await @@ -738,106 +696,3 @@ pub mod network_node_handle_error { NetworkSnafu, NodeConfigSnafu, RecvSnafu, SendSnafu, SerializationSnafu, TimeoutSnafu, }; } - -#[cfg(test)] -mod test { - use super::*; - - /// libp2p peer cache test - #[cfg_attr( - async_executor_impl = "tokio", - tokio::test(flavor = "multi_thread", worker_threads = 2) - )] - #[cfg_attr(async_executor_impl = "async-std", async_std::test)] - #[instrument] - async fn test_libp2p_cache_eviction() { - async_compatibility_layer::logging::setup_logging(); - async_compatibility_layer::logging::setup_backtrace(); - - let handle: NetworkNodeHandle<(), PeerId> = - NetworkNodeHandle::new(NetworkNodeConfig::default(), 0) - .await - .unwrap(); - - let now = SystemTime::now(); - let later = now + Duration::from_secs(1); - - // present insert - let present_key = PeerId::random(); - let present_pid = PeerId::random(); - handle.peer_cache.insert(present_key, present_pid); - handle - .peer_cache_expiries - .write() - .await - .insert(now, present_key); - - // later insert - let later_key = PeerId::random(); - let later_pid = PeerId::random(); - handle.peer_cache.insert(later_key, later_pid); - handle - .peer_cache_expiries - .write() - .await - .insert(now + Duration::from_secs(1), later_key); - - // check that now and later exist - assert!(handle - .peer_cache - .get(&present_key) - .is_some_and(|entry| entry.value() == &present_pid)); - assert!(handle - .peer_cache - 
.get(&later_key) - .is_some_and(|entry| entry.value() == &later_pid)); - assert!(handle - .peer_cache_expiries - .read() - .await - .get(&now) - .is_some_and(|entry| entry == &present_key)); - assert!(handle - .peer_cache_expiries - .read() - .await - .get(&later) - .is_some_and(|entry| entry == &later_key)); - - // prune - handle.prune_peer_cache().await; - - // check that now doesn't exist and later does - assert!(handle.peer_cache.get(&present_key).is_none()); - assert!(handle - .peer_cache - .get(&later_key) - .is_some_and(|entry| entry.value() == &later_pid)); - assert!(handle.peer_cache_expiries.read().await.get(&now).is_none()); - assert!(handle - .peer_cache_expiries - .read() - .await - .get(&later) - .is_some_and(|entry| entry == &later_key)); - - // wait for later to expire - async_sleep(Duration::from_secs(1)).await; - - // prune - handle.prune_peer_cache().await; - - // check that later doesn't exist - assert!(handle.peer_cache.get(&later_key).is_none()); - assert!(handle - .peer_cache_expiries - .read() - .await - .get(&later) - .is_none()); - - // check that the expiries and cache are empty - assert!(handle.peer_cache_expiries.read().await.is_empty()); - assert!(handle.peer_cache.is_empty()); - } -} diff --git a/crates/libp2p-networking/tests/common/mod.rs b/crates/libp2p-networking/tests/common/mod.rs index 5a531bbafb..ca9e967319 100644 --- a/crates/libp2p-networking/tests/common/mod.rs +++ b/crates/libp2p-networking/tests/common/mod.rs @@ -39,8 +39,8 @@ pub async fn test_bed, FutG: Future> + 'static + Send + Sync, - F: FnOnce(Vec>>, Duration) -> FutF, - G: Fn(NetworkEvent, Arc>) -> FutG + 'static + Send + Sync, + F: FnOnce(Vec>>, Duration) -> FutF, + G: Fn(NetworkEvent, Arc>) -> FutG + 'static + Send + Sync, { setup_logging(); setup_backtrace(); @@ -69,7 +69,7 @@ pub async fn test_bed(handles: &[Arc>]) -> HashMap { +fn gen_peerid_map(handles: &[Arc>]) -> HashMap { let mut r_val = HashMap::new(); for handle in handles { r_val.insert(handle.peer_id(), handle.id()); @@ -79,7 +79,7 @@ fn gen_peerid_map(handles: &[Arc>]) -> HashMap(handles: &[Arc>]) { +pub async fn print_connections(handles: &[Arc>]) { let m = gen_peerid_map(handles); warn!("PRINTING CONNECTION STATES"); for handle in handles.iter() { @@ -104,7 +104,7 @@ pub async fn spin_up_swarms( num_of_nodes: usize, timeout_len: Duration, num_bootstrap: usize, -) -> Result>>, TestError> { +) -> Result>>, TestError> { let mut handles = Vec::new(); let mut bootstrap_addrs = Vec::<(PeerId, Multiaddr)>::new(); let mut connecting_futs = Vec::new(); diff --git a/crates/libp2p-networking/tests/counter.rs b/crates/libp2p-networking/tests/counter.rs index 85b7cb2a7f..eefbdcf37b 100644 --- a/crates/libp2p-networking/tests/counter.rs +++ b/crates/libp2p-networking/tests/counter.rs @@ -56,7 +56,7 @@ pub enum CounterMessage { #[instrument] pub async fn counter_handle_network_event( event: NetworkEvent, - handle: Arc>, + handle: Arc>, ) -> Result<(), NetworkNodeHandleError> { use CounterMessage::*; use NetworkEvent::*; @@ -121,8 +121,8 @@ pub async fn counter_handle_network_event( /// `requester_handle` asks for `requestee_handle`'s state, /// and then `requester_handle` updates its state to equal `requestee_handle`. 
async fn run_request_response_increment<'a>( - requester_handle: Arc>, - requestee_handle: Arc>, + requester_handle: Arc>, + requestee_handle: Arc>, timeout: Duration, ) -> Result<(), TestError> { async move { @@ -168,7 +168,7 @@ async fn run_request_response_increment<'a>( /// broadcasts `msg` from a randomly chosen handle /// then asserts that all nodes match `new_state` async fn run_gossip_round( - handles: &[Arc>], + handles: &[Arc>], msg: CounterMessage, new_state: CounterState, timeout_duration: Duration, @@ -234,7 +234,7 @@ async fn run_gossip_round( } async fn run_intersperse_many_rounds( - handles: Vec>>, + handles: Vec>>, timeout: Duration, ) { for i in 0..NUM_ROUNDS as u32 { @@ -250,21 +250,18 @@ async fn run_intersperse_many_rounds( } async fn run_dht_many_rounds( - handles: Vec>>, + handles: Vec>>, timeout: Duration, ) { run_dht_rounds(&handles, timeout, 0, NUM_ROUNDS).await; } -async fn run_dht_one_round( - handles: Vec>>, - timeout: Duration, -) { +async fn run_dht_one_round(handles: Vec>>, timeout: Duration) { run_dht_rounds(&handles, timeout, 0, 1).await; } async fn run_request_response_many_rounds( - handles: Vec>>, + handles: Vec>>, timeout: Duration, ) { for _i in 0..NUM_ROUNDS { @@ -276,7 +273,7 @@ async fn run_request_response_many_rounds( } pub async fn run_request_response_one_round( - handles: Vec>>, + handles: Vec>>, timeout: Duration, ) { run_request_response_increment_all(&handles, timeout).await; @@ -286,21 +283,21 @@ pub async fn run_request_response_one_round( } pub async fn run_gossip_many_rounds( - handles: Vec>>, + handles: Vec>>, timeout: Duration, ) { run_gossip_rounds(&handles, NUM_ROUNDS, 0, timeout).await } async fn run_gossip_one_round( - handles: Vec>>, + handles: Vec>>, timeout: Duration, ) { run_gossip_rounds(&handles, 1, 0, timeout).await } async fn run_dht_rounds( - handles: &[Arc>], + handles: &[Arc>], timeout: Duration, starting_val: usize, num_rounds: usize, @@ -336,7 +333,7 @@ async fn run_dht_rounds( /// runs `num_rounds` of message broadcast, incrementing the state of all nodes each broadcast async fn run_gossip_rounds( - handles: &[Arc>], + handles: &[Arc>], num_rounds: usize, starting_state: CounterState, timeout: Duration, @@ -361,7 +358,7 @@ async fn run_gossip_rounds( /// then has all other peers request its state /// and update their state to the recv'ed state async fn run_request_response_increment_all( - handles: &[Arc>], + handles: &[Arc>], timeout: Duration, ) { let mut rng = rand::thread_rng(); From 6ce6b2fb4f8af53df27126b12d8f2565a3fc3e83 Mon Sep 17 00:00:00 2001 From: Rob Date: Mon, 18 Sep 2023 17:39:27 -0400 Subject: [PATCH 2/5] satisfy clippy --- .../src/network/behaviours/dht/cache.rs | 26 ++++++++++++++----- flake.nix | 2 +- 2 files changed, 21 insertions(+), 7 deletions(-) diff --git a/crates/libp2p-networking/src/network/behaviours/dht/cache.rs b/crates/libp2p-networking/src/network/behaviours/dht/cache.rs index d87349f3d5..a669af869d 100644 --- a/crates/libp2p-networking/src/network/behaviours/dht/cache.rs +++ b/crates/libp2p-networking/src/network/behaviours/dht/cache.rs @@ -36,6 +36,12 @@ pub enum CacheError { /// source of error source: Box, }, + + /// General cache error + GeneralCache { + /// source of error + source: Box, + }, } pub struct Config { @@ -115,8 +121,16 @@ impl Cache { let mut cache_to_write = HashMap::new(); let expiries = self.expiries.read().await; for (expiry, key) in &*expiries { - let entry = self.cache.get(key).unwrap(); - cache_to_write.insert(expiry, (key, entry.value().clone())); + if let 
Some(entry) = self.cache.get(key) { + cache_to_write.insert(expiry, (key, entry.value().clone())); + } else { + tracing::warn!("key not found in cache: {:?}", key); + Err(CacheError::GeneralCache { + source: Box::new(bincode::ErrorKind::Custom( + "key not found in cache".to_string(), + )), + })?; + }; } let encoded = bincode_opts() @@ -247,7 +261,7 @@ mod test { }); // add 10 key-value pairs to the cache - for i in 0 as u8..10 as u8 { + for i in 0u8..10u8 { let (key, value) = (vec![i; 1], vec![i + 1; 1]); cache.insert(key, value).await; } @@ -263,13 +277,13 @@ mod test { }); // check that the cache has the 10 key-value pairs - for i in 0 as u8..10 as u8 { + for i in 0u8..10u8 { let (key, value) = (vec![i; 1], vec![i + 1; 1]); assert_eq!(cache.get(&key).await.unwrap().value(), &value); } // delete the cache file - let _ = std::fs::remove_file("test.cache").unwrap(); + let _ = std::fs::remove_file("test.cache"); } #[cfg_attr( @@ -303,6 +317,6 @@ mod test { assert!(std::path::Path::new("test.cache").exists()); // delete the cache file - let _ = std::fs::remove_file("test.cache").unwrap(); + _ = std::fs::remove_file("test.cache"); } } diff --git a/flake.nix b/flake.nix index 2ff009c372..04cbf3c782 100644 --- a/flake.nix +++ b/flake.nix @@ -173,7 +173,7 @@ zlib.out fenix.packages.${system}.rust-analyzer just - pkgconfig + pkg-config openssl.dev openssl.out ] ++ lib.optionals stdenv.isDarwin [ From a0bed19228686977ffe6ed07279c534e2a20170a Mon Sep 17 00:00:00 2001 From: Rob Date: Tue, 19 Sep 2023 09:38:08 -0400 Subject: [PATCH 3/5] remove cache for tests (add default, builder to gen) --- .gitignore | 1 + .../src/traits/networking/libp2p_network.rs | 4 + .../src/network/behaviours/dht/cache.rs | 94 +++++++++---------- .../src/network/behaviours/dht/mod.rs | 8 +- crates/libp2p-networking/src/network/node.rs | 1 + .../src/network/node/config.rs | 5 + 6 files changed, 65 insertions(+), 48 deletions(-) diff --git a/.gitignore b/.gitignore index 839d85dc3d..ff07ca5941 100644 --- a/.gitignore +++ b/.gitignore @@ -8,3 +8,4 @@ /target_dirs /.vscode/settings.json **/.DS_Store +*.cache \ No newline at end of file diff --git a/crates/hotshot/src/traits/networking/libp2p_network.rs b/crates/hotshot/src/traits/networking/libp2p_network.rs index bcb933c7bf..fb2875aed4 100644 --- a/crates/hotshot/src/traits/networking/libp2p_network.rs +++ b/crates/hotshot/src/traits/networking/libp2p_network.rs @@ -198,6 +198,8 @@ where // setting to sane defaults .ttl(None) .republication_interval(None) + // this removes the cache for tests + .dht_cache_location(None) .build() .unwrap() } else { @@ -218,6 +220,8 @@ where // setting to sane defaults .ttl(None) .republication_interval(None) + // this removes the cache for tests + .dht_cache_location(None) .build() .unwrap() }; diff --git a/crates/libp2p-networking/src/network/behaviours/dht/cache.rs b/crates/libp2p-networking/src/network/behaviours/dht/cache.rs index a669af869d..d1b4478bde 100644 --- a/crates/libp2p-networking/src/network/behaviours/dht/cache.rs +++ b/crates/libp2p-networking/src/network/behaviours/dht/cache.rs @@ -44,22 +44,16 @@ pub enum CacheError { }, } +#[derive(Clone, derive_builder::Builder, custom_debug::Debug, Default)] pub struct Config { - pub filename: String, + #[builder(default = "Some(\"dht.cache\".to_string())")] + pub filename: Option, + #[builder(default = "Duration::from_secs(KAD_DEFAULT_REPUB_INTERVAL_SEC * 16)")] pub expiry: Duration, + #[builder(default = "4")] pub max_disk_parity_delta: u32, } -impl Default for Config { - fn default() -> 
Self { - Self { - filename: "dht.cache".to_string(), - expiry: Duration::from_secs(KAD_DEFAULT_REPUB_INTERVAL_SEC * 16), - max_disk_parity_delta: 4, - } - } -} - impl Default for Cache { fn default() -> Self { Self::new(Config::default()) @@ -97,18 +91,20 @@ impl Cache { } pub async fn load(&self) -> Result<(), CacheError> { - let encoded = std::fs::read(self.config.filename.clone()).context(DiskSnafu)?; - - let cache: HashMap, Vec)> = bincode_opts() - .deserialize(&encoded) - .context(DeserializationSnafu)?; - - // inline prune and insert - let now = SystemTime::now(); - for (expiry, (key, value)) in cache { - if now < expiry { - self.cache.insert(key.clone(), value); - self.expiries.write().await.insert(expiry, key); + if let Some(filename) = &self.config.filename { + let encoded = std::fs::read(filename).context(DiskSnafu)?; + + let cache: HashMap, Vec)> = bincode_opts() + .deserialize(&encoded) + .context(DeserializationSnafu)?; + + // inline prune and insert + let now = SystemTime::now(); + for (expiry, (key, value)) in cache { + if now < expiry { + self.cache.insert(key.clone(), value); + self.expiries.write().await.insert(expiry, key); + } } } @@ -116,28 +112,32 @@ impl Cache { } pub async fn save(&self) -> Result<(), CacheError> { - self.prune().await; - - let mut cache_to_write = HashMap::new(); - let expiries = self.expiries.read().await; - for (expiry, key) in &*expiries { - if let Some(entry) = self.cache.get(key) { - cache_to_write.insert(expiry, (key, entry.value().clone())); - } else { - tracing::warn!("key not found in cache: {:?}", key); - Err(CacheError::GeneralCache { - source: Box::new(bincode::ErrorKind::Custom( - "key not found in cache".to_string(), - )), - })?; - }; - } + if let Some(filename) = &self.config.filename { + // prune first + self.prune().await; + + // serialize + let mut cache_to_write = HashMap::new(); + let expiries = self.expiries.read().await; + for (expiry, key) in &*expiries { + if let Some(entry) = self.cache.get(key) { + cache_to_write.insert(expiry, (key, entry.value().clone())); + } else { + tracing::warn!("key not found in cache: {:?}", key); + Err(CacheError::GeneralCache { + source: Box::new(bincode::ErrorKind::Custom( + "key not found in cache".to_string(), + )), + })?; + }; + } - let encoded = bincode_opts() - .serialize(&cache_to_write) - .context(SerializationSnafu)?; + let encoded = bincode_opts() + .serialize(&cache_to_write) + .context(SerializationSnafu)?; - std::fs::write(self.config.filename.clone(), encoded).context(DiskSnafu)?; + std::fs::write(filename, encoded).context(DiskSnafu)?; + } Ok(()) } @@ -216,7 +216,7 @@ mod test { // cache with 1s eviction let cache = Cache::new(Config { - filename: "test.cache".to_string(), + filename: None, expiry: Duration::from_secs(1), max_disk_parity_delta: 4, }); @@ -255,7 +255,7 @@ mod test { let _ = std::fs::remove_file("test.cache"); let cache = Cache::new(Config { - filename: "test.cache".to_string(), + filename: Some("test.cache".to_string()), expiry: Duration::from_secs(600), max_disk_parity_delta: 4, }); @@ -271,7 +271,7 @@ mod test { // load the cache let cache = Cache::new(Config { - filename: "test.cache".to_string(), + filename: Some("test.cache".to_string()), expiry: Duration::from_secs(600), max_disk_parity_delta: 4, }); @@ -297,7 +297,7 @@ mod test { let cache = Cache::new(Config { // tests run sequentially, shouldn't matter - filename: "test.cache".to_string(), + filename: Some("test.cache".to_string()), expiry: Duration::from_secs(600), max_disk_parity_delta: 4, }); diff --git 
a/crates/libp2p-networking/src/network/behaviours/dht/mod.rs b/crates/libp2p-networking/src/network/behaviours/dht/mod.rs index 68f15c9aae..5dd40f6248 100644 --- a/crates/libp2p-networking/src/network/behaviours/dht/mod.rs +++ b/crates/libp2p-networking/src/network/behaviours/dht/mod.rs @@ -117,6 +117,7 @@ impl DHTBehaviour { mut kadem: Kademlia, pid: PeerId, replication_factor: NonZeroUsize, + cache_location: Option, ) -> Self { // needed because otherwise we stay in client mode when testing locally // and don't publish keys stuff @@ -145,7 +146,12 @@ impl DHTBehaviour { }, in_progress_get_closest_peers: HashMap::default(), replication_factor, - cache: Cache::default(), + cache: Cache::new( + cache::ConfigBuilder::default() + .filename(cache_location) + .build() + .unwrap_or_default(), + ), } } diff --git a/crates/libp2p-networking/src/network/node.rs b/crates/libp2p-networking/src/network/node.rs index 2f7dd46ba4..063cf6104f 100644 --- a/crates/libp2p-networking/src/network/node.rs +++ b/crates/libp2p-networking/src/network/node.rs @@ -269,6 +269,7 @@ impl NetworkNode { config .replication_factor .unwrap_or_else(|| NonZeroUsize::new(4).unwrap()), + config.dht_cache_location.clone(), ), identify, DMBehaviour::new(request_response), diff --git a/crates/libp2p-networking/src/network/node/config.rs b/crates/libp2p-networking/src/network/node/config.rs index 4d19b6f516..82897c0de5 100644 --- a/crates/libp2p-networking/src/network/node/config.rs +++ b/crates/libp2p-networking/src/network/node/config.rs @@ -24,6 +24,11 @@ pub struct NetworkNodeConfig { #[builder(setter(into, strip_option), default = "DEFAULT_REPLICATION_FACTOR")] pub replication_factor: Option, + /// location of the dht cache + /// default is "dht.cache" in the current directory + #[builder(default = "Some(\"dht.cache\".to_string())")] + pub dht_cache_location: Option, + #[builder(default)] /// parameters for gossipsub mesh network pub mesh_params: Option, From e6838718433f9feb54e3acef88b14cb987207c1a Mon Sep 17 00:00:00 2001 From: Rob Date: Mon, 25 Sep 2023 09:27:03 -0400 Subject: [PATCH 4/5] pr changes --- .../src/traits/networking/libp2p_network.rs | 2 +- .../src/network/behaviours/dht/cache.rs | 20 +++++++++++-------- .../src/network/behaviours/dht/mod.rs | 5 +++-- crates/libp2p-networking/src/network/node.rs | 3 ++- 4 files changed, 18 insertions(+), 12 deletions(-) diff --git a/crates/hotshot/src/traits/networking/libp2p_network.rs b/crates/hotshot/src/traits/networking/libp2p_network.rs index fb2875aed4..ebb68e7d01 100644 --- a/crates/hotshot/src/traits/networking/libp2p_network.rs +++ b/crates/hotshot/src/traits/networking/libp2p_network.rs @@ -289,7 +289,7 @@ impl Libp2pNetwork { ) -> Result, NetworkError> { assert!(bootstrap_addrs_len > 4, "Need at least 5 bootstrap nodes"); let network_handle = Arc::new( - NetworkNodeHandle::<()>::new(config, id) + Box::pin(NetworkNodeHandle::<()>::new(config, id)) .await .map_err(Into::::into)?, ); diff --git a/crates/libp2p-networking/src/network/behaviours/dht/cache.rs b/crates/libp2p-networking/src/network/behaviours/dht/cache.rs index d1b4478bde..1703469c68 100644 --- a/crates/libp2p-networking/src/network/behaviours/dht/cache.rs +++ b/crates/libp2p-networking/src/network/behaviours/dht/cache.rs @@ -56,7 +56,7 @@ pub struct Config { impl Default for Cache { fn default() -> Self { - Self::new(Config::default()) + async_block_on(Self::new(Config::default())) } } @@ -74,7 +74,7 @@ pub struct Cache { } impl Cache { - pub fn new(config: Config) -> Self { + pub async fn new(config: 
Config) -> Self { let cache = Self { cache: Arc::new(DashMap::new()), expiries: Arc::new(RwLock::new(BTreeMap::new())), @@ -83,7 +83,7 @@ impl Cache { }; // try loading from file - if let Err(err) = async_block_on(cache.load()) { + if let Err(err) = cache.load().await { tracing::warn!("failed to load cache from file: {}", err); }; @@ -189,7 +189,7 @@ impl Cache { let cur_disk_parity_delta = self.disk_parity_delta.load(Ordering::Relaxed); if cur_disk_parity_delta >= self.config.max_disk_parity_delta { if let Err(err) = self.save().await { - tracing::warn!("failed to save cache to file: {}", err); + tracing::error!("failed to save cache to file: {}", err); }; } } @@ -219,7 +219,8 @@ mod test { filename: None, expiry: Duration::from_secs(1), max_disk_parity_delta: 4, - }); + }) + .await; let (key, value) = (PeerId::random(), PeerId::random()); @@ -258,7 +259,8 @@ mod test { filename: Some("test.cache".to_string()), expiry: Duration::from_secs(600), max_disk_parity_delta: 4, - }); + }) + .await; // add 10 key-value pairs to the cache for i in 0u8..10u8 { @@ -274,7 +276,8 @@ mod test { filename: Some("test.cache".to_string()), expiry: Duration::from_secs(600), max_disk_parity_delta: 4, - }); + }) + .await; // check that the cache has the 10 key-value pairs for i in 0u8..10u8 { @@ -300,7 +303,8 @@ mod test { filename: Some("test.cache".to_string()), expiry: Duration::from_secs(600), max_disk_parity_delta: 4, - }); + }) + .await; // insert into cache for i in 0..3 { diff --git a/crates/libp2p-networking/src/network/behaviours/dht/mod.rs b/crates/libp2p-networking/src/network/behaviours/dht/mod.rs index 5dd40f6248..7086b6dab1 100644 --- a/crates/libp2p-networking/src/network/behaviours/dht/mod.rs +++ b/crates/libp2p-networking/src/network/behaviours/dht/mod.rs @@ -113,7 +113,7 @@ impl DHTBehaviour { /// Create a new DHT behaviour #[must_use] - pub fn new( + pub async fn new( mut kadem: Kademlia, pid: PeerId, replication_factor: NonZeroUsize, @@ -151,7 +151,8 @@ impl DHTBehaviour { .filename(cache_location) .build() .unwrap_or_default(), - ), + ) + .await, } } diff --git a/crates/libp2p-networking/src/network/node.rs b/crates/libp2p-networking/src/network/node.rs index 063cf6104f..835058bf84 100644 --- a/crates/libp2p-networking/src/network/node.rs +++ b/crates/libp2p-networking/src/network/node.rs @@ -270,7 +270,8 @@ impl NetworkNode { .replication_factor .unwrap_or_else(|| NonZeroUsize::new(4).unwrap()), config.dht_cache_location.clone(), - ), + ) + .await, identify, DMBehaviour::new(request_response), ); From aa1b570de9d7c2ee030e66396bc050793c9985e9 Mon Sep 17 00:00:00 2001 From: Rob Date: Mon, 25 Sep 2023 09:28:54 -0400 Subject: [PATCH 5/5] remove comment --- crates/libp2p-networking/src/network/behaviours/dht/cache.rs | 1 - 1 file changed, 1 deletion(-) diff --git a/crates/libp2p-networking/src/network/behaviours/dht/cache.rs b/crates/libp2p-networking/src/network/behaviours/dht/cache.rs index 1703469c68..602bb41e16 100644 --- a/crates/libp2p-networking/src/network/behaviours/dht/cache.rs +++ b/crates/libp2p-networking/src/network/behaviours/dht/cache.rs @@ -157,7 +157,6 @@ impl Cache { } } - // todo: test speed on this as opposed to inline if removed > 0 { self.disk_parity_delta.fetch_add(removed, Ordering::Relaxed); }
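
For reference, the end state of the series gives the DHT behaviour a small async cache API: build a `Config` (disk-backed only when `filename` is `Some`), construct the cache with `Cache::new(config).await`, and use `insert`, `get`, and `save`. The sketch below is illustrative only and mirrors the unit tests added in patch 1/5 (as amended in patches 3/5 and 4/5); it assumes it lives inside the private `dht::cache` module so that `Cache` and `Config` are in scope, and the filename used here is made up for the example.

use std::time::Duration;

// Minimal sketch, not part of the patch: exercises the cache the same way the
// module's own tests do. Assumes `Cache` and `Config` are in scope (the cache
// module is private to the DHT behaviour).
async fn cache_roundtrip_sketch() {
    // Disk-backed config: entries older than `expiry` are pruned lazily, and
    // the cache is flushed to `filename` once `max_disk_parity_delta` unsaved
    // writes accumulate (or when `save()` is called explicitly).
    let cache = Cache::new(Config {
        filename: Some("dht-example.cache".to_string()),
        expiry: Duration::from_secs(600),
        max_disk_parity_delta: 4,
    })
    .await;

    // Keys and values are raw byte vectors (the behaviour stores the
    // serialized record key and value).
    let (key, value) = (vec![1u8, 2, 3], vec![9u8, 9, 9]);
    cache.insert(key.clone(), value.clone()).await;

    // `get` prunes expired entries first, then returns the DashMap guard.
    assert_eq!(cache.get(&key).await.unwrap().value(), &value);

    // Force a flush; a later `Cache::new` with the same filename repopulates
    // itself from disk, since `load()` is called from the constructor.
    cache.save().await.unwrap();
}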