From 1f1c8dedb48412f684b258d961e881867cce4b6a Mon Sep 17 00:00:00 2001 From: Nazar Mokrynskyi Date: Thu, 31 Oct 2024 17:37:44 +0200 Subject: [PATCH] Fix plotting getting stuck --- .../src/single_disk_farm/plotting.rs | 67 +++++++++++++------ 1 file changed, 46 insertions(+), 21 deletions(-) diff --git a/crates/subspace-farmer/src/single_disk_farm/plotting.rs b/crates/subspace-farmer/src/single_disk_farm/plotting.rs index 1c5d105cf5..f327c3f607 100644 --- a/crates/subspace-farmer/src/single_disk_farm/plotting.rs +++ b/crates/subspace-farmer/src/single_disk_farm/plotting.rs @@ -6,7 +6,7 @@ use crate::single_disk_farm::metrics::{SectorState, SingleDiskFarmMetrics}; use crate::single_disk_farm::{ BackgroundTaskError, Handlers, PlotMetadataHeader, RESERVED_PLOT_METADATA, }; -use async_lock::{Mutex as AsyncMutex, RwLock as AsyncRwLock, Semaphore}; +use async_lock::{Mutex as AsyncMutex, RwLock as AsyncRwLock, Semaphore, SemaphoreGuard}; use futures::channel::{mpsc, oneshot}; use futures::stream::FuturesOrdered; use futures::{select, FutureExt, SinkExt, StreamExt}; @@ -123,8 +123,30 @@ where max_plotting_sectors_per_farm, } = plotting_options; + let sector_plotting_options = &sector_plotting_options; let plotting_semaphore = Semaphore::new(max_plotting_sectors_per_farm.get()); let mut sectors_being_plotted = FuturesOrdered::new(); + // Channel size is intentionally unbounded for easier analysis, but it is bounded by plotting + // semaphore in practice due to permit stored in `SectorPlottingResult` + let (sector_plotting_result_sender, mut sector_plotting_result_receiver) = mpsc::unbounded(); + let process_plotting_result_fut = async move { + while let Some(sector_plotting_result) = sector_plotting_result_receiver.next().await { + process_plotting_result( + sector_plotting_result, + sectors_metadata, + sectors_being_modified, + &mut metadata_header, + Arc::clone(&sector_plotting_options.metadata_file), + ) + .await?; + } + + unreachable!( + "Stream will not end before the rest of 
the plotting process is shutting down" + ); + }; + let process_plotting_result_fut = process_plotting_result_fut.fuse(); + let mut process_plotting_result_fut = pin!(process_plotting_result_fut); // Wait for new sectors to plot from `sectors_to_plot_receiver` and wait for sectors that // already started plotting to finish plotting and then update metadata header @@ -138,7 +160,7 @@ where let sector_index = sector_to_plot.sector_index; let sector_plotting_init_fut = plot_single_sector( sector_to_plot, - &sector_plotting_options, + sector_plotting_options, sectors_metadata, sectors_being_modified, &plotting_semaphore, @@ -168,25 +190,23 @@ where break; } maybe_sector_plotting_result = sectors_being_plotted.select_next_some() => { - process_plotting_result( - maybe_sector_plotting_result?, - sectors_metadata, - sectors_being_modified, - &mut metadata_header, - Arc::clone(&sector_plotting_options.metadata_file) - ).await?; + sector_plotting_result_sender + .unbounded_send(maybe_sector_plotting_result?) + .expect("Sending means receiver is not dropped yet; qed"); + } + result = process_plotting_result_fut => { + return result; } } } } maybe_sector_plotting_result = sectors_being_plotted.select_next_some() => { - process_plotting_result( - maybe_sector_plotting_result?, - sectors_metadata, - sectors_being_modified, - &mut metadata_header, - Arc::clone(&sector_plotting_options.metadata_file) - ).await?; + sector_plotting_result_sender + .unbounded_send(maybe_sector_plotting_result?) 
+ .expect("Sending means receiver is not dropped yet; qed"); + } + result = process_plotting_result_fut => { + return result; } } } @@ -195,7 +215,7 @@ where } async fn process_plotting_result( - sector_plotting_result: SectorPlottingResult, + sector_plotting_result: SectorPlottingResult<'_>, sectors_metadata: &AsyncRwLock<Vec<SectorMetadataChecksummed>>, sectors_being_modified: &AsyncRwLock<HashSet<SectorIndex>>, metadata_header: &mut PlotMetadataHeader, @@ -205,6 +225,7 @@ async fn process_plotting_result( sector_metadata, replotting, last_queued, + plotting_permit, } = sector_plotting_result; let sector_index = sector_metadata.sector_index; @@ -241,6 +262,8 @@ async fn process_plotting_result( } } + drop(plotting_permit); + Ok(()) } @@ -250,10 +273,11 @@ enum PlotSingleSectorResult<F> { FatalError(PlottingError), } -struct SectorPlottingResult { +struct SectorPlottingResult<'a> { sector_metadata: SectorMetadataChecksummed, replotting: bool, last_queued: bool, + plotting_permit: SemaphoreGuard<'a>, } async fn plot_single_sector<'a, NC>( @@ -262,7 +286,9 @@ async fn plot_single_sector<'a, NC>( sectors_metadata: &'a AsyncRwLock<Vec<SectorMetadataChecksummed>>, sectors_being_modified: &'a AsyncRwLock<HashSet<SectorIndex>>, plotting_semaphore: &'a Semaphore, -) -> PlotSingleSectorResult<impl Future<Output = Result<SectorPlottingResult, PlottingError>> + 'a> +) -> PlotSingleSectorResult< + impl Future<Output = Result<SectorPlottingResult<'a>, PlottingError>> + 'a, +> where NC: NodeClient, { @@ -478,12 +504,11 @@ where .sector_update .call_simple(&(sector_index, sector_state)); - drop(plotting_permit); - Ok(SectorPlottingResult { sector_metadata, replotting, last_queued, + plotting_permit, }) }) }