Merge pull request #2534 from subspace/farmer-refactoring
Farmer refactoring
nazar-pc authored Feb 19, 2024
2 parents c52d660 + 558654f, commit c9b4300
Showing 7 changed files with 145 additions and 85 deletions.
30 changes: 15 additions & 15 deletions crates/subspace-farmer/src/bin/subspace-farmer/commands/farm.rs
@@ -22,7 +22,7 @@ use std::sync::Arc;
use subspace_core_primitives::crypto::kzg::{embedded_kzg_settings, Kzg};
use subspace_core_primitives::{PublicKey, Record, SectorIndex};
use subspace_erasure_coding::ErasureCoding;
use subspace_farmer::piece_cache::PieceCache;
use subspace_farmer::farmer_cache::FarmerCache;
use subspace_farmer::single_disk_farm::farming::FarmingNotification;
use subspace_farmer::single_disk_farm::{
SectorExpirationDetails, SectorPlottingDetails, SectorUpdate, SingleDiskFarm,
@@ -394,7 +394,7 @@ where
let keypair = derive_libp2p_keypair(identity.secret_key());
let peer_id = keypair.public().to_peer_id();

let (piece_cache, piece_cache_worker) = PieceCache::new(node_client.clone(), peer_id);
let (farmer_cache, farmer_cache_worker) = FarmerCache::new(node_client.clone(), peer_id);

// Metrics
let mut prometheus_metrics_registry = Registry::default();
@@ -413,7 +413,7 @@
dsn,
Arc::downgrade(&plotted_pieces),
node_client.clone(),
piece_cache.clone(),
farmer_cache.clone(),
should_start_prometheus_server.then_some(&mut prometheus_metrics_registry),
)?
};
@@ -443,20 +443,20 @@ where
));
let piece_provider = PieceProvider::new(node.clone(), validator.clone());

let piece_getter = Arc::new(FarmerPieceGetter::new(
let piece_getter = FarmerPieceGetter::new(
piece_provider,
piece_cache.clone(),
farmer_cache.clone(),
node_client.clone(),
Arc::clone(&plotted_pieces),
));
);

let piece_cache_worker_fut = run_future_in_dedicated_thread(
let farmer_cache_worker_fut = run_future_in_dedicated_thread(
{
let future = piece_cache_worker.run(piece_getter.clone());
let future = farmer_cache_worker.run(piece_getter.downgrade());

move || future
},
"cache-worker".to_string(),
"farmer-cache-worker".to_string(),
)?;

let mut single_disk_farms = Vec::with_capacity(disk_farms.len());
@@ -626,15 +626,15 @@ where
single_disk_farms.push(single_disk_farm);
}

let cache_acknowledgement_receiver = piece_cache
let cache_acknowledgement_receiver = farmer_cache
.replace_backing_caches(
single_disk_farms
.iter()
.map(|single_disk_farm| single_disk_farm.piece_cache())
.collect(),
)
.await;
drop(piece_cache);
drop(farmer_cache);

// Wait for cache initialization before starting plotting
tokio::spawn(async move {
@@ -858,11 +858,11 @@ where
// This defines order in which things are dropped
let networking_fut = networking_fut;
let farm_fut = farm_fut;
let piece_cache_worker_fut = piece_cache_worker_fut;
let farmer_cache_worker_fut = farmer_cache_worker_fut;

let networking_fut = pin!(networking_fut);
let farm_fut = pin!(farm_fut);
let piece_cache_worker_fut = pin!(piece_cache_worker_fut);
let farmer_cache_worker_fut = pin!(farmer_cache_worker_fut);

futures::select!(
// Signal future
@@ -879,8 +879,8 @@
},

// Piece cache worker future
_ = piece_cache_worker_fut.fuse() => {
info!("Piece cache worker exited.")
_ = farmer_cache_worker_fut.fuse() => {
info!("Farmer cache worker exited.")
},
);

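The farm command above now follows a handle/worker split: `FarmerCache::new` returns a cheaply cloneable `FarmerCache` handle plus a `FarmerCacheWorker` that is run on a dedicated thread, and the local handle is dropped once the backing caches are wired up. The sketch below illustrates only that general pattern; all names are hypothetical, and it uses std threads and channels instead of the crate's async machinery.

use std::sync::mpsc;
use std::thread;

// Hypothetical command type; the real worker reacts to piece requests and
// node notifications rather than this toy variant.
enum WorkerCommand {
    ForgetKey { key: u64 },
}

// Cheaply cloneable handle, analogous in shape to `FarmerCache`.
#[derive(Clone)]
struct CacheHandle {
    worker_sender: mpsc::Sender<WorkerCommand>,
}

// Background worker, analogous in shape to `FarmerCacheWorker`.
struct CacheWorker {
    worker_receiver: mpsc::Receiver<WorkerCommand>,
}

impl CacheWorker {
    fn run(self) {
        // In this sketch the loop ends once every handle (sender) is dropped.
        while let Ok(command) = self.worker_receiver.recv() {
            match command {
                WorkerCommand::ForgetKey { key } => println!("forgetting key {key}"),
            }
        }
    }
}

fn new_cache() -> (CacheHandle, CacheWorker) {
    let (worker_sender, worker_receiver) = mpsc::channel();
    (CacheHandle { worker_sender }, CacheWorker { worker_receiver })
}

fn main() {
    let (handle, worker) = new_cache();
    let worker_thread = thread::spawn(move || worker.run());

    handle
        .worker_sender
        .send(WorkerCommand::ForgetKey { key: 42 })
        .unwrap();

    drop(handle); // no senders remain, so the worker loop exits
    worker_thread.join().unwrap();
}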
@@ -4,8 +4,8 @@ use prometheus_client::registry::Registry;
use std::collections::HashSet;
use std::path::Path;
use std::sync::{Arc, Weak};
use subspace_farmer::farmer_cache::FarmerCache;
use subspace_farmer::node_client::NodeClientExt;
use subspace_farmer::piece_cache::PieceCache;
use subspace_farmer::utils::plotted_pieces::PlottedPieces;
use subspace_farmer::{NodeClient, NodeRpcClient, KNOWN_PEERS_CACHE_SIZE};
use subspace_networking::libp2p::identity::Keypair;
@@ -45,9 +45,9 @@ pub(super) fn configure_dsn(
}: DsnArgs,
weak_plotted_pieces: Weak<Mutex<Option<PlottedPieces>>>,
node_client: NodeRpcClient,
piece_cache: PieceCache,
farmer_cache: FarmerCache,
prometheus_metrics_registry: Option<&mut Registry>,
) -> Result<(Node, NodeRunner<PieceCache>), anyhow::Error> {
) -> Result<(Node, NodeRunner<FarmerCache>), anyhow::Error> {
let networking_parameters_registry = KnownPeersManager::new(KnownPeersManagerConfig {
path: Some(base_path.join("known_addresses.bin").into_boxed_path()),
ignore_peer_list: strip_peer_id(bootstrap_nodes.clone())
@@ -62,7 +62,7 @@
let default_config = Config::new(
protocol_prefix,
keypair,
piece_cache.clone(),
farmer_cache.clone(),
prometheus_metrics_registry,
);
let config = Config {
@@ -75,11 +75,11 @@
debug!(?piece_index, "Piece request received. Trying cache...");

let weak_plotted_pieces = weak_plotted_pieces.clone();
let piece_cache = piece_cache.clone();
let farmer_cache = farmer_cache.clone();

async move {
let key = RecordKey::from(piece_index.to_multihash());
let piece_from_cache = piece_cache.get_piece(key).await;
let piece_from_cache = farmer_cache.get_piece(key).await;

if let Some(piece) = piece_from_cache {
Some(PieceByIndexResponse { piece: Some(piece) })
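In the handler above the farmer cache is consulted first; the elided branch presumably falls back to the slower plotted-pieces source the handler also captures via `weak_plotted_pieces`. A generic cache-then-fallback lookup, with purely hypothetical types, looks roughly like this:

use std::collections::HashMap;

// Hypothetical stand-ins for the fast cache and the slower plotted-pieces source.
struct Cache(HashMap<u64, Vec<u8>>);
struct PlottedPieces(HashMap<u64, Vec<u8>>);

// Try the cache first and only hit the slower source on a miss.
fn get_piece(cache: &Cache, plotted: &PlottedPieces, index: u64) -> Option<Vec<u8>> {
    if let Some(piece) = cache.0.get(&index) {
        return Some(piece.clone());
    }
    plotted.0.get(&index).cloned()
}

fn main() {
    let cache = Cache(HashMap::from([(1, vec![0xAA])]));
    let plotted = PlottedPieces(HashMap::from([(2, vec![0xBB])]));

    assert_eq!(get_piece(&cache, &plotted, 1), Some(vec![0xAA])); // cache hit
    assert_eq!(get_piece(&cache, &plotted, 2), Some(vec![0xBB])); // fallback hit
    assert_eq!(get_piece(&cache, &plotted, 3), None); // miss everywhere
}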
@@ -65,10 +65,10 @@ struct CacheWorkerState {
last_segment_index: SegmentIndex,
}

/// Cache worker used to drive the cache
/// Farmer cache worker used to drive the farmer cache backend
#[derive(Debug)]
#[must_use = "Cache will not work unless its worker is running"]
pub struct CacheWorker<NC>
#[must_use = "Farmer cache will not work unless its worker is running"]
pub struct FarmerCacheWorker<NC>
where
NC: fmt::Debug,
{
@@ -79,11 +79,13 @@ where
worker_receiver: Option<mpsc::Receiver<WorkerCommand>>,
}

impl<NC> CacheWorker<NC>
impl<NC> FarmerCacheWorker<NC>
where
NC: NodeClient,
{
/// Run the cache worker with provided piece getter
/// Run the cache worker with provided piece getter.
///
/// NOTE: Piece getter must not depend on farmer cache in order to avoid reference cycles!
pub async fn run<PG>(mut self, piece_getter: PG)
where
PG: PieceGetter,
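The new NOTE above is the reason the farm command passes `piece_getter.downgrade()` into `farmer_cache_worker.run(...)`: the piece getter itself holds a `FarmerCache` clone, so the worker must hold the getter weakly or the two would keep each other alive in a reference cycle. Below is a minimal, self-contained illustration of that downgrade pattern using std's `Arc`/`Weak`; the type names are hypothetical and do not reproduce the crate's actual piece getter.

use std::sync::{Arc, Weak};

// Strong handle that owns some shared state (stand-in for the piece getter).
#[derive(Clone)]
struct StrongGetter {
    pieces: Arc<Vec<u8>>,
}

// Weak handle given to long-running workers so they do not keep `pieces` alive.
#[derive(Clone)]
struct WeakGetter {
    pieces: Weak<Vec<u8>>,
}

impl StrongGetter {
    fn downgrade(&self) -> WeakGetter {
        WeakGetter {
            pieces: Arc::downgrade(&self.pieces),
        }
    }
}

impl WeakGetter {
    // Succeeds only while at least one strong handle is still alive.
    fn get(&self, index: usize) -> Option<u8> {
        self.pieces.upgrade().and_then(|pieces| pieces.get(index).copied())
    }
}

fn main() {
    let getter = StrongGetter { pieces: Arc::new(vec![1, 2, 3]) };
    let weak = getter.downgrade();

    assert_eq!(weak.get(0), Some(1)); // strong handle still alive
    drop(getter); // last strong handle gone
    assert_eq!(weak.get(0), None); // worker observes shutdown instead of leaking
}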
@@ -754,23 +756,23 @@
}
}

/// Piece cache that aggregates caches of multiple disks
/// Farmer cache that aggregates different kinds of caches of multiple disks
#[derive(Debug, Clone)]
pub struct PieceCache {
pub struct FarmerCache {
peer_id: PeerId,
/// Individual disk caches where pieces are stored
caches: Arc<RwLock<Vec<DiskPieceCacheState>>>,
handlers: Arc<Handlers>,
// We do not want to increase capacity unnecessarily on clone
worker_sender: mpsc::Sender<WorkerCommand>,
worker_sender: Arc<mpsc::Sender<WorkerCommand>>,
}

impl PieceCache {
impl FarmerCache {
/// Create new piece cache instance and corresponding worker.
///
/// NOTE: Returned future is async, but does blocking operations and should be running in
/// dedicated thread.
pub fn new<NC>(node_client: NC, peer_id: PeerId) -> (Self, CacheWorker<NC>)
pub fn new<NC>(node_client: NC, peer_id: PeerId) -> (Self, FarmerCacheWorker<NC>)
where
NC: NodeClient,
{
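The `worker_sender` field above changes from a bare `mpsc::Sender` to an `Arc<mpsc::Sender<...>>` because of how capacity is accounted for: the channel appears to be `futures::channel::mpsc` (it is what the tests in this diff import), where every live `Sender` gets its own guaranteed slot, so cloning the sender on every `FarmerCache` clone would quietly grow the channel. Sharing one sender behind an `Arc` keeps the sender count, and therefore the capacity, fixed. A small sketch of the effect, assuming the `futures` crate and illustrative names:

use std::sync::Arc;

use futures::channel::mpsc;

fn main() {
    // Capacity of a bounded futures channel is `buffer + number of senders`,
    // so each cloned `Sender` contributes one extra in-flight slot.
    let (mut tx, _rx) = mpsc::channel::<u32>(0);
    let mut tx_clone = tx.clone();

    assert!(tx.try_send(1).is_ok()); // slot of the original sender
    assert!(tx_clone.try_send(2).is_ok()); // extra slot added by the clone
    assert!(tx.try_send(3).is_err()); // channel is now full

    // Sharing a single sender behind an `Arc` adds no new senders, so cloning
    // the handle does not change capacity.
    let (tx_shared, _rx_shared) = mpsc::channel::<u32>(0);
    let shared = Arc::new(tx_shared);
    let _another_handle = Arc::clone(&shared); // same sender, same capacity
}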
@@ -782,9 +784,9 @@ impl PieceCache {
peer_id,
caches: Arc::clone(&caches),
handlers: Arc::clone(&handlers),
worker_sender,
worker_sender: Arc::new(worker_sender),
};
let worker = CacheWorker {
let worker = FarmerCacheWorker {
peer_id,
node_client,
caches,
@@ -797,11 +799,10 @@

/// Get piece from cache
pub async fn get_piece(&self, key: RecordKey) -> Option<Piece> {
let caches = Arc::clone(&self.caches);

let maybe_piece_fut = tokio::task::spawn_blocking({
let key = key.clone();
let worker_sender = self.worker_sender.clone();
let caches = Arc::clone(&self.caches);
let worker_sender = Arc::clone(&self.worker_sender);

move || {
for (disk_farm_index, cache) in caches.read().iter().enumerate() {
@@ -872,7 +873,7 @@ impl PieceCache {
}
}

impl LocalRecordProvider for PieceCache {
impl LocalRecordProvider for FarmerCache {
fn record(&self, key: &RecordKey) -> Option<ProviderRecord> {
// It is okay to take read lock here, writes locks are very infrequent and very short
for cache in self.caches.read().iter() {
@@ -1,5 +1,5 @@
use crate::farmer_cache::FarmerCache;
use crate::node_client::Error;
use crate::piece_cache::PieceCache;
use crate::single_disk_farm::piece_cache::DiskPieceCache;
use crate::NodeClient;
use futures::channel::{mpsc, oneshot};
@@ -185,12 +185,13 @@ async fn basic() {
let path2 = tempdir().unwrap();

{
let (piece_cache, piece_cache_worker) =
PieceCache::new(node_client.clone(), public_key.to_peer_id());
let (farmer_cache, farmer_cache_worker) =
FarmerCache::new(node_client.clone(), public_key.to_peer_id());

let piece_cache_worker_exited = tokio::spawn(piece_cache_worker.run(piece_getter.clone()));
let farmer_cache_worker_exited =
tokio::spawn(farmer_cache_worker.run(piece_getter.clone()));

let initialized_fut = piece_cache
let initialized_fut = farmer_cache
.replace_backing_caches(vec![
DiskPieceCache::open(path1.as_ref(), 1).unwrap(),
DiskPieceCache::open(path2.as_ref(), 1).unwrap(),
@@ -208,14 +209,14 @@
assert_eq!(requested_pieces, expected_pieces);

for piece_index in requested_pieces {
piece_cache
farmer_cache
.get_piece(RecordKey::from(piece_index.to_multihash()))
.await
.unwrap();
}

// Other piece indices are not requested or cached
assert!(piece_cache
assert!(farmer_cache
.get_piece(RecordKey::from(PieceIndex::from(10).to_multihash()))
.await
.is_none());
@@ -275,7 +276,7 @@

let stored_pieces = vec![PieceIndex::from(196), PieceIndex::from(276)];
for piece_index in &stored_pieces {
piece_cache
farmer_cache
.get_piece(RecordKey::from(piece_index.to_multihash()))
.await
.unwrap();
@@ -284,7 +285,7 @@
for piece_index in requested_pieces {
if !stored_pieces.contains(&piece_index) {
// Other piece indices are not stored anymore
assert!(piece_cache
assert!(farmer_cache
.get_piece(RecordKey::from(PieceIndex::from(10).to_multihash()))
.await
.is_none());
@@ -341,7 +342,7 @@

let stored_pieces = vec![PieceIndex::from(823), PieceIndex::from(859)];
for piece_index in &stored_pieces {
piece_cache
farmer_cache
.get_piece(RecordKey::from(piece_index.to_multihash()))
.await
.unwrap();
@@ -350,36 +351,36 @@
for piece_index in requested_pieces {
if !stored_pieces.contains(&piece_index) {
// Other piece indices are not stored anymore
assert!(piece_cache
assert!(farmer_cache
.get_piece(RecordKey::from(PieceIndex::from(10).to_multihash()))
.await
.is_none());
}
}
}

drop(piece_cache);
drop(farmer_cache);

piece_cache_worker_exited.await.unwrap();
farmer_cache_worker_exited.await.unwrap();
}

{
// Clear requested pieces
pieces.lock().clear();

let (piece_cache, piece_cache_worker) =
PieceCache::new(node_client.clone(), public_key.to_peer_id());
let (farmer_cache, farmer_cache_worker) =
FarmerCache::new(node_client.clone(), public_key.to_peer_id());

let piece_cache_worker_exited = tokio::spawn(piece_cache_worker.run(piece_getter));
let farmer_cache_worker_exited = tokio::spawn(farmer_cache_worker.run(piece_getter));

// Reopen with the same backing caches
let initialized_fut = piece_cache
let initialized_fut = farmer_cache
.replace_backing_caches(vec![
DiskPieceCache::open(path1.as_ref(), 1).unwrap(),
DiskPieceCache::open(path2.as_ref(), 1).unwrap(),
])
.await;
drop(piece_cache);
drop(farmer_cache);

// Wait for piece cache to be initialized
initialized_fut.await.unwrap();
@@ -398,6 +399,6 @@
// Make worker exit
archived_segment_headers_sender.close().await.unwrap();

piece_cache_worker_exited.await.unwrap();
farmer_cache_worker_exited.await.unwrap();
}
}
2 changes: 1 addition & 1 deletion crates/subspace-farmer/src/lib.rs
@@ -36,9 +36,9 @@
//! are `target ± ½ * solution range` (while also handling overflow/underflow) when interpreted as
//! 64-bit unsigned integers.
pub mod farmer_cache;
pub(crate) mod identity;
pub mod node_client;
pub mod piece_cache;
pub mod reward_signing;
pub mod single_disk_farm;
pub mod thread_pool_manager;
15 changes: 3 additions & 12 deletions crates/subspace-farmer/src/single_disk_farm/piece_cache.rs
@@ -48,7 +48,8 @@ struct Inner {
num_elements: usize,
}

/// Piece cache stored on one disk
/// Dedicated piece cache stored on one disk, is used both to accelerate DSN queries and to plot
/// faster
#[derive(Debug, Clone)]
pub struct DiskPieceCache {
inner: Arc<Inner>,
@@ -57,17 +58,7 @@ pub struct DiskPieceCache {
impl DiskPieceCache {
pub(super) const FILE_NAME: &'static str = "piece_cache.bin";

#[cfg(not(test))]
pub(super) fn open(directory: &Path, capacity: usize) -> Result<Self, DiskPieceCacheError> {
Self::open_internal(directory, capacity)
}

#[cfg(test)]
pub(crate) fn open(directory: &Path, capacity: usize) -> Result<Self, DiskPieceCacheError> {
Self::open_internal(directory, capacity)
}

pub(super) fn open_internal(
pub(in super::super) fn open(
directory: &Path,
capacity: usize,
) -> Result<Self, DiskPieceCacheError> {
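The last hunk replaces the duplicated `#[cfg(test)]` / `#[cfg(not(test))]` wrappers with a single constructor whose visibility is `pub(in super::super)`: callable from the named ancestor module and its descendants, but not from the rest of the crate or from outside it. A tiny standalone example of that visibility form, with made-up module names:

mod outer {
    pub mod middle {
        pub mod inner {
            // Visible inside `outer` (which is `super::super` from here) and
            // its descendants, but nowhere else.
            pub(in super::super) fn open() -> &'static str {
                "opened"
            }
        }

        pub fn call_from_middle() -> &'static str {
            inner::open()
        }
    }

    pub fn call_from_outer() -> &'static str {
        middle::inner::open()
    }
}

fn main() {
    // Calling `outer::middle::inner::open()` directly here would not compile,
    // because `open` is only visible within `outer`.
    println!("{}", outer::call_from_outer());
    println!("{}", outer::middle::call_from_middle());
}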