diff --git a/crates/subspace-farmer/src/bin/subspace-farmer/commands/farm.rs b/crates/subspace-farmer/src/bin/subspace-farmer/commands/farm.rs
index fcc93827e4..220858e99f 100644
--- a/crates/subspace-farmer/src/bin/subspace-farmer/commands/farm.rs
+++ b/crates/subspace-farmer/src/bin/subspace-farmer/commands/farm.rs
@@ -22,7 +22,7 @@ use std::sync::Arc;
 use subspace_core_primitives::crypto::kzg::{embedded_kzg_settings, Kzg};
 use subspace_core_primitives::{PublicKey, Record, SectorIndex};
 use subspace_erasure_coding::ErasureCoding;
-use subspace_farmer::piece_cache::PieceCache;
+use subspace_farmer::farmer_cache::FarmerCache;
 use subspace_farmer::single_disk_farm::farming::FarmingNotification;
 use subspace_farmer::single_disk_farm::{
     SectorExpirationDetails, SectorPlottingDetails, SectorUpdate, SingleDiskFarm,
@@ -394,7 +394,7 @@ where
     let keypair = derive_libp2p_keypair(identity.secret_key());
     let peer_id = keypair.public().to_peer_id();
 
-    let (piece_cache, piece_cache_worker) = PieceCache::new(node_client.clone(), peer_id);
+    let (farmer_cache, farmer_cache_worker) = FarmerCache::new(node_client.clone(), peer_id);
 
     // Metrics
     let mut prometheus_metrics_registry = Registry::default();
@@ -413,7 +413,7 @@ where
             dsn,
             Arc::downgrade(&plotted_pieces),
             node_client.clone(),
-            piece_cache.clone(),
+            farmer_cache.clone(),
             should_start_prometheus_server.then_some(&mut prometheus_metrics_registry),
         )?
     };
@@ -443,20 +443,20 @@ where
     ));
     let piece_provider = PieceProvider::new(node.clone(), validator.clone());
 
-    let piece_getter = Arc::new(FarmerPieceGetter::new(
+    let piece_getter = FarmerPieceGetter::new(
         piece_provider,
-        piece_cache.clone(),
+        farmer_cache.clone(),
         node_client.clone(),
         Arc::clone(&plotted_pieces),
-    ));
+    );
 
-    let piece_cache_worker_fut = run_future_in_dedicated_thread(
+    let farmer_cache_worker_fut = run_future_in_dedicated_thread(
         {
-            let future = piece_cache_worker.run(piece_getter.clone());
+            let future = farmer_cache_worker.run(piece_getter.downgrade());
 
             move || future
         },
-        "cache-worker".to_string(),
+        "farmer-cache-worker".to_string(),
     )?;
 
     let mut single_disk_farms = Vec::with_capacity(disk_farms.len());
@@ -626,7 +626,7 @@ where
         single_disk_farms.push(single_disk_farm);
     }
 
-    let cache_acknowledgement_receiver = piece_cache
+    let cache_acknowledgement_receiver = farmer_cache
         .replace_backing_caches(
             single_disk_farms
                 .iter()
@@ -634,7 +634,7 @@ where
                 .collect(),
         )
         .await;
-    drop(piece_cache);
+    drop(farmer_cache);
 
     // Wait for cache initialization before starting plotting
     tokio::spawn(async move {
@@ -858,11 +858,11 @@ where
     // This defines order in which things are dropped
     let networking_fut = networking_fut;
     let farm_fut = farm_fut;
-    let piece_cache_worker_fut = piece_cache_worker_fut;
+    let farmer_cache_worker_fut = farmer_cache_worker_fut;
 
     let networking_fut = pin!(networking_fut);
     let farm_fut = pin!(farm_fut);
-    let piece_cache_worker_fut = pin!(piece_cache_worker_fut);
+    let farmer_cache_worker_fut = pin!(farmer_cache_worker_fut);
 
     futures::select!(
         // Signal future
@@ -879,8 +879,8 @@ where
         },
 
         // Piece cache worker future
-        _ = piece_cache_worker_fut.fuse() => {
-            info!("Piece cache worker exited.")
+        _ = farmer_cache_worker_fut.fuse() => {
+            info!("Farmer cache worker exited.")
         },
     );
 
diff --git a/crates/subspace-farmer/src/bin/subspace-farmer/commands/farm/dsn.rs b/crates/subspace-farmer/src/bin/subspace-farmer/commands/farm/dsn.rs
index 241493fa6a..eb94cbe32a 100644
--- a/crates/subspace-farmer/src/bin/subspace-farmer/commands/farm/dsn.rs
+++ b/crates/subspace-farmer/src/bin/subspace-farmer/commands/farm/dsn.rs
@@ -4,8 +4,8 @@ use prometheus_client::registry::Registry;
 use std::collections::HashSet;
 use std::path::Path;
 use std::sync::{Arc, Weak};
+use subspace_farmer::farmer_cache::FarmerCache;
 use subspace_farmer::node_client::NodeClientExt;
-use subspace_farmer::piece_cache::PieceCache;
 use subspace_farmer::utils::plotted_pieces::PlottedPieces;
 use subspace_farmer::{NodeClient, NodeRpcClient, KNOWN_PEERS_CACHE_SIZE};
 use subspace_networking::libp2p::identity::Keypair;
@@ -45,9 +45,9 @@ pub(super) fn configure_dsn(
     }: DsnArgs,
     weak_plotted_pieces: Weak<Mutex<Option<PlottedPieces>>>,
     node_client: NodeRpcClient,
-    piece_cache: PieceCache,
+    farmer_cache: FarmerCache,
     prometheus_metrics_registry: Option<&mut Registry>,
-) -> Result<(Node, NodeRunner<PieceCache>), anyhow::Error> {
+) -> Result<(Node, NodeRunner<FarmerCache>), anyhow::Error> {
     let networking_parameters_registry = KnownPeersManager::new(KnownPeersManagerConfig {
         path: Some(base_path.join("known_addresses.bin").into_boxed_path()),
         ignore_peer_list: strip_peer_id(bootstrap_nodes.clone())
@@ -62,7 +62,7 @@
     let default_config = Config::new(
         protocol_prefix,
         keypair,
-        piece_cache.clone(),
+        farmer_cache.clone(),
         prometheus_metrics_registry,
     );
     let config = Config {
@@ -75,11 +75,11 @@
             debug!(?piece_index, "Piece request received. Trying cache...");
 
             let weak_plotted_pieces = weak_plotted_pieces.clone();
-            let piece_cache = piece_cache.clone();
+            let farmer_cache = farmer_cache.clone();
 
             async move {
                 let key = RecordKey::from(piece_index.to_multihash());
-                let piece_from_cache = piece_cache.get_piece(key).await;
+                let piece_from_cache = farmer_cache.get_piece(key).await;
 
                 if let Some(piece) = piece_from_cache {
                     Some(PieceByIndexResponse { piece: Some(piece) })
diff --git a/crates/subspace-farmer/src/piece_cache.rs b/crates/subspace-farmer/src/farmer_cache.rs
similarity index 97%
rename from crates/subspace-farmer/src/piece_cache.rs
rename to crates/subspace-farmer/src/farmer_cache.rs
index 0dc5e11574..2c053c4045 100644
--- a/crates/subspace-farmer/src/piece_cache.rs
+++ b/crates/subspace-farmer/src/farmer_cache.rs
@@ -65,10 +65,10 @@ struct CacheWorkerState {
     last_segment_index: SegmentIndex,
 }
 
-/// Cache worker used to drive the cache
+/// Farmer cache worker used to drive the farmer cache backend
 #[derive(Debug)]
-#[must_use = "Cache will not work unless its worker is running"]
-pub struct CacheWorker<NC>
+#[must_use = "Farmer cache will not work unless its worker is running"]
+pub struct FarmerCacheWorker<NC>
 where
     NC: fmt::Debug,
 {
@@ -79,11 +79,13 @@ where
     worker_receiver: Option<mpsc::Receiver<WorkerCommand>>,
 }
 
-impl<NC> CacheWorker<NC>
+impl<NC> FarmerCacheWorker<NC>
 where
     NC: NodeClient,
 {
-    /// Run the cache worker with provided piece getter
+    /// Run the cache worker with provided piece getter.
+    ///
+    /// NOTE: Piece getter must not depend on farmer cache in order to avoid reference cycles!
     pub async fn run<PG>(mut self, piece_getter: PG)
     where
         PG: PieceGetter,
@@ -754,23 +756,23 @@ where
     }
 }
 
-/// Piece cache that aggregates caches of multiple disks
+/// Farmer cache that aggregates different kinds of caches of multiple disks
 #[derive(Debug, Clone)]
-pub struct PieceCache {
+pub struct FarmerCache {
     peer_id: PeerId,
     /// Individual disk caches where pieces are stored
     caches: Arc<RwLock<Vec<DiskPieceCacheState>>>,
     handlers: Arc<Handlers>,
     // We do not want to increase capacity unnecessarily on clone
-    worker_sender: mpsc::Sender<WorkerCommand>,
+    worker_sender: Arc<mpsc::Sender<WorkerCommand>>,
 }
 
-impl PieceCache {
+impl FarmerCache {
     /// Create new piece cache instance and corresponding worker.
     ///
     /// NOTE: Returned future is async, but does blocking operations and should be running in
     /// dedicated thread.
-    pub fn new<NC>(node_client: NC, peer_id: PeerId) -> (Self, CacheWorker<NC>)
+    pub fn new<NC>(node_client: NC, peer_id: PeerId) -> (Self, FarmerCacheWorker<NC>)
     where
         NC: NodeClient,
     {
@@ -782,9 +784,9 @@ impl PieceCache {
             peer_id,
             caches: Arc::clone(&caches),
             handlers: Arc::clone(&handlers),
-            worker_sender,
+            worker_sender: Arc::new(worker_sender),
         };
-        let worker = CacheWorker {
+        let worker = FarmerCacheWorker {
             peer_id,
             node_client,
             caches,
@@ -797,11 +799,10 @@ impl PieceCache {
 
     /// Get piece from cache
     pub async fn get_piece(&self, key: RecordKey) -> Option<Piece> {
-        let caches = Arc::clone(&self.caches);
-
         let maybe_piece_fut = tokio::task::spawn_blocking({
             let key = key.clone();
-            let worker_sender = self.worker_sender.clone();
+            let caches = Arc::clone(&self.caches);
+            let worker_sender = Arc::clone(&self.worker_sender);
 
             move || {
                 for (disk_farm_index, cache) in caches.read().iter().enumerate() {
@@ -872,7 +873,7 @@
     }
 }
 
-impl LocalRecordProvider for PieceCache {
+impl LocalRecordProvider for FarmerCache {
     fn record(&self, key: &RecordKey) -> Option<ProviderRecord> {
         // It is okay to take read lock here, writes locks are very infrequent and very short
         for cache in self.caches.read().iter() {
diff --git a/crates/subspace-farmer/src/piece_cache/tests.rs b/crates/subspace-farmer/src/farmer_cache/tests.rs
similarity index 93%
rename from crates/subspace-farmer/src/piece_cache/tests.rs
rename to crates/subspace-farmer/src/farmer_cache/tests.rs
index 8166a8a15f..9c44f91dbf 100644
--- a/crates/subspace-farmer/src/piece_cache/tests.rs
+++ b/crates/subspace-farmer/src/farmer_cache/tests.rs
@@ -1,5 +1,5 @@
+use crate::farmer_cache::FarmerCache;
 use crate::node_client::Error;
-use crate::piece_cache::PieceCache;
 use crate::single_disk_farm::piece_cache::DiskPieceCache;
 use crate::NodeClient;
 use futures::channel::{mpsc, oneshot};
@@ -185,12 +185,13 @@ async fn basic() {
     let path2 = tempdir().unwrap();
 
     {
-        let (piece_cache, piece_cache_worker) =
-            PieceCache::new(node_client.clone(), public_key.to_peer_id());
+        let (farmer_cache, farmer_cache_worker) =
+            FarmerCache::new(node_client.clone(), public_key.to_peer_id());
 
-        let piece_cache_worker_exited = tokio::spawn(piece_cache_worker.run(piece_getter.clone()));
+        let farmer_cache_worker_exited =
+            tokio::spawn(farmer_cache_worker.run(piece_getter.clone()));
 
-        let initialized_fut = piece_cache
+        let initialized_fut = farmer_cache
             .replace_backing_caches(vec![
                 DiskPieceCache::open(path1.as_ref(), 1).unwrap(),
                 DiskPieceCache::open(path2.as_ref(), 1).unwrap(),
@@ -208,14 +209,14 @@ async fn basic() {
         assert_eq!(requested_pieces, expected_pieces);
 
         for piece_index in requested_pieces {
-            piece_cache
+            farmer_cache
                 .get_piece(RecordKey::from(piece_index.to_multihash()))
                 .await
                 .unwrap();
         }
 
         // Other piece indices are not requested or cached
-        assert!(piece_cache
+        assert!(farmer_cache
             .get_piece(RecordKey::from(PieceIndex::from(10).to_multihash()))
             .await
             .is_none());
@@ -275,7 +276,7 @@ async fn basic() {
 
         let stored_pieces = vec![PieceIndex::from(196), PieceIndex::from(276)];
         for piece_index in &stored_pieces {
-            piece_cache
+            farmer_cache
                 .get_piece(RecordKey::from(piece_index.to_multihash()))
                 .await
                 .unwrap();
@@ -284,7 +285,7 @@ async fn basic() {
         for piece_index in requested_pieces {
             if !stored_pieces.contains(&piece_index) {
                 // Other piece indices are not stored anymore
-                assert!(piece_cache
+                assert!(farmer_cache
                     .get_piece(RecordKey::from(PieceIndex::from(10).to_multihash()))
                     .await
                     .is_none());
@@ -341,7 +342,7 @@ async fn basic() {
 
         let stored_pieces = vec![PieceIndex::from(823), PieceIndex::from(859)];
         for piece_index in &stored_pieces {
-            piece_cache
+            farmer_cache
                 .get_piece(RecordKey::from(piece_index.to_multihash()))
                 .await
                 .unwrap();
@@ -350,7 +351,7 @@ async fn basic() {
         for piece_index in requested_pieces {
             if !stored_pieces.contains(&piece_index) {
                 // Other piece indices are not stored anymore
-                assert!(piece_cache
+                assert!(farmer_cache
                     .get_piece(RecordKey::from(PieceIndex::from(10).to_multihash()))
                     .await
                     .is_none());
@@ -358,28 +359,28 @@ async fn basic() {
             }
         }
 
-        drop(piece_cache);
+        drop(farmer_cache);
 
-        piece_cache_worker_exited.await.unwrap();
+        farmer_cache_worker_exited.await.unwrap();
     }
 
     {
         // Clear requested pieces
        pieces.lock().clear();
 
-        let (piece_cache, piece_cache_worker) =
-            PieceCache::new(node_client.clone(), public_key.to_peer_id());
+        let (farmer_cache, farmer_cache_worker) =
+            FarmerCache::new(node_client.clone(), public_key.to_peer_id());
 
-        let piece_cache_worker_exited = tokio::spawn(piece_cache_worker.run(piece_getter));
+        let farmer_cache_worker_exited = tokio::spawn(farmer_cache_worker.run(piece_getter));
 
         // Reopen with the same backing caches
-        let initialized_fut = piece_cache
+        let initialized_fut = farmer_cache
             .replace_backing_caches(vec![
                 DiskPieceCache::open(path1.as_ref(), 1).unwrap(),
                 DiskPieceCache::open(path2.as_ref(), 1).unwrap(),
             ])
             .await;
-        drop(piece_cache);
+        drop(farmer_cache);
 
         // Wait for piece cache to be initialized
         initialized_fut.await.unwrap();
@@ -398,6 +399,6 @@ async fn basic() {
         // Make worker exit
         archived_segment_headers_sender.close().await.unwrap();
 
-        piece_cache_worker_exited.await.unwrap();
+        farmer_cache_worker_exited.await.unwrap();
     }
 }
diff --git a/crates/subspace-farmer/src/lib.rs b/crates/subspace-farmer/src/lib.rs
index bd33951aaf..c7a222b9ef 100644
--- a/crates/subspace-farmer/src/lib.rs
+++ b/crates/subspace-farmer/src/lib.rs
@@ -36,9 +36,9 @@
 //! are `target ± ½ * solution range` (while also handing overflow/underflow) when interpreted as
 //! 64-bit unsigned integers.
 
+pub mod farmer_cache;
 pub(crate) mod identity;
 pub mod node_client;
-pub mod piece_cache;
 pub mod reward_signing;
 pub mod single_disk_farm;
 pub mod thread_pool_manager;
diff --git a/crates/subspace-farmer/src/single_disk_farm/piece_cache.rs b/crates/subspace-farmer/src/single_disk_farm/piece_cache.rs
index 45d763f770..408855260c 100644
--- a/crates/subspace-farmer/src/single_disk_farm/piece_cache.rs
+++ b/crates/subspace-farmer/src/single_disk_farm/piece_cache.rs
@@ -48,7 +48,8 @@ struct Inner {
     num_elements: usize,
 }
-/// Piece cache stored on one disk
+/// Dedicated piece cache stored on one disk, is used both to accelerate DSN queries and to plot
+/// faster
 #[derive(Debug, Clone)]
 pub struct DiskPieceCache {
     inner: Arc<Inner>,
 }
@@ -57,17 +58,7 @@ pub struct DiskPieceCache {
 impl DiskPieceCache {
     pub(super) const FILE_NAME: &'static str = "piece_cache.bin";
 
-    #[cfg(not(test))]
-    pub(super) fn open(directory: &Path, capacity: usize) -> Result<Self, DiskPieceCacheError> {
-        Self::open_internal(directory, capacity)
-    }
-
-    #[cfg(test)]
-    pub(crate) fn open(directory: &Path, capacity: usize) -> Result<Self, DiskPieceCacheError> {
-        Self::open_internal(directory, capacity)
-    }
-
-    pub(super) fn open_internal(
+    pub(in super::super) fn open(
         directory: &Path,
         capacity: usize,
     ) -> Result<Self, DiskPieceCacheError> {
diff --git a/crates/subspace-farmer/src/utils/farmer_piece_getter.rs b/crates/subspace-farmer/src/utils/farmer_piece_getter.rs
index 7ab5d2b486..552e306504 100644
--- a/crates/subspace-farmer/src/utils/farmer_piece_getter.rs
+++ b/crates/subspace-farmer/src/utils/farmer_piece_getter.rs
@@ -1,10 +1,10 @@
-use crate::piece_cache::PieceCache;
+use crate::farmer_cache::FarmerCache;
 use crate::utils::plotted_pieces::PlottedPieces;
 use crate::NodeClient;
 use async_trait::async_trait;
 use parking_lot::Mutex;
 use std::error::Error;
-use std::sync::Arc;
+use std::sync::{Arc, Weak};
 use subspace_core_primitives::{Piece, PieceIndex};
 use subspace_farmer_components::{PieceGetter, PieceGetterRetryPolicy};
 use subspace_networking::libp2p::kad::RecordKey;
@@ -14,25 +14,47 @@ use tracing::{debug, error, trace};
 
 const MAX_RANDOM_WALK_ROUNDS: usize = 15;
 
-pub struct FarmerPieceGetter<PV, NC> {
+struct Inner<PV, NC> {
     piece_provider: PieceProvider<PV>,
-    piece_cache: PieceCache,
+    farmer_cache: FarmerCache,
     node_client: NC,
     plotted_pieces: Arc<Mutex<Option<PlottedPieces>>>,
 }
 
+pub struct FarmerPieceGetter<PV, NC> {
+    inner: Arc<Inner<PV, NC>>,
+}
+
+impl<PV, NC> Clone for FarmerPieceGetter<PV, NC> {
+    fn clone(&self) -> Self {
+        Self {
+            inner: Arc::clone(&self.inner),
+        }
+    }
+}
+
 impl<PV, NC> FarmerPieceGetter<PV, NC> {
     pub fn new(
         piece_provider: PieceProvider<PV>,
-        piece_cache: PieceCache,
+        farmer_cache: FarmerCache,
         node_client: NC,
         plotted_pieces: Arc<Mutex<Option<PlottedPieces>>>,
     ) -> Self {
         Self {
-            piece_provider,
-            piece_cache,
-            node_client,
-            plotted_pieces,
+            inner: Arc::new(Inner {
+                piece_provider,
+                farmer_cache,
+                node_client,
+                plotted_pieces,
+            }),
+        }
+    }
+
+    /// Downgrade to [`WeakFarmerPieceGetter`] in order to break reference cycles with internally
+    /// used [`Arc`]
+    pub fn downgrade(&self) -> WeakFarmerPieceGetter<PV, NC> {
+        WeakFarmerPieceGetter {
+            inner: Arc::downgrade(&self.inner),
         }
     }
 
@@ -57,15 +79,17 @@ where
     ) -> Result<Option<Piece>, Box<dyn Error + Send + Sync + 'static>> {
         let key = RecordKey::from(piece_index.to_multihash());
 
-        trace!(%piece_index, "Getting piece from local cache");
-        if let Some(piece) = self.piece_cache.get_piece(key).await {
-            trace!(%piece_index, "Got piece from local cache successfully");
+        let inner = &self.inner;
+
+        trace!(%piece_index, "Getting piece from farmer cache");
+        if let Some(piece) = inner.farmer_cache.get_piece(key).await {
+            trace!(%piece_index, "Got piece from farmer cache successfully");
             return Ok(Some(piece));
         }
 
         // L2 piece acquisition
         trace!(%piece_index, "Getting piece from DSN L2 cache");
-        let maybe_piece = self
+        let maybe_piece = inner
             .piece_provider
             .get_piece_from_dsn_cache(piece_index, Self::convert_retry_policy(retry_policy))
             .await?;
@@ -77,7 +101,7 @@ where
 
         // Try node's RPC before reaching to L1 (archival storage on DSN)
         trace!(%piece_index, "Getting piece from node");
-        match self.node_client.piece(piece_index).await {
+        match inner.node_client.piece(piece_index).await {
             Ok(Some(piece)) => {
                 trace!(%piece_index, "Got piece from node successfully");
                 return Ok(Some(piece));
@@ -95,7 +119,7 @@ where
         }
 
         trace!(%piece_index, "Getting piece from local plot");
-        let maybe_read_piece_fut = self
+        let maybe_read_piece_fut = inner
             .plotted_pieces
             .lock()
             .as_ref()
@@ -111,7 +135,7 @@ where
 
         // L1 piece acquisition
         trace!(%piece_index, "Getting piece from DSN L1.");
-        let archival_storage_search_result = self
+        let archival_storage_search_result = inner
            .piece_provider
             .get_piece_from_archival_storage(piece_index, MAX_RANDOM_WALK_ROUNDS)
             .await;
@@ -129,3 +153,46 @@ where
         Ok(None)
     }
 }
+
+/// Weak farmer piece getter, can be upgraded to [`FarmerPieceGetter`]
+#[derive(Debug)]
+pub struct WeakFarmerPieceGetter<PV, NC> {
+    inner: Weak<Inner<PV, NC>>,
+}
+
+impl<PV, NC> Clone for WeakFarmerPieceGetter<PV, NC> {
+    fn clone(&self) -> Self {
+        Self {
+            inner: self.inner.clone(),
+        }
+    }
+}
+
+#[async_trait]
+impl<PV, NC> PieceGetter for WeakFarmerPieceGetter<PV, NC>
+where
+    PV: PieceValidator + Send + 'static,
+    NC: NodeClient,
+{
+    async fn get_piece(
+        &self,
+        piece_index: PieceIndex,
+        retry_policy: PieceGetterRetryPolicy,
+    ) -> Result<Option<Piece>, Box<dyn Error + Send + Sync + 'static>> {
+        let Some(piece_getter) = self.upgrade() else {
+            debug!("Farmer piece getter upgrade didn't succeed");
+            return Ok(None);
+        };
+
+        piece_getter.get_piece(piece_index, retry_policy).await
+    }
+}
+
+impl<PV, NC> WeakFarmerPieceGetter<PV, NC> {
+    /// Try to upgrade to [`FarmerPieceGetter`] if there is at least one other instance of it alive
+    pub fn upgrade(&self) -> Option<FarmerPieceGetter<PV, NC>> {
+        Some(FarmerPieceGetter {
+            inner: self.inner.upgrade()?,
+        })
+    }
+}
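
The structural core of this change is that FarmerPieceGetter now keeps its state behind Arc<Inner<PV, NC>>, and the farmer cache worker receives only the result of piece_getter.downgrade(), a WeakFarmerPieceGetter, so the long-lived worker no longer keeps the getter (and through it the farmer cache) alive. Below is a minimal, self-contained sketch of that strong/weak handle pattern; Getter, WeakGetter, and Inner here are illustrative stand-ins under assumed names, not the crate's actual types or API.

// Sketch of the Arc/Weak handle pattern used by FarmerPieceGetter/WeakFarmerPieceGetter
// (hypothetical names; the real Inner holds FarmerCache, PieceProvider, NodeClient, etc.).
use std::sync::{Arc, Weak};

struct Inner {
    label: String,
}

// Strong handle: cheap to clone, shares one Inner.
#[derive(Clone)]
struct Getter {
    inner: Arc<Inner>,
}

// Weak handle: what a long-lived worker should hold to avoid a reference cycle.
#[derive(Clone)]
struct WeakGetter {
    inner: Weak<Inner>,
}

impl Getter {
    fn new(label: impl Into<String>) -> Self {
        Self {
            inner: Arc::new(Inner {
                label: label.into(),
            }),
        }
    }

    /// Downgrade to a weak handle before handing it to a background worker.
    fn downgrade(&self) -> WeakGetter {
        WeakGetter {
            inner: Arc::downgrade(&self.inner),
        }
    }

    fn get(&self) -> String {
        format!("piece via {}", self.inner.label)
    }
}

impl WeakGetter {
    /// Succeeds only while at least one strong handle is still alive.
    fn upgrade(&self) -> Option<Getter> {
        Some(Getter {
            inner: self.inner.upgrade()?,
        })
    }

    fn get(&self) -> Option<String> {
        self.upgrade().map(|getter| getter.get())
    }
}

fn main() {
    let getter = Getter::new("farmer cache");
    let weak = getter.downgrade();
    assert_eq!(weak.get().as_deref(), Some("piece via farmer cache"));

    drop(getter);
    // With all strong handles gone, the worker's weak handle stops resolving,
    // mirroring WeakFarmerPieceGetter::get_piece degrading to Ok(None) during shutdown.
    assert!(weak.get().is_none());
}

The design point of the diff is the same as in this sketch: once farm.rs drops farmer_cache and the strong FarmerPieceGetter, the worker's upgrade() starts failing, it logs and returns nothing, and the whole pipeline can shut down instead of being kept alive by a cycle of Arcs.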