Skip to content

Commit

Permalink
Introduce WeakFarmerPieceGetter to break reference cycles between networking using piece cache and piece cache worker using networking through piece getter
Browse files Browse the repository at this point in the history
  • Loading branch information
nazar-pc committed Feb 17, 2024
1 parent 252af75 commit 558654f
Show file tree
Hide file tree
Showing 3 changed files with 84 additions and 15 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -429,16 +429,16 @@ where
));
let piece_provider = PieceProvider::new(node.clone(), validator.clone());

let piece_getter = Arc::new(FarmerPieceGetter::new(
let piece_getter = FarmerPieceGetter::new(
piece_provider,
farmer_cache.clone(),
node_client.clone(),
Arc::clone(&plotted_pieces),
));
);

let farmer_cache_worker_fut = run_future_in_dedicated_thread(
{
let future = farmer_cache_worker.run(piece_getter.clone());
let future = farmer_cache_worker.run(piece_getter.downgrade());

move || future
},
Expand Down
4 changes: 3 additions & 1 deletion crates/subspace-farmer/src/farmer_cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,9 @@ impl<NC> FarmerCacheWorker<NC>
where
NC: NodeClient,
{
/// Run the cache worker with provided piece getter
/// Run the cache worker with provided piece getter.
///
/// NOTE: Piece getter must not depend on farmer cache in order to avoid reference cycles!
pub async fn run<PG>(mut self, piece_getter: PG)
where
PG: PieceGetter,
Expand Down
89 changes: 78 additions & 11 deletions crates/subspace-farmer/src/utils/farmer_piece_getter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use crate::NodeClient;
use async_trait::async_trait;
use parking_lot::Mutex;
use std::error::Error;
use std::sync::Arc;
use std::sync::{Arc, Weak};
use subspace_core_primitives::{Piece, PieceIndex};
use subspace_farmer_components::{PieceGetter, PieceGetterRetryPolicy};
use subspace_networking::libp2p::kad::RecordKey;
Expand All @@ -14,13 +14,25 @@ use tracing::{debug, error, trace};

const MAX_RANDOM_WALK_ROUNDS: usize = 15;

pub struct FarmerPieceGetter<PV, NC> {
/// Shared state of [`FarmerPieceGetter`], kept behind an `Arc` so that strong handles clone
/// cheaply and can be downgraded to [`WeakFarmerPieceGetter`] without keeping the state alive.
struct Inner<PV, NC> {
    /// Retrieves pieces from the DSN (L2 cache and L1 archival storage)
    piece_provider: PieceProvider<PV>,
    /// Local farmer piece cache, consulted before any network lookup
    farmer_cache: FarmerCache,
    /// Client used to request pieces from the node over RPC
    node_client: NC,
    /// Locally plotted pieces that may be able to serve a piece; may be absent until
    /// initialized elsewhere — NOTE(review): initialization site not visible here, confirm
    plotted_pieces: Arc<Mutex<Option<PlottedPieces>>>,
}

/// Farmer-specific piece getter that tries multiple sources in turn (farmer cache, DSN cache,
/// node RPC, local plot, DSN archival storage).
pub struct FarmerPieceGetter<PV, NC> {
    /// All state lives behind an `Arc`, making clones cheap and enabling `downgrade()` to a
    /// weak handle that breaks reference cycles
    inner: Arc<Inner<PV, NC>>,
}

// Implemented by hand rather than derived: `#[derive(Clone)]` would add unnecessary
// `PV: Clone` and `NC: Clone` bounds, while only the reference-counted handle is duplicated.
impl<PV, NC> Clone for FarmerPieceGetter<PV, NC> {
    fn clone(&self) -> Self {
        let inner = Arc::clone(&self.inner);
        Self { inner }
    }
}

impl<PV, NC> FarmerPieceGetter<PV, NC> {
pub fn new(
piece_provider: PieceProvider<PV>,
Expand All @@ -29,10 +41,20 @@ impl<PV, NC> FarmerPieceGetter<PV, NC> {
plotted_pieces: Arc<Mutex<Option<PlottedPieces>>>,
) -> Self {
Self {
piece_provider,
farmer_cache,
node_client,
plotted_pieces,
inner: Arc::new(Inner {
piece_provider,
farmer_cache,
node_client,
plotted_pieces,
}),
}
}

/// Downgrade to [`WeakFarmerPieceGetter`] in order to break reference cycles with internally
/// used [`Arc`]
pub fn downgrade(&self) -> WeakFarmerPieceGetter<PV, NC> {
WeakFarmerPieceGetter {
inner: Arc::downgrade(&self.inner),
}
}

Expand All @@ -57,15 +79,17 @@ where
) -> Result<Option<Piece>, Box<dyn Error + Send + Sync + 'static>> {
let key = RecordKey::from(piece_index.to_multihash());

let inner = &self.inner;

trace!(%piece_index, "Getting piece from farmer cache");
if let Some(piece) = self.farmer_cache.get_piece(key).await {
if let Some(piece) = inner.farmer_cache.get_piece(key).await {
trace!(%piece_index, "Got piece from farmer cache successfully");
return Ok(Some(piece));
}

// L2 piece acquisition
trace!(%piece_index, "Getting piece from DSN L2 cache");
let maybe_piece = self
let maybe_piece = inner
.piece_provider
.get_piece_from_dsn_cache(piece_index, Self::convert_retry_policy(retry_policy))
.await?;
Expand All @@ -77,7 +101,7 @@ where

// Try node's RPC before reaching to L1 (archival storage on DSN)
trace!(%piece_index, "Getting piece from node");
match self.node_client.piece(piece_index).await {
match inner.node_client.piece(piece_index).await {
Ok(Some(piece)) => {
trace!(%piece_index, "Got piece from node successfully");
return Ok(Some(piece));
Expand All @@ -95,7 +119,7 @@ where
}

trace!(%piece_index, "Getting piece from local plot");
let maybe_read_piece_fut = self
let maybe_read_piece_fut = inner
.plotted_pieces
.lock()
.as_ref()
Expand All @@ -111,7 +135,7 @@ where
// L1 piece acquisition
trace!(%piece_index, "Getting piece from DSN L1.");

let archival_storage_search_result = self
let archival_storage_search_result = inner
.piece_provider
.get_piece_from_archival_storage(piece_index, MAX_RANDOM_WALK_ROUNDS)
.await;
Expand All @@ -129,3 +153,46 @@ where
Ok(None)
}
}

/// Weak farmer piece getter, can be upgraded to [`FarmerPieceGetter`]
#[derive(Debug)]
pub struct WeakFarmerPieceGetter<PV, NC> {
    /// Weak counterpart of the strong `Arc` held by [`FarmerPieceGetter`]; does not keep the
    /// shared state alive on its own
    inner: Weak<Inner<PV, NC>>,
}

impl<PV, NC> Clone for WeakFarmerPieceGetter<PV, NC> {
fn clone(&self) -> Self {
Self {
inner: self.inner.clone(),
}
}
}

#[async_trait]
impl<PV, NC> PieceGetter for WeakFarmerPieceGetter<PV, NC>
where
    PV: PieceValidator + Send + 'static,
    NC: NodeClient,
{
    /// Upgrade to a strong [`FarmerPieceGetter`] and delegate to it; if the shared state has
    /// already been dropped, log the fact and report the piece as not found
    async fn get_piece(
        &self,
        piece_index: PieceIndex,
        retry_policy: PieceGetterRetryPolicy,
    ) -> Result<Option<Piece>, Box<dyn Error + Send + Sync + 'static>> {
        match self.upgrade() {
            Some(strong_getter) => strong_getter.get_piece(piece_index, retry_policy).await,
            None => {
                debug!("Farmer piece getter upgrade didn't succeed");
                Ok(None)
            }
        }
    }
}

impl<PV, NC> WeakFarmerPieceGetter<PV, NC> {
    /// Try to upgrade to [`FarmerPieceGetter`] if there is at least one other instance of it alive
    pub fn upgrade(&self) -> Option<FarmerPieceGetter<PV, NC>> {
        self.inner
            .upgrade()
            .map(|inner| FarmerPieceGetter { inner })
    }
}

0 comments on commit 558654f

Please sign in to comment.