Skip to content

Commit

Permalink
Merge pull request #2050 from subspace/gemini-3f-backport-independent…
Browse files Browse the repository at this point in the history
…-farming-thread-pools

Gemini 3f backport: Use dedicated thread pool per farm to avoid making audits of different farms sequential
  • Loading branch information
nazar-pc authored Oct 4, 2023
2 parents 28d4693 + 2df4b8f commit 914ca84
Show file tree
Hide file tree
Showing 4 changed files with 25 additions and 23 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -177,13 +177,6 @@ where
None => farmer_app_info.protocol_info.max_pieces_in_sector,
};

let farming_thread_pool = Arc::new(
ThreadPoolBuilder::new()
.thread_name(move |thread_index| format!("farming#{thread_index}"))
.num_threads(farming_thread_pool_size)
.spawn_handler(tokio_rayon_spawn_handler())
.build()?,
);
let plotting_thread_pool = Arc::new(
ThreadPoolBuilder::new()
.thread_name(move |thread_index| format!("plotting#{thread_index}"))
Expand Down Expand Up @@ -217,7 +210,7 @@ where
erasure_coding: erasure_coding.clone(),
piece_getter: piece_getter.clone(),
cache_percentage,
farming_thread_pool: Arc::clone(&farming_thread_pool),
farming_thread_pool_size,
plotting_thread_pool: Arc::clone(&plotting_thread_pool),
replotting_thread_pool: Arc::clone(&replotting_thread_pool),
},
Expand Down
13 changes: 8 additions & 5 deletions crates/subspace-farmer/src/bin/subspace-farmer/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -95,15 +95,18 @@ struct FarmingArgs {
/// Do not print info about configured farms on startup
#[arg(long)]
no_info: bool,
/// Size of thread pool used for farming (mostly for blocking I/O, but also for some compute-intensive
/// operations during proving), defaults to number of CPU cores available in the system
/// Size of PER FARM thread pool used for farming (mostly for blocking I/O, but also for some
/// compute-intensive operations during proving), defaults to number of CPU cores available in
/// the system
#[arg(long, default_value_t = available_parallelism())]
farming_thread_pool_size: usize,
/// Size of thread pool used for plotting, defaults to number of CPU cores available in the system
/// Size of thread pool used for plotting, defaults to number of CPU cores available in the
/// system. This thread pool is global for all farms and generally doesn't need to be changed.
#[arg(long, default_value_t = available_parallelism())]
plotting_thread_pool_size: usize,
/// Size of thread pool used for replotting, typically smaller pool than for plotting to not affect farming as much,
/// defaults to half of the number of CPU cores available in the system.
/// Size of thread pool used for replotting, typically smaller pool than for plotting to not
/// affect farming as much, defaults to half of the number of CPU cores available in the system.
/// This thread pool is global for all farms and generally doesn't need to be changed.
#[arg(long, default_value_t = available_parallelism() / 2)]
replotting_thread_pool_size: usize,
}
Expand Down
11 changes: 6 additions & 5 deletions crates/subspace-farmer/src/single_disk_farm.rs
Original file line number Diff line number Diff line change
Expand Up @@ -285,9 +285,9 @@ pub struct SingleDiskFarmOptions<NC, PG> {
pub erasure_coding: ErasureCoding,
/// Percentage of allocated space dedicated for caching purposes
pub cache_percentage: NonZeroU8,
/// Thread pool used for farming (mostly for blocking I/O, but also for some compute-intensive
/// operations during proving)
pub farming_thread_pool: Arc<ThreadPool>,
/// Thread pool size used for farming (mostly for blocking I/O, but also for some
/// compute-intensive operations during proving)
pub farming_thread_pool_size: usize,
/// Thread pool used for plotting
pub plotting_thread_pool: Arc<ThreadPool>,
/// Thread pool used for replotting, typically smaller pool than for plotting to not affect
Expand Down Expand Up @@ -595,7 +595,7 @@ impl SingleDiskFarm {
kzg,
erasure_coding,
cache_percentage,
farming_thread_pool,
farming_thread_pool_size,
plotting_thread_pool,
replotting_thread_pool,
} = options;
Expand Down Expand Up @@ -991,6 +991,7 @@ impl SingleDiskFarm {
}

let farming_options = FarmingOptions {
disk_farm_index,
public_key,
reward_address,
node_client,
Expand All @@ -1002,7 +1003,7 @@ impl SingleDiskFarm {
handlers,
modifying_sector_index,
slot_info_notifications: slot_info_forwarder_receiver,
thread_pool: farming_thread_pool,
thread_pool_size: farming_thread_pool_size,
};
farming::<PosTable, _>(farming_options).await
};
Expand Down
15 changes: 10 additions & 5 deletions crates/subspace-farmer/src/single_disk_farm/farming.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use futures::channel::mpsc;
use futures::StreamExt;
use parking_lot::{Mutex, RwLock};
use rayon::prelude::*;
use rayon::{ThreadPool, ThreadPoolBuildError};
use rayon::{ThreadPoolBuildError, ThreadPoolBuilder};
use std::fs::File;
use std::io;
use std::sync::Arc;
Expand Down Expand Up @@ -66,6 +66,7 @@ pub enum FarmingError {
}

pub(super) struct FarmingOptions<'a, NC> {
pub(super) disk_farm_index: usize,
pub(super) public_key: PublicKey,
pub(super) reward_address: PublicKey,
pub(super) node_client: NC,
Expand All @@ -77,15 +78,13 @@ pub(super) struct FarmingOptions<'a, NC> {
pub(super) handlers: Arc<Handlers>,
pub(super) modifying_sector_index: Arc<RwLock<Option<SectorIndex>>>,
pub(super) slot_info_notifications: mpsc::Receiver<SlotInfo>,
pub(super) thread_pool: Arc<ThreadPool>,
pub(super) thread_pool_size: usize,
}

/// Starts farming process.
///
/// NOTE: Returned future is async, but does blocking operations and should be running in dedicated
/// thread.
// False-positive, we do drop lock before .await
#[allow(clippy::await_holding_lock)]
pub(super) async fn farming<PosTable, NC>(
farming_options: FarmingOptions<'_, NC>,
) -> Result<(), FarmingError>
Expand All @@ -94,6 +93,7 @@ where
NC: NodeClient,
{
let FarmingOptions {
disk_farm_index,
public_key,
reward_address,
node_client,
Expand All @@ -105,9 +105,14 @@ where
handlers,
modifying_sector_index,
mut slot_info_notifications,
thread_pool,
thread_pool_size,
} = farming_options;

let thread_pool = ThreadPoolBuilder::new()
.thread_name(move |thread_index| format!("farming-{disk_farm_index}.{thread_index}"))
.num_threads(thread_pool_size)
.build()?;

let table_generator = Arc::new(Mutex::new(PosTable::generator()));

while let Some(slot_info) = slot_info_notifications.next().await {
Expand Down

0 comments on commit 914ca84

Please sign in to comment.