Skip to content

Commit

Permalink
Merge pull request #2432 from subspace/farmer-detailed-sector-updates
Browse files Browse the repository at this point in the history
Farmer detailed sector updates
  • Loading branch information
nazar-pc authored Jan 22, 2024
2 parents 031cbf5 + 1a45992 commit ca78fee
Show file tree
Hide file tree
Showing 4 changed files with 204 additions and 61 deletions.
4 changes: 2 additions & 2 deletions crates/subspace-farmer-components/src/plotting.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ use backoff::future::retry;
use backoff::{Error as BackoffError, ExponentialBackoff};
use futures::stream::FuturesUnordered;
use futures::StreamExt;
use parity_scale_codec::Encode;
use parity_scale_codec::{Decode, Encode};
use rayon::prelude::*;
use std::error::Error;
use std::mem;
Expand Down Expand Up @@ -97,7 +97,7 @@ impl PieceGetter for ArchivedHistorySegment {
}

/// Information about sector that was plotted
#[derive(Debug, Clone)]
#[derive(Debug, Clone, Encode, Decode)]
pub struct PlottedSector {
/// Sector ID
pub sector_id: SectorId,
Expand Down
21 changes: 14 additions & 7 deletions crates/subspace-farmer/src/bin/subspace-farmer/commands/farm.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ use subspace_core_primitives::{PublicKey, Record, SectorIndex};
use subspace_erasure_coding::ErasureCoding;
use subspace_farmer::piece_cache::PieceCache;
use subspace_farmer::single_disk_farm::{
SingleDiskFarm, SingleDiskFarmError, SingleDiskFarmOptions,
SectorPlottingDetails, SectorUpdate, SingleDiskFarm, SingleDiskFarmError, SingleDiskFarmOptions,
};
use subspace_farmer::utils::farmer_piece_getter::FarmerPieceGetter;
use subspace_farmer::utils::piece_validator::SegmentCommitmentPieceValidator;
Expand Down Expand Up @@ -633,10 +633,8 @@ where

// Collect newly plotted pieces
let on_plotted_sector_callback =
move |(plotted_sector, maybe_old_plotted_sector): &(
PlottedSector,
Option<PlottedSector>,
)| {
move |plotted_sector: &PlottedSector,
maybe_old_plotted_sector: &Option<PlottedSector>| {
let _span_guard = span.enter();

{
Expand All @@ -645,7 +643,7 @@ where
.as_mut()
.expect("Initial value was populated above; qed");

if let Some(old_plotted_sector) = maybe_old_plotted_sector {
if let Some(old_plotted_sector) = &maybe_old_plotted_sector {
readers_and_pieces.delete_sector(disk_farm_index, old_plotted_sector);
}
readers_and_pieces.add_sector(disk_farm_index, plotted_sector);
Expand All @@ -659,7 +657,16 @@ where
};

single_disk_farm
.on_sector_plotted(Arc::new(on_plotted_sector_callback))
.on_sector_update(Arc::new(move |(_sector_index, sector_state)| {
if let SectorUpdate::Plotting(SectorPlottingDetails::Finished {
plotted_sector,
old_plotted_sector,
..
}) = sector_state
{
on_plotted_sector_callback(plotted_sector, old_plotted_sector);
}
}))
.detach();

single_disk_farm
Expand Down
82 changes: 60 additions & 22 deletions crates/subspace-farmer/src/single_disk_farm.rs
Original file line number Diff line number Diff line change
Expand Up @@ -539,21 +539,66 @@ type HandlerFn<A> = Arc<dyn Fn(&A) + Send + Sync + 'static>;
type Handler<A> = Bag<HandlerFn<A>, A>;

/// Details about sector currently being plotted
pub struct SectorPlottingDetails {
/// Sector index
pub sector_index: SectorIndex,
/// Progress so far in % (not including this sector)
pub progress: f32,
/// Whether sector is being replotted
pub replotting: bool,
/// Whether this is the last sector queued so far
pub last_queued: bool,
#[derive(Debug, Clone, Encode, Decode)]
pub enum SectorPlottingDetails {
/// Starting plotting of a sector
Starting {
/// Progress so far in % (not including this sector)
progress: f32,
/// Whether sector is being replotted
replotting: bool,
/// Whether this is the last sector queued so far
last_queued: bool,
},
/// Downloading sector pieces
Downloading,
/// Downloaded sector pieces
Downloaded(Duration),
/// Encoding sector pieces
Encoding,
/// Encoded sector pieces
Encoded(Duration),
/// Writing sector
Writing,
/// Wrote sector
Wrote(Duration),
/// Finished plotting
Finished {
/// Information about plotted sector
plotted_sector: PlottedSector,
/// Information about old plotted sector that was replaced
old_plotted_sector: Option<PlottedSector>,
/// How much time it took to plot a sector
time: Duration,
},
}

/// Details about sector expiration
#[derive(Debug, Clone, Encode, Decode)]
pub enum SectorExpirationDetails {
/// Sector expiration became known
Determined {
/// Segment index at which sector expires
expires_at: SegmentIndex,
},
/// Sector will expire at the next segment index and should be replotted
AboutToExpire,
/// Sector already expired
Expired,
}

/// Various sector updates
#[derive(Debug, Clone, Encode, Decode)]
pub enum SectorUpdate {
/// Sector is is being plotted
Plotting(SectorPlottingDetails),
/// Sector expiration information updated
Expiration(SectorExpirationDetails),
}

#[derive(Default, Debug)]
struct Handlers {
sector_plotting: Handler<SectorPlottingDetails>,
sector_plotted: Handler<(PlottedSector, Option<PlottedSector>)>,
sector_update: Handler<(SectorIndex, SectorUpdate)>,
solution: Handler<SolutionResponse>,
plot_audited: Handler<AuditEvent>,
}
Expand Down Expand Up @@ -984,6 +1029,7 @@ impl SingleDiskFarm {
last_archived_segment_index: farmer_app_info.protocol_info.history_size.segment_index(),
min_sector_lifetime: farmer_app_info.protocol_info.min_sector_lifetime,
node_client: node_client.clone(),
handlers: Arc::clone(&handlers),
sectors_metadata: Arc::clone(&sectors_metadata),
sectors_to_plot_sender,
initial_plotting_finished: farming_delay_sender,
Expand Down Expand Up @@ -1316,17 +1362,9 @@ impl SingleDiskFarm {
self.piece_reader.clone()
}

/// Subscribe to sector plotting notification
pub fn on_sector_plotting(&self, callback: HandlerFn<SectorPlottingDetails>) -> HandlerId {
self.handlers.sector_plotting.add(callback)
}

/// Subscribe to notification about plotted sectors
pub fn on_sector_plotted(
&self,
callback: HandlerFn<(PlottedSector, Option<PlottedSector>)>,
) -> HandlerId {
self.handlers.sector_plotted.add(callback)
/// Subscribe to sector updates
pub fn on_sector_update(&self, callback: HandlerFn<(SectorIndex, SectorUpdate)>) -> HandlerId {
self.handlers.sector_update.add(callback)
}

/// Subscribe to notification about audited plots
Expand Down
Loading

0 comments on commit ca78fee

Please sign in to comment.