From ccda4a5c5306646e52191355e2d6202f18f9b2ad Mon Sep 17 00:00:00 2001 From: Nazar Mokrynskyi Date: Tue, 26 Dec 2023 01:09:27 +0200 Subject: [PATCH 1/8] Introduce `ThreadPoolManager` concept instead of separate thread pools for different farms + semaphore --- .../src/plotting.rs | 15 +-- .../src/bin/subspace-farmer/commands/farm.rs | 38 +++++++- crates/subspace-farmer/src/lib.rs | 1 + .../subspace-farmer/src/single_disk_farm.rs | 68 ++----------- .../src/single_disk_farm/plotting.rs | 30 +++--- .../src/thread_pool_manager.rs | 97 +++++++++++++++++++ 6 files changed, 160 insertions(+), 89 deletions(-) create mode 100644 crates/subspace-farmer/src/thread_pool_manager.rs diff --git a/crates/subspace-farmer-components/src/plotting.rs b/crates/subspace-farmer-components/src/plotting.rs index e5a646f2c6..d68db7d8e3 100644 --- a/crates/subspace-farmer-components/src/plotting.rs +++ b/crates/subspace-farmer-components/src/plotting.rs @@ -241,6 +241,11 @@ where downloading_semaphore, }); + let _encoding_permit = match encoding_semaphore { + Some(encoding_semaphore) => Some(encoding_semaphore.acquire().await?), + None => None, + }; + encode_sector( download_sector_fut.await?, EncodeSectorOptions:: { @@ -249,7 +254,6 @@ where pieces_in_sector, sector_output, sector_metadata_output, - encoding_semaphore, table_generator, }, ) @@ -399,9 +403,6 @@ where /// Where plotted sector metadata should be written, vector must either be empty (in which case /// it'll be resized to correct size automatically) or correctly sized from the beginning pub sector_metadata_output: &'a mut Vec, - /// Semaphore for part of the plotting when farmer encodes downloaded sector, should typically - /// allow one permit at a time for efficient CPU utilization - pub encoding_semaphore: Option<&'a Semaphore>, /// Proof of space table generator pub table_generator: &'a mut PosTable::Generator, } @@ -426,7 +427,6 @@ where pieces_in_sector, sector_output, sector_metadata_output, - encoding_semaphore, table_generator, } = encoding_options; @@ -452,11 +452,6 @@ where }); } - let _encoding_permit = match encoding_semaphore { - Some(encoding_semaphore) => Some(encoding_semaphore.acquire().await?), - None => None, - }; - let mut sector_contents_map = SectorContentsMap::new(pieces_in_sector); let mut chunks_scratch = Vec::with_capacity(Record::NUM_S_BUCKETS); diff --git a/crates/subspace-farmer/src/bin/subspace-farmer/commands/farm.rs b/crates/subspace-farmer/src/bin/subspace-farmer/commands/farm.rs index a7a9e5753d..6b85b02f99 100644 --- a/crates/subspace-farmer/src/bin/subspace-farmer/commands/farm.rs +++ b/crates/subspace-farmer/src/bin/subspace-farmer/commands/farm.rs @@ -10,6 +10,7 @@ use futures::stream::FuturesUnordered; use futures::{FutureExt, StreamExt}; use lru::LruCache; use parking_lot::Mutex; +use rayon::ThreadPoolBuilder; use std::fs; use std::net::SocketAddr; use std::num::{NonZeroU8, NonZeroUsize}; @@ -24,11 +25,14 @@ use subspace_farmer::piece_cache::PieceCache; use subspace_farmer::single_disk_farm::{ SingleDiskFarm, SingleDiskFarmError, SingleDiskFarmOptions, }; +use subspace_farmer::thread_pool_manager::ThreadPoolManager; use subspace_farmer::utils::farmer_piece_getter::FarmerPieceGetter; use subspace_farmer::utils::piece_validator::SegmentCommitmentPieceValidator; use subspace_farmer::utils::readers_and_pieces::ReadersAndPieces; use subspace_farmer::utils::ss58::parse_ss58_reward_address; -use subspace_farmer::utils::{run_future_in_dedicated_thread, AsyncJoinOnDrop}; +use subspace_farmer::utils::{ + 
run_future_in_dedicated_thread, tokio_rayon_spawn_handler, AsyncJoinOnDrop, +}; use subspace_farmer::{Identity, NodeClient, NodeRpcClient}; use subspace_farmer_components::plotting::PlottedSector; use subspace_metrics::{start_prometheus_metrics_server, RegistryAdapter}; @@ -438,7 +442,32 @@ where }; let downloading_semaphore = Arc::new(Semaphore::new(sector_downloading_concurrency.get())); - let encoding_semaphore = Arc::new(Semaphore::new(sector_encoding_concurrency.get())); + + let plotting_thread_pool_manager = ThreadPoolManager::new( + |thread_pool_index| { + ThreadPoolBuilder::new() + .thread_name(move |thread_index| { + format!("plotting-{thread_pool_index}.{thread_index}") + }) + .num_threads(plotting_thread_pool_size) + .spawn_handler(tokio_rayon_spawn_handler()) + .build() + }, + sector_encoding_concurrency, + )?; + + let replotting_thread_pool_manager = ThreadPoolManager::new( + |thread_pool_index| { + ThreadPoolBuilder::new() + .thread_name(move |thread_index| { + format!("replotting-{thread_pool_index}.{thread_index}") + }) + .num_threads(replotting_thread_pool_size) + .spawn_handler(tokio_rayon_spawn_handler()) + .build() + }, + sector_encoding_concurrency, + )?; let mut plotting_delay_senders = Vec::with_capacity(disk_farms.len()); @@ -461,11 +490,10 @@ where piece_getter: piece_getter.clone(), cache_percentage, downloading_semaphore: Arc::clone(&downloading_semaphore), - encoding_semaphore: Arc::clone(&encoding_semaphore), farm_during_initial_plotting, farming_thread_pool_size, - plotting_thread_pool_size, - replotting_thread_pool_size, + plotting_thread_pool_manager: plotting_thread_pool_manager.clone(), + replotting_thread_pool_manager: replotting_thread_pool_manager.clone(), plotting_delay: Some(plotting_delay_receiver), }, disk_farm_index, diff --git a/crates/subspace-farmer/src/lib.rs b/crates/subspace-farmer/src/lib.rs index 691ad98c60..126cff7afa 100644 --- a/crates/subspace-farmer/src/lib.rs +++ b/crates/subspace-farmer/src/lib.rs @@ -40,6 +40,7 @@ pub mod node_client; pub mod piece_cache; pub mod reward_signing; pub mod single_disk_farm; +pub mod thread_pool_manager; pub mod utils; /// Size of the LRU cache for peers. 
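
For readers following the new API, a minimal usage sketch of the `ThreadPoolManager` introduced by this patch is shown below. The pool count, thread count and thread names are illustrative assumptions, not values used by the farmer, and the sketch builds plain Rayon pools rather than the Tokio-aware ones the farmer creates.

```rust
use rayon::ThreadPoolBuilder;
use std::num::NonZeroUsize;
use subspace_farmer::thread_pool_manager::ThreadPoolManager;

fn main() -> Result<(), rayon::ThreadPoolBuildError> {
    // Two pools of four threads each; `get_thread_pool()` blocks until one of
    // the pools is free, so at most two jobs run concurrently.
    let manager = ThreadPoolManager::new(
        |thread_pool_index| {
            ThreadPoolBuilder::new()
                .thread_name(move |thread_index| {
                    format!("example-{thread_pool_index}.{thread_index}")
                })
                .num_threads(4)
                .build()
        },
        NonZeroUsize::new(2).expect("Not zero; qed"),
    )?;

    // The returned guard derefs to `rayon::ThreadPool`; dropping it hands the
    // pool back to the manager and wakes one waiting caller.
    let thread_pool = manager.get_thread_pool();
    let sum: u64 = thread_pool.install(|| (0..1_000u64).sum());
    assert_eq!(sum, 499_500);

    Ok(())
}
```

In the farmer itself each pool is additionally built with `.spawn_handler(tokio_rayon_spawn_handler())` so the Rayon worker threads inherit the Tokio runtime; that is omitted here to keep the sketch runnable without a runtime.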
diff --git a/crates/subspace-farmer/src/single_disk_farm.rs b/crates/subspace-farmer/src/single_disk_farm.rs index 79a5e82bc1..6e73b95279 100644 --- a/crates/subspace-farmer/src/single_disk_farm.rs +++ b/crates/subspace-farmer/src/single_disk_farm.rs @@ -17,6 +17,7 @@ pub use crate::single_disk_farm::plotting::PlottingError; use crate::single_disk_farm::plotting::{ plotting, plotting_scheduler, PlottingOptions, PlottingSchedulerOptions, }; +use crate::thread_pool_manager::ThreadPoolManager; use crate::utils::{tokio_rayon_spawn_handler, AsyncJoinOnDrop}; use crate::KNOWN_PEERS_CACHE_SIZE; use async_lock::RwLock; @@ -281,19 +282,16 @@ pub struct SingleDiskFarmOptions { /// Semaphore for part of the plotting when farmer downloads new sector, allows to limit memory /// usage of the plotting process, permit will be held until the end of the plotting process pub downloading_semaphore: Arc, - /// Semaphore for part of the plotting when farmer encodes downloaded sector, should typically - /// allow one permit at a time for efficient CPU utilization - pub encoding_semaphore: Arc, /// Whether to farm during initial plotting pub farm_during_initial_plotting: bool, /// Thread pool size used for farming (mostly for blocking I/O, but also for some /// compute-intensive operations during proving) pub farming_thread_pool_size: usize, - /// Thread pool size used for plotting - pub plotting_thread_pool_size: usize, - /// Thread pool size used for replotting, typically smaller pool than for plotting to not affect - /// farming as much - pub replotting_thread_pool_size: usize, + /// Thread pool manager used for plotting + pub plotting_thread_pool_manager: ThreadPoolManager, + /// Thread pool manager used for replotting, typically smaller pool than for plotting to not + /// affect farming as much + pub replotting_thread_pool_manager: ThreadPoolManager, /// Notification for plotter to start, can be used to delay plotting until some initialization /// has happened externally pub plotting_delay: Option>, @@ -625,10 +623,9 @@ impl SingleDiskFarm { erasure_coding, cache_percentage, downloading_semaphore, - encoding_semaphore, farming_thread_pool_size, - plotting_thread_pool_size, - replotting_thread_pool_size, + plotting_thread_pool_manager, + replotting_thread_pool_manager, plotting_delay, farm_during_initial_plotting, } = options; @@ -913,50 +910,6 @@ impl SingleDiskFarm { move || { let _span_guard = span.enter(); - let plotting_thread_pool = match ThreadPoolBuilder::new() - .thread_name(move |thread_index| { - format!("plotting-{disk_farm_index}.{thread_index}") - }) - .num_threads(plotting_thread_pool_size) - .spawn_handler(tokio_rayon_spawn_handler()) - .build() - .map_err(PlottingError::FailedToCreateThreadPool) - { - Ok(thread_pool) => thread_pool, - Err(error) => { - if let Some(error_sender) = error_sender.lock().take() { - if let Err(error) = error_sender.send(error.into()) { - error!( - %error, - "Plotting failed to send error to background task", - ); - } - } - return; - } - }; - let replotting_thread_pool = match ThreadPoolBuilder::new() - .thread_name(move |thread_index| { - format!("replotting-{disk_farm_index}.{thread_index}") - }) - .num_threads(replotting_thread_pool_size) - .spawn_handler(tokio_rayon_spawn_handler()) - .build() - .map_err(PlottingError::FailedToCreateThreadPool) - { - Ok(thread_pool) => thread_pool, - Err(error) => { - if let Some(error_sender) = error_sender.lock().take() { - if let Err(error) = error_sender.send(error.into()) { - error!( - %error, - "Plotting failed to send 
error to background task", - ); - } - } - return; - } - }; let plotting_options = PlottingOptions { public_key, @@ -975,9 +928,8 @@ impl SingleDiskFarm { modifying_sector_index, sectors_to_plot_receiver, downloading_semaphore, - encoding_semaphore: &encoding_semaphore, - plotting_thread_pool, - replotting_thread_pool, + plotting_thread_pool_manager, + replotting_thread_pool_manager, stop_receiver: &mut stop_receiver.resubscribe(), }; diff --git a/crates/subspace-farmer/src/single_disk_farm/plotting.rs b/crates/subspace-farmer/src/single_disk_farm/plotting.rs index 85602d4f7d..43e861c1fd 100644 --- a/crates/subspace-farmer/src/single_disk_farm/plotting.rs +++ b/crates/subspace-farmer/src/single_disk_farm/plotting.rs @@ -2,6 +2,7 @@ use crate::single_disk_farm::{ BackgroundTaskError, Handlers, PlotMetadataHeader, SectorPlottingDetails, RESERVED_PLOT_METADATA, }; +use crate::thread_pool_manager::ThreadPoolManager; use crate::utils::AsyncJoinOnDrop; use crate::{node_client, NodeClient}; use async_lock::RwLock; @@ -10,7 +11,6 @@ use futures::channel::{mpsc, oneshot}; use futures::{select, FutureExt, SinkExt, StreamExt}; use lru::LruCache; use parity_scale_codec::Encode; -use rayon::{ThreadPool, ThreadPoolBuildError}; use std::collections::HashMap; use std::fs::File; use std::io; @@ -37,6 +37,7 @@ use subspace_proof_of_space::Table; use thiserror::Error; use tokio::runtime::Handle; use tokio::sync::{broadcast, Semaphore}; +use tokio::task::yield_now; use tracing::{debug, info, trace, warn, Instrument}; const FARMER_APP_INFO_RETRY_INTERVAL: Duration = Duration::from_millis(500); @@ -91,9 +92,6 @@ pub enum PlottingError { /// I/O error occurred #[error("I/O error: {0}")] Io(#[from] io::Error), - /// Failed to create thread pool - #[error("Failed to create thread pool: {0}")] - FailedToCreateThreadPool(#[from] ThreadPoolBuildError), /// Background downloading panicked #[error("Background downloading panicked")] BackgroundDownloadingPanicked, @@ -118,11 +116,8 @@ pub(super) struct PlottingOptions<'a, NC, PG> { /// Semaphore for part of the plotting when farmer downloads new sector, allows to limit memory /// usage of the plotting process, permit will be held until the end of the plotting process pub(crate) downloading_semaphore: Arc, - /// Semaphore for part of the plotting when farmer encodes downloaded sector, should typically - /// allow one permit at a time for efficient CPU utilization - pub(crate) encoding_semaphore: &'a Semaphore, - pub(super) plotting_thread_pool: ThreadPool, - pub(super) replotting_thread_pool: ThreadPool, + pub(super) plotting_thread_pool_manager: ThreadPoolManager, + pub(super) replotting_thread_pool_manager: ThreadPoolManager, pub(super) stop_receiver: &'a mut broadcast::Receiver<()>, } @@ -155,9 +150,8 @@ where modifying_sector_index, mut sectors_to_plot_receiver, downloading_semaphore, - encoding_semaphore, - plotting_thread_pool, - replotting_thread_pool, + plotting_thread_pool_manager, + replotting_thread_pool_manager, stop_receiver, } = plotting_options; @@ -299,7 +293,6 @@ where pieces_in_sector, sector_output: &mut sector, sector_metadata_output: &mut sector_metadata, - encoding_semaphore: Some(encoding_semaphore), table_generator: &mut table_generator, }, )); @@ -320,12 +313,17 @@ where }) }; - let plotting_result = if replotting { - replotting_thread_pool.install(plotting_fn) + let thread_pool = if replotting { + replotting_thread_pool_manager.get_thread_pool() } else { - plotting_thread_pool.install(plotting_fn) + plotting_thread_pool_manager.get_thread_pool() 
}; + // Give a chance to interrupt plotting if necessary + yield_now().await; + + let plotting_result = thread_pool.install(plotting_fn); + if matches!(plotting_result, Err(PlottingError::FarmIsShuttingDown)) { return Ok(()); } diff --git a/crates/subspace-farmer/src/thread_pool_manager.rs b/crates/subspace-farmer/src/thread_pool_manager.rs new file mode 100644 index 0000000000..2de7decef4 --- /dev/null +++ b/crates/subspace-farmer/src/thread_pool_manager.rs @@ -0,0 +1,97 @@ +use parking_lot::{Condvar, Mutex}; +use rayon::{ThreadPool, ThreadPoolBuildError}; +use std::num::NonZeroUsize; +use std::ops::Deref; +use std::sync::Arc; + +#[derive(Debug)] +struct Inner { + thread_pools: Vec, +} + +/// Wrapper around [`ThreadPool`] that on `Drop` will return thread pool back into corresponding +/// [`ThreadPoolManager`]. +#[derive(Debug)] +pub struct ThreadPoolGuard { + inner: Arc<(Mutex, Condvar)>, + thread_pool: Option, +} + +impl Deref for ThreadPoolGuard { + type Target = ThreadPool; + + fn deref(&self) -> &Self::Target { + self.thread_pool + .as_ref() + .expect("Value exists until `Drop`; qed") + } +} + +impl Drop for ThreadPoolGuard { + fn drop(&mut self) { + let (mutex, cvar) = &*self.inner; + let mut inner = mutex.lock(); + inner.thread_pools.push( + self.thread_pool + .take() + .expect("Happens only once in `Drop`; qed"), + ); + cvar.notify_one(); + } +} + +/// Thread pool manager. +/// +/// This abstraction wraps a set of thread pools and allows to use them one at a time. +/// +/// For example on machine with 64 logical cores and 4 NUMA nodes it would be recommended to create +/// 4 thread pools with 16 threads each, which would mean work done within thread pool is tied to +/// that thread pool. +#[derive(Debug, Clone)] +pub struct ThreadPoolManager { + inner: Arc<(Mutex, Condvar)>, +} + +impl ThreadPoolManager { + /// Create new thread pool manager by instantiating `thread_pools` thread pools using + /// `create_thread_pool`. + /// + /// `create_thread_pool` takes one argument `thread_pool_index`. 
+ pub fn new( + create_thread_pool: C, + thread_pools: NonZeroUsize, + ) -> Result + where + C: Fn(usize) -> Result, + { + let inner = Inner { + thread_pools: (0..thread_pools.get()) + .map(create_thread_pool) + .collect::, _>>()?, + }; + + Ok(Self { + inner: Arc::new((Mutex::new(inner), Condvar::new())), + }) + } + + /// Get one of inner thread pools, will block until one is available if needed + #[must_use] + pub fn get_thread_pool(&self) -> ThreadPoolGuard { + let (mutex, cvar) = &*self.inner; + let mut inner = mutex.lock(); + + let thread_pool = inner.thread_pools.pop().unwrap_or_else(|| { + cvar.wait(&mut inner); + + inner.thread_pools.pop().expect( + "Guaranteed by parking_lot's API to happen when thread pool is inserted; qed", + ) + }); + + ThreadPoolGuard { + inner: Arc::clone(&self.inner), + thread_pool: Some(thread_pool), + } + } +} From d3926ec9b4130e5f003410e6bc826258d582ff63 Mon Sep 17 00:00:00 2001 From: Nazar Mokrynskyi Date: Tue, 26 Dec 2023 01:17:32 +0200 Subject: [PATCH 2/8] Move `downloading_semaphore` one level up --- .../src/plotting.rs | 20 ++++--------- .../src/single_disk_farm/plotting.rs | 29 +++++++++++++------ 2 files changed, 26 insertions(+), 23 deletions(-) diff --git a/crates/subspace-farmer-components/src/plotting.rs b/crates/subspace-farmer-components/src/plotting.rs index d68db7d8e3..c650aacb34 100644 --- a/crates/subspace-farmer-components/src/plotting.rs +++ b/crates/subspace-farmer-components/src/plotting.rs @@ -26,7 +26,7 @@ use subspace_core_primitives::{ use subspace_erasure_coding::ErasureCoding; use subspace_proof_of_space::{Table, TableGenerator}; use thiserror::Error; -use tokio::sync::{AcquireError, OwnedSemaphorePermit, Semaphore}; +use tokio::sync::{AcquireError, Semaphore}; use tokio::task::yield_now; use tracing::{debug, trace, warn}; @@ -230,6 +230,11 @@ where table_generator, } = options; + let _downloading_permit = match downloading_semaphore { + Some(downloading_semaphore) => Some(downloading_semaphore.acquire_owned().await?), + None => None, + }; + let download_sector_fut = download_sector(DownloadSectorOptions { public_key, sector_index, @@ -238,7 +243,6 @@ where farmer_protocol_info, kzg, pieces_in_sector, - downloading_semaphore, }); let _encoding_permit = match encoding_semaphore { @@ -266,7 +270,6 @@ pub struct DownloadedSector { piece_indices: Vec, raw_sector: RawSector, farmer_protocol_info: FarmerProtocolInfo, - downloading_permit: Option, } /// Options for sector downloading @@ -285,9 +288,6 @@ pub struct DownloadSectorOptions<'a, PG> { pub kzg: &'a Kzg, /// How many pieces should sector contain pub pieces_in_sector: u16, - /// Semaphore for part of the plotting when farmer downloads new sector, allows to limit memory - /// usage of the plotting process, permit will be held until the end of the plotting process - pub downloading_semaphore: Option>, } /// Download sector for plotting. @@ -308,14 +308,8 @@ where farmer_protocol_info, kzg, pieces_in_sector, - downloading_semaphore, } = options; - let downloading_permit = match downloading_semaphore { - Some(downloading_semaphore) => Some(downloading_semaphore.acquire_owned().await?), - None => None, - }; - let sector_id = SectorId::new(public_key.hash(), sector_index); let piece_indices = (PieceOffset::ZERO..) 
@@ -378,7 +372,6 @@ where piece_indices, raw_sector: raw_sector.into_inner(), farmer_protocol_info, - downloading_permit, }) } @@ -419,7 +412,6 @@ where piece_indices, mut raw_sector, farmer_protocol_info, - downloading_permit: _downloading_permit, } = downloaded_sector; let EncodeSectorOptions { sector_index, diff --git a/crates/subspace-farmer/src/single_disk_farm/plotting.rs b/crates/subspace-farmer/src/single_disk_farm/plotting.rs index 43e861c1fd..7aa4bea139 100644 --- a/crates/subspace-farmer/src/single_disk_farm/plotting.rs +++ b/crates/subspace-farmer/src/single_disk_farm/plotting.rs @@ -36,7 +36,7 @@ use subspace_farmer_components::sector::SectorMetadataChecksummed; use subspace_proof_of_space::Table; use thiserror::Error; use tokio::runtime::Handle; -use tokio::sync::{broadcast, Semaphore}; +use tokio::sync::{broadcast, OwnedSemaphorePermit, Semaphore}; use tokio::task::yield_now; use tracing::{debug, info, trace, warn, Instrument}; @@ -157,8 +157,9 @@ where let mut table_generator = PosTable::generator(); - let mut maybe_next_downloaded_sector_fut = - None::>>; + let mut maybe_next_downloaded_sector_fut = None::< + AsyncJoinOnDrop>, + >; while let Some(sector_to_plot) = sectors_to_plot_receiver.next().await { let SectorToPlot { sector_index, @@ -221,12 +222,17 @@ where break farmer_app_info; }; - let downloaded_sector = + let (_downloading_permit, downloaded_sector) = if let Some(downloaded_sector_fut) = maybe_next_downloaded_sector_fut.take() { downloaded_sector_fut .await .map_err(|_error| PlottingError::BackgroundDownloadingPanicked)?? } else { + let downloading_permit = Arc::clone(&downloading_semaphore) + .acquire_owned() + .await + .map_err(plotting::PlottingError::from)?; + let downloaded_sector_fut = download_sector(DownloadSectorOptions { public_key: &public_key, sector_index, @@ -237,20 +243,25 @@ where farmer_protocol_info: farmer_app_info.protocol_info, kzg, pieces_in_sector, - downloading_semaphore: Some(Arc::clone(&downloading_semaphore)), }); - downloaded_sector_fut.await? + + (downloading_permit, downloaded_sector_fut.await?) 
}; // Initiate downloading of pieces for the next segment index if already known if let Some(sector_index) = next_segment_index_hint { let piece_getter = piece_getter.clone(); - let downloading_semaphore = Some(Arc::clone(&downloading_semaphore)); + let downloading_semaphore = Arc::clone(&downloading_semaphore); let kzg = kzg.clone(); maybe_next_downloaded_sector_fut.replace(AsyncJoinOnDrop::new( tokio::spawn( async move { + let downloading_permit = downloading_semaphore + .acquire_owned() + .await + .map_err(plotting::PlottingError::from)?; + let downloaded_sector_fut = download_sector(DownloadSectorOptions { public_key: &public_key, sector_index, @@ -261,9 +272,9 @@ where farmer_protocol_info: farmer_app_info.protocol_info, kzg: &kzg, pieces_in_sector, - downloading_semaphore, }); - downloaded_sector_fut.await + + Ok((downloading_permit, downloaded_sector_fut.await?)) } .in_current_span(), ), From 224a6879357e0f2e78a3cb2e01f7d5fd1120ba27 Mon Sep 17 00:00:00 2001 From: Nazar Mokrynskyi Date: Tue, 26 Dec 2023 01:40:56 +0200 Subject: [PATCH 3/8] Introduce `rayon_custom_spawn_handler` as more general and powerful version of `tokio_rayon_spawn_handler` --- crates/subspace-farmer/src/utils.rs | 37 ++++++++++++++++++++++------- 1 file changed, 28 insertions(+), 9 deletions(-) diff --git a/crates/subspace-farmer/src/utils.rs b/crates/subspace-farmer/src/utils.rs index c396c52c53..cbd206e8d2 100644 --- a/crates/subspace-farmer/src/utils.rs +++ b/crates/subspace-farmer/src/utils.rs @@ -137,10 +137,18 @@ where } /// This function is supposed to be used with [`rayon::ThreadPoolBuilder::spawn_handler()`] to -/// inherit current tokio runtime. -pub fn tokio_rayon_spawn_handler() -> impl FnMut(ThreadBuilder) -> io::Result<()> { - let handle = Handle::current(); - +/// spawn handler with a custom logic defined by `spawn_hook_builder`. +/// +/// `spawn_hook_builder` is called with thread builder to create `spawn_handler` that in turn will +/// be spawn rayon's thread with desired environment. +pub fn rayon_custom_spawn_handler( + mut spawn_handler_builder: SpawnHandlerBuilder, +) -> impl FnMut(ThreadBuilder) -> io::Result<()> +where + SpawnHandlerBuilder: (FnMut(ThreadBuilder) -> SpawnHandler) + Clone, + SpawnHandler: (FnOnce() -> SpawnHandlerResult) + Send + 'static, + SpawnHandlerResult: Send + 'static, +{ move |thread: ThreadBuilder| { let mut b = thread::Builder::new(); if let Some(name) = thread.name() { @@ -150,12 +158,23 @@ pub fn tokio_rayon_spawn_handler() -> impl FnMut(ThreadBuilder) -> io::Result<() b = b.stack_size(stack_size); } + b.spawn(spawn_handler_builder(thread))?; + Ok(()) + } +} + +/// This function is supposed to be used with [`rayon::ThreadPoolBuilder::spawn_handler()`] to +/// inherit current tokio runtime. 
+pub fn tokio_rayon_spawn_handler() -> impl FnMut(ThreadBuilder) -> io::Result<()> { + let handle = Handle::current(); + + rayon_custom_spawn_handler(move |thread| { let handle = handle.clone(); - b.spawn(move || { + + move || { let _guard = handle.enter(); - tokio::task::block_in_place(|| thread.run()) - })?; - Ok(()) - } + task::block_in_place(|| thread.run()) + } + }) } From 3aba9a9e6ec0c2504dcd35156ea2db5cfb1a574e Mon Sep 17 00:00:00 2001 From: Nazar Mokrynskyi Date: Tue, 26 Dec 2023 04:08:07 +0200 Subject: [PATCH 4/8] Pin plotting/replotting threads to cores and prepare for NUMA support --- Cargo.lock | 2 + crates/subspace-farmer/Cargo.toml | 2 + .../src/bin/subspace-farmer/commands/farm.rs | 151 +++++++++--------- crates/subspace-farmer/src/utils.rs | 129 ++++++++++++++- 4 files changed, 207 insertions(+), 77 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 4421fc5bcc..5f4d994a95 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -11697,6 +11697,7 @@ dependencies = [ "blake3", "bytesize", "clap", + "core_affinity", "criterion", "derive_more", "event-listener-primitives", @@ -11707,6 +11708,7 @@ dependencies = [ "jsonrpsee", "lru 0.11.1", "mimalloc", + "num_cpus", "parity-scale-codec", "parking_lot 0.12.1", "prometheus-client 0.22.0", diff --git a/crates/subspace-farmer/Cargo.toml b/crates/subspace-farmer/Cargo.toml index d9e1131ef2..0e5f0a7b84 100644 --- a/crates/subspace-farmer/Cargo.toml +++ b/crates/subspace-farmer/Cargo.toml @@ -21,6 +21,7 @@ blake2 = "0.10.6" blake3 = { version = "1.4.1", default-features = false } bytesize = "1.3.0" clap = { version = "4.4.3", features = ["color", "derive"] } +core_affinity = "0.8.1" criterion = { version = "0.5.1", default-features = false, features = ["rayon", "async"] } derive_more = "0.99.17" event-listener-primitives = "2.0.1" @@ -31,6 +32,7 @@ hex = { version = "0.4.3", features = ["serde"] } jsonrpsee = { version = "0.16.3", features = ["client"] } lru = "0.11.0" mimalloc = "0.1.39" +num_cpus = "1.16.0" parity-scale-codec = "3.6.5" parking_lot = "0.12.1" prometheus-client = "0.22.0" diff --git a/crates/subspace-farmer/src/bin/subspace-farmer/commands/farm.rs b/crates/subspace-farmer/src/bin/subspace-farmer/commands/farm.rs index 6b85b02f99..ca3536b73d 100644 --- a/crates/subspace-farmer/src/bin/subspace-farmer/commands/farm.rs +++ b/crates/subspace-farmer/src/bin/subspace-farmer/commands/farm.rs @@ -10,7 +10,6 @@ use futures::stream::FuturesUnordered; use futures::{FutureExt, StreamExt}; use lru::LruCache; use parking_lot::Mutex; -use rayon::ThreadPoolBuilder; use std::fs; use std::net::SocketAddr; use std::num::{NonZeroU8, NonZeroUsize}; @@ -25,13 +24,13 @@ use subspace_farmer::piece_cache::PieceCache; use subspace_farmer::single_disk_farm::{ SingleDiskFarm, SingleDiskFarmError, SingleDiskFarmOptions, }; -use subspace_farmer::thread_pool_manager::ThreadPoolManager; use subspace_farmer::utils::farmer_piece_getter::FarmerPieceGetter; use subspace_farmer::utils::piece_validator::SegmentCommitmentPieceValidator; use subspace_farmer::utils::readers_and_pieces::ReadersAndPieces; use subspace_farmer::utils::ss58::parse_ss58_reward_address; use subspace_farmer::utils::{ - run_future_in_dedicated_thread, tokio_rayon_spawn_handler, AsyncJoinOnDrop, + all_cpus, create_tokio_thread_pool_manager_for_pinned_cores, run_future_in_dedicated_thread, + thread_pool_core_indices, AsyncJoinOnDrop, }; use subspace_farmer::{Identity, NodeClient, NodeRpcClient}; use subspace_farmer_components::plotting::PlottedSector; @@ -47,23 +46,12 @@ use zeroize::Zeroizing; 
const RECORDS_ROOTS_CACHE_SIZE: NonZeroUsize = NonZeroUsize::new(1_000_000).expect("Not zero; qed"); -fn available_parallelism() -> usize { - match std::thread::available_parallelism() { - Ok(parallelism) => parallelism.get(), - Err(error) => { - warn!( - %error, - "Unable to identify available parallelism, you might want to configure thread pool sizes with CLI \ - options manually" - ); - - 0 - } - } -} - fn should_farm_during_initial_plotting() -> bool { - available_parallelism() > 8 + let total_cpu_cores = all_cpus() + .into_iter() + .flat_map(|cores| cores.into_iter()) + .count(); + total_cpu_cores > 8 } /// Arguments for farmer @@ -120,42 +108,46 @@ pub(crate) struct FarmingArgs { #[arg(long, alias = "metrics-endpoint")] metrics_endpoints: Vec, /// Defines how many sectors farmer will download concurrently, allows to limit memory usage of - /// the plotting process, increasing beyond 2 makes practical sense due to limited networking - /// concurrency and will likely result in slower plotting overall - #[arg(long, default_value = "2")] - sector_downloading_concurrency: NonZeroUsize, - /// Defines how many sectors farmer will encode concurrently, should generally never be set to - /// more than 1 because it will most likely result in slower plotting overall - #[arg(long, default_value = "1")] - sector_encoding_concurrency: NonZeroUsize, - /// Allows to enable farming during initial plotting. Not used by default because plotting is so - /// intense on CPU and memory that farming will likely not work properly, yet it will - /// significantly impact plotting speed, delaying the time when farming can actually work - /// properly. + /// the plotting process, defaults to `--sector-downloading-concurrency` + 1 to download future + /// sector ahead of time + #[arg(long)] + sector_downloading_concurrency: Option, + /// Defines how many sectors farmer will encode concurrently, defaults to 1 on UMA system and + /// number of NUMA nodes on NUMA system. It is further restricted by + /// `--sector-downloading-concurrency` and setting this option higher than + /// `--sector-downloading-concurrency` will have no effect. + #[arg(long)] + sector_encoding_concurrency: Option, + /// Allows to enable farming during initial plotting. Not used by default on machines with 8 or + /// less logical cores because plotting is so intense on CPU and memory that farming will likely + /// not work properly, yet it will significantly impact plotting speed, delaying the time when + /// farming can actually start properly. #[arg(long, default_value_t = should_farm_during_initial_plotting(), action = clap::ArgAction::Set)] farm_during_initial_plotting: bool, /// Size of PER FARM thread pool used for farming (mostly for blocking I/O, but also for some - /// compute-intensive operations during proving), defaults to number of CPU cores available in - /// the system - #[arg(long, default_value_t = available_parallelism())] - farming_thread_pool_size: usize, - /// Size of PER FARM thread pool used for plotting, defaults to number of CPU cores available - /// in the system. + /// compute-intensive operations during proving), defaults to number of logical CPUs + /// available on UMA system and number of logical CPUs in first NUMA node on NUMA system + #[arg(long)] + farming_thread_pool_size: Option, + /// Size of one thread pool used for plotting, defaults to number of logical CPUs available + /// on UMA system and number of logical CPUs available in NUMA node on NUMA system. 
+ /// + /// Number of thread pools is defined by `--sector-encoding-concurrency` option, different + /// thread pools might have different number of threads if NUMA nodes do not have the same size. + /// + /// Threads will be pinned to corresponding CPU cores at creation. + #[arg(long)] + plotting_thread_pool_size: Option, + /// Size of one thread pool used for replotting, typically smaller pool than for plotting + /// to not affect farming as much, defaults to half of the number of logical CPUs available on + /// UMA system and number of logical CPUs available in NUMA node on NUMA system. /// - /// NOTE: The fact that this parameter is per farm doesn't mean farmer will plot multiple - /// sectors concurrently, see `--sector-downloading-concurrency` and - /// `--sector-encoding-concurrency` options. - #[arg(long, default_value_t = available_parallelism())] - plotting_thread_pool_size: usize, - /// Size of PER FARM thread pool used for replotting, typically smaller pool than for plotting - /// to not affect farming as much, defaults to half of the number of CPU cores available in the - /// system. + /// Number of thread pools is defined by `--sector-encoding-concurrency` option, different + /// thread pools might have different number of threads if NUMA nodes do not have the same size. /// - /// NOTE: The fact that this parameter is per farm doesn't mean farmer will replot multiple - /// sectors concurrently, see `--sector-downloading-concurrency` and - /// `--sector-encoding-concurrency` options. - #[arg(long, default_value_t = available_parallelism() / 2)] - replotting_thread_pool_size: usize, + /// Threads will be pinned to corresponding CPU cores at creation. + #[arg(long)] + replotting_thread_pool_size: Option, } fn cache_percentage_parser(s: &str) -> anyhow::Result { @@ -441,33 +433,42 @@ where None => farmer_app_info.protocol_info.max_pieces_in_sector, }; - let downloading_semaphore = Arc::new(Semaphore::new(sector_downloading_concurrency.get())); - - let plotting_thread_pool_manager = ThreadPoolManager::new( - |thread_pool_index| { - ThreadPoolBuilder::new() - .thread_name(move |thread_index| { - format!("plotting-{thread_pool_index}.{thread_index}") - }) - .num_threads(plotting_thread_pool_size) - .spawn_handler(tokio_rayon_spawn_handler()) - .build() - }, - sector_encoding_concurrency, - )?; + let plotting_thread_pool_core_indices = + thread_pool_core_indices(plotting_thread_pool_size, sector_encoding_concurrency); + let replotting_thread_pool_core_indices = { + let mut replotting_thread_pool_core_indices = + thread_pool_core_indices(replotting_thread_pool_size, sector_encoding_concurrency); + if replotting_thread_pool_size.is_none() { + // The default behavior is to use all CPU cores, but for replotting we just want half + replotting_thread_pool_core_indices + .iter_mut() + .for_each(|cores| cores.truncate((cores.len() / 2).max(1))); + } + replotting_thread_pool_core_indices + }; - let replotting_thread_pool_manager = ThreadPoolManager::new( - |thread_pool_index| { - ThreadPoolBuilder::new() - .thread_name(move |thread_index| { - format!("replotting-{thread_pool_index}.{thread_index}") - }) - .num_threads(replotting_thread_pool_size) - .spawn_handler(tokio_rayon_spawn_handler()) - .build() - }, - sector_encoding_concurrency, + let downloading_semaphore = Arc::new(Semaphore::new( + sector_downloading_concurrency + .map(|sector_downloading_concurrency| sector_downloading_concurrency.get()) + .unwrap_or(plotting_thread_pool_core_indices.len() + 1), + )); + + let 
plotting_thread_pool_manager = create_tokio_thread_pool_manager_for_pinned_cores( + "plotting", + plotting_thread_pool_core_indices, + )?; + let replotting_thread_pool_manager = create_tokio_thread_pool_manager_for_pinned_cores( + "replotting", + replotting_thread_pool_core_indices, )?; + let farming_thread_pool_size = farming_thread_pool_size + .map(|farming_thread_pool_size| farming_thread_pool_size.get()) + .unwrap_or_else(|| { + all_cpus() + .first() + .expect("Not empty according to `all_cpus` function description; qed") + .len() + }); let mut plotting_delay_senders = Vec::with_capacity(disk_farms.len()); diff --git a/crates/subspace-farmer/src/utils.rs b/crates/subspace-farmer/src/utils.rs index cbd206e8d2..9456b0409e 100644 --- a/crates/subspace-farmer/src/utils.rs +++ b/crates/subspace-farmer/src/utils.rs @@ -5,18 +5,20 @@ pub mod ss58; #[cfg(test)] mod tests; +use crate::thread_pool_manager::ThreadPoolManager; use futures::channel::oneshot; use futures::channel::oneshot::Canceled; use futures::future::Either; -use rayon::ThreadBuilder; +use rayon::{ThreadBuilder, ThreadPoolBuildError, ThreadPoolBuilder}; use std::future::Future; +use std::num::NonZeroUsize; use std::ops::Deref; use std::pin::{pin, Pin}; use std::task::{Context, Poll}; use std::{io, thread}; use tokio::runtime::Handle; use tokio::task; -use tracing::debug; +use tracing::{debug, warn}; /// Joins async join handle on drop pub struct AsyncJoinOnDrop { @@ -136,6 +138,129 @@ where }) } +/// All CPU cores as numbers, grouped by NUMA nodes. +/// +/// Returned vector is guaranteed to have at least one non-empty element. +pub fn all_cpus() -> Vec> { + // TODO: NUMA support + vec![(0..num_cpus::get()).collect()] +} + +/// Thread indices for each thread pool +pub fn thread_pool_core_indices( + thread_pool_size: Option, + thread_pools: Option, +) -> Vec> { + let all_cpus = all_cpus(); + + if let Some(thread_pools) = thread_pools { + let mut thread_pool_core_indices = Vec::>::with_capacity(thread_pools.get()); + + if let Some(thread_pool_size) = thread_pool_size { + // If thread pool size is fixed, loop over all CPU cores as many times as necessary and + // assign contiguous ranges of CPU cores to corresponding thread pools + + let total_cpu_cores = all_cpus + .into_iter() + .flat_map(|cores| cores.into_iter()) + .count(); + for _ in 0..thread_pools.get() { + let cpu_cores_range = if let Some(last_cpu_index) = thread_pool_core_indices + .last() + .and_then(|thread_indices| thread_indices.last()) + .copied() + { + last_cpu_index + 1.. + } else { + 0.. 
+ }; + + let cpu_cores = cpu_cores_range + .take(thread_pool_size.get()) + // To loop over all CPU cores multiple times, modulo naively obtained CPU + // cores by the total available number of CPU cores + .map(|core_index| core_index % total_cpu_cores) + .collect(); + + thread_pool_core_indices.push(cpu_cores); + } + } else { + // If thread pool size is not fixed, we iterate over all NUMA nodes as many times as + // necessary + + for thread_pool_index in 0..thread_pools.get() { + thread_pool_core_indices.push(all_cpus[thread_pool_index % all_cpus.len()].clone()); + } + } + thread_pool_core_indices + } else { + // If everything is set to defaults, use physical layout of CPUs + all_cpus + } +} + +#[inline(never)] +fn pin_to_cpu_core( + thread_prefix: &'static str, + thread_pool_index: usize, + thread_index: usize, + core: usize, +) { + if !core_affinity::set_for_current(core_affinity::CoreId { id: core }) { + warn!( + %thread_prefix, + %thread_pool_index, + %thread_index, + %core, + "Failed to set core affinity, timekeeper will run on random \ + CPU core", + ); + } +} + +/// Creates thread pools for each of CPUs with number of threads corresponding to number of cores in +/// each CPU and pins threads to those CPU cores. Each thread will have Tokio context available. +/// +/// The easiest way to obtain CPUs is using [`all_cpus`], but [`thread_pool_core_indices`] in case +/// support for user customizations is desired. +pub fn create_tokio_thread_pool_manager_for_pinned_cores( + thread_prefix: &'static str, + cpus: Vec>, +) -> Result { + let total_thread_pools = cpus.len(); + + ThreadPoolManager::new( + |thread_pool_index| { + let cores = cpus[thread_pool_index].clone(); + + ThreadPoolBuilder::new() + .thread_name(move |thread_index| { + format!("{thread_prefix}-{thread_pool_index}.{thread_index}") + }) + .num_threads(cores.len()) + .spawn_handler({ + let handle = Handle::current(); + + rayon_custom_spawn_handler(move |thread| { + let core = cores[thread.index()]; + let handle = handle.clone(); + + move || { + pin_to_cpu_core(thread_prefix, thread_pool_index, thread.index(), core); + + let _guard = handle.enter(); + + task::block_in_place(|| thread.run()) + } + }) + }) + .build() + }, + NonZeroUsize::new(total_thread_pools) + .expect("Thread pool is guaranteed to be non-empty; qed"), + ) +} + /// This function is supposed to be used with [`rayon::ThreadPoolBuilder::spawn_handler()`] to /// spawn handler with a custom logic defined by `spawn_hook_builder`. /// From 346d4a35e7d75f22ecd3dcb1c1fa365a1c7b40d0 Mon Sep 17 00:00:00 2001 From: Nazar Mokrynskyi Date: Tue, 26 Dec 2023 04:16:57 +0200 Subject: [PATCH 5/8] Remove outdated information from farmer's readme --- crates/subspace-farmer/README.md | 47 -------------------------------- 1 file changed, 47 deletions(-) diff --git a/crates/subspace-farmer/README.md b/crates/subspace-farmer/README.md index ff25e36f10..fa4d013fdb 100644 --- a/crates/subspace-farmer/README.md +++ b/crates/subspace-farmer/README.md @@ -2,28 +2,6 @@ Reference implementation of Subspace Farmer for Subspace Network Blockchain. -## Overview -**Notes:** The code is un-audited and not production ready, use it at your own risk. - -This repo is an implementation of a Farmer for [Subspace Network Blockchain](https://subspace.network). - -Subspace is a proof-of-storage blockchain that resolves the farmer's dilemma, to learn more read our [white paper](https://drive.google.com/file/d/1v847u_XeVf0SBz7Y7LEMXi72QfqirstL/view). 
- -## Some Notes on Plotting - -### Time to Plot - -Plotting time is roughly linear with respect to number of cores and clock speed of the host system. On average, it takes ~ 1 minute to create a 1GB plot or 18 hours to to create a 1TB plot, though these numbers will depend on the system used. This is largely independent of the storage media used (i.e. HDD, SATA SSD, NVME SSD) as it is largely a CPU-bound task. - -### Storage Overhead - -In addition to the plot a small Binary Search Tree (BST) is also stored on disk using RocksDB, which has roughly 1% of the storage size. -Due to current implementation two of such databases might be stored at once, though this will improve in the future. -There are also some supplementary database mappings. - -So creating a 1GB plot should actually consume about 1.03 GB of storage. -Plot size parameter specified in farming command accounts for this overhead, so you don't need to worry about implementation details. - ## Running It is recommended to follow general farming instructions that explain how to run both farmer and node together. @@ -83,28 +61,3 @@ target/production/subspace-farmer wipe /path/to/farm ``` This would wipe plots in the OS-specific users local data directory. - -## Architecture - -The farmer typically runs two processes in parallel: plotting and farming. - -### Plotting - -Think of it as the following pipeline: - -1. [Farmer receives new blocks from the blockchain](src/archiving.rs) -2. [Archives each of them](src/archiving.rs) -3. [Encodes each archived piece by applying the time-asymmetric SLOTH permutation as `encode(genesis_piece, farmer_public_key_hash, plot_index)`](src/single_plot_farm) -4. [Each encoding is written to the disk](src/single_plot_farm.rs) -3. [A commitment, or tag, to each encoding is created as `hmac(encoding, salt)` and stored within a binary search tree (BST)](src/single_plot_farm). - -This process currently takes ~ 36 hours per TiB on a quad-core machine, but for 1 GiB plotting should take between a few seconds and a few minutes. - -### [Farming](src/farming.rs) - -1. Connect to a client and subscribe to `slot_notifications` via JSON-RPC. -2. Given a global challenge as `hash(randomness || slot_index)` and `SOLUTION_RANGE`. -3. Derive local challenge as `hash(global_challenge || farmer_public_key_hash)`. -4. Query the BST for the nearest tag to the local challenge. -5. If it within `SOLUTION_RANGE` return a `SOLUTION` else return `None` -6. 
All the above can and will happen in parallel to plotting process, so it is possible to participate right away From e143fc2ec3e6848453f6036eb1b5144eca733b9f Mon Sep 17 00:00:00 2001 From: Nazar Mokrynskyi Date: Tue, 26 Dec 2023 05:09:18 +0200 Subject: [PATCH 6/8] Add NUMA support to farmer, CI fixes --- .github/workflows/rust.yml | 14 +- .github/workflows/snapshot-build.yml | 60 +++++-- Cargo.lock | 146 +++++++++++++++++- Dockerfile-bootstrap-node | 4 +- Dockerfile-bootstrap-node.aarch64 | 5 +- Dockerfile-farmer | 4 +- Dockerfile-farmer.aarch64 | 5 +- Dockerfile-node | 4 +- Dockerfile-node.aarch64 | 5 +- Dockerfile-runtime | 5 +- crates/subspace-farmer/Cargo.toml | 6 + crates/subspace-farmer/README.md | 35 ++++- .../src/bin/subspace-farmer/commands/farm.rs | 13 +- crates/subspace-farmer/src/utils.rs | 24 ++- crates/subspace-node/README.md | 34 +++- docs/development.md | 36 +---- docs/farming.md | 5 +- 17 files changed, 340 insertions(+), 65 deletions(-) diff --git a/.github/workflows/rust.yml b/.github/workflows/rust.yml index 64f5081cc3..8a06facccd 100644 --- a/.github/workflows/rust.yml +++ b/.github/workflows/rust.yml @@ -32,7 +32,7 @@ env: jobs: cargo-fmt: - runs-on: ${{ fromJson(github.repository_owner == 'subspace' && '["self-hosted", "ubuntu-20.04-x86-64"]' || 'ubuntu-22.04') }} + runs-on: ${{ fromJson(github.repository_owner == 'subspace' && '["self-hosted", "ubuntu-20.04-x86-64"]' || '"ubuntu-22.04"') }} steps: - name: Checkout @@ -91,6 +91,11 @@ jobs: with: repo-token: ${{ secrets.GITHUB_TOKEN }} + # Needed for hwloc + - name: Install automake (macOS) + run: brew install automake + if: runner.os == 'macOS' + # Workaround to resolve link error with C:\msys64\mingw64\bin\libclang.dll - name: Remove msys64 run: Remove-Item -LiteralPath "C:\msys64\" -Force -Recurse @@ -113,7 +118,7 @@ jobs: args: --locked --all-targets --features "runtime-benchmarks" -- -D warnings cargo-docs: - runs-on: ${{ fromJson(github.repository_owner == 'subspace' && '["self-hosted", "ubuntu-20.04-x86-64"]' || 'ubuntu-22.04') }} + runs-on: ${{ fromJson(github.repository_owner == 'subspace' && '["self-hosted", "ubuntu-20.04-x86-64"]' || '"ubuntu-22.04"') }} steps: - name: Checkout uses: actions/checkout@93ea575cb5d8a053eaa0ac8fa3b40d7e05a33cc8 # @v3.1.0 @@ -170,6 +175,11 @@ jobs: with: repo-token: ${{ secrets.GITHUB_TOKEN }} + # Needed for hwloc + - name: Install automake (macOS) + run: brew install automake + if: runner.os == 'macOS' + # Workaround to resolve link error with C:\msys64\mingw64\bin\libclang.dll - name: Remove msys64 run: Remove-Item -LiteralPath "C:\msys64\" -Force -Recurse diff --git a/.github/workflows/snapshot-build.yml b/.github/workflows/snapshot-build.yml index 3a3bfff9a2..eb33cf5515 100644 --- a/.github/workflows/snapshot-build.yml +++ b/.github/workflows/snapshot-build.yml @@ -18,7 +18,7 @@ env: jobs: container-linux: - runs-on: ${{ fromJson(github.repository_owner == 'subspace' && '["self-hosted", "ubuntu-20.04-x86-64"]' || 'ubuntu-22.04') }} + runs-on: ${{ fromJson(github.repository_owner == 'subspace' && '["self-hosted", "ubuntu-20.04-x86-64"]' || '"ubuntu-22.04"') }} permissions: contents: write packages: write @@ -84,35 +84,35 @@ jobs: strategy: matrix: build: - - os: ${{ fromJson(github.repository_owner == 'subspace' && '["self-hosted", "ubuntu-20.04-x86-64"]' || 'ubuntu-20.04') }} + - os: ${{ fromJson(github.repository_owner == 'subspace' && '["self-hosted", "ubuntu-20.04-x86-64"]' || '"ubuntu-20.04"') }} target: x86_64-unknown-linux-gnu suffix: ubuntu-x86_64-skylake-${{ 
github.ref_name }} rustflags: "-C target-cpu=skylake" - - os: ${{ fromJson(github.repository_owner == 'subspace' && '["self-hosted", "ubuntu-20.04-x86-64"]' || 'ubuntu-20.04') }} + - os: ${{ fromJson(github.repository_owner == 'subspace' && '["self-hosted", "ubuntu-20.04-x86-64"]' || '"ubuntu-20.04"') }} target: x86_64-unknown-linux-gnu suffix: ubuntu-x86_64-v2-${{ github.ref_name }} rustflags: "-C target-cpu=x86-64-v2 -C target-feature=+aes" - - os: ${{ fromJson(github.repository_owner == 'subspace' && '["self-hosted", "ubuntu-20.04-x86-64"]' || 'ubuntu-20.04') }} + - os: ${{ fromJson(github.repository_owner == 'subspace' && '["self-hosted", "ubuntu-20.04-x86-64"]' || '"ubuntu-20.04"') }} target: aarch64-unknown-linux-gnu suffix: ubuntu-aarch64-${{ github.ref_name }} # TODO: AES flag is such that we have decent performance on ARMv8, remove once `aes` crate bumps MSRV to # at least 1.61: https://github.com/RustCrypto/block-ciphers/issues/373 rustflags: "-C linker=aarch64-linux-gnu-gcc --cfg aes_armv8" - - os: ${{ fromJson(github.repository_owner == 'subspace' && '["self-hosted", "macos-12-arm64"]' || 'macos-12') }} + - os: ${{ fromJson(github.repository_owner == 'subspace' && '["self-hosted", "macos-12-arm64"]' || '"macos-12"') }} target: aarch64-apple-darwin suffix: macos-aarch64-${{ github.ref_name }} # TODO: AES flag is such that we have decent performance on ARMv8, remove once `aes` crate bumps MSRV to # at least 1.61: https://github.com/RustCrypto/block-ciphers/issues/373 rustflags: "--cfg aes_armv8" - - os: ${{ fromJson(github.repository_owner == 'subspace' && '["self-hosted", "macos-12-arm64"]' || 'macos-12') }} + - os: ${{ fromJson(github.repository_owner == 'subspace' && '["self-hosted", "macos-12-arm64"]' || '"macos-12"') }} target: x86_64-apple-darwin suffix: macos-x86_64-${{ github.ref_name }} rustflags: "" - - os: ${{ fromJson(github.repository_owner == 'subspace' && '["self-hosted", "windows-server-2022-x86-64"]' || 'windows-2022') }} + - os: ${{ fromJson(github.repository_owner == 'subspace' && '["self-hosted", "windows-server-2022-x86-64"]' || '"windows-2022"') }} target: x86_64-pc-windows-msvc suffix: windows-x86_64-skylake-${{ github.ref_name }} rustflags: "-C target-cpu=skylake" - - os: ${{ fromJson(github.repository_owner == 'subspace' && '["self-hosted", "windows-server-2022-x86-64"]' || 'windows-2022') }} + - os: ${{ fromJson(github.repository_owner == 'subspace' && '["self-hosted", "windows-server-2022-x86-64"]' || '"windows-2022"') }} target: x86_64-pc-windows-msvc suffix: windows-x86_64-v2-${{ github.ref_name }} rustflags: "-C target-cpu=x86-64-v2 -C target-feature=+aes" @@ -148,6 +148,11 @@ jobs: with: repo-token: ${{ secrets.GITHUB_TOKEN }} + # Needed for hwloc + - name: Install automake (macOS) + run: brew install automake + if: runner.os == 'macOS' + # Workaround to resolve link error with C:\msys64\mingw64\bin\libclang.dll - name: Remove msys64 run: Remove-Item -LiteralPath "C:\msys64\" -Force -Recurse @@ -156,14 +161,49 @@ jobs: continue-on-error: true - name: AArch64 cross-compile packages - run: sudo apt-get update && sudo apt-get install -y --no-install-recommends g++-aarch64-linux-gnu gcc-aarch64-linux-gnu libc6-dev-arm64-cross + run: | + FLAVOR="$(lsb_release -sc)" + + sudo tee /etc/apt/sources.list.d/arm64.list <> $GITHUB_ENV if: matrix.build.target == 'aarch64-unknown-linux-gnu' - - name: Build farmer + - name: Build farmer (Linux and Windows) uses: actions-rs/cargo@ae10961054e4aa8b4aa7dffede299aaf087aa33b # @v1.0.1 with: command: build args: --locked 
-Z build-std --target ${{ matrix.build.target }} --profile production --bin subspace-farmer + if: runner.os != 'macOS' + + # We build macOS without `numa` feature, primarily because of https://github.com/HadrienG2/hwlocality/issues/31 + - name: Build farmer (macOS) + uses: actions-rs/cargo@ae10961054e4aa8b4aa7dffede299aaf087aa33b # @v1.0.1 + with: + command: build + args: --locked -Z build-std --target ${{ matrix.build.target }} --profile production --bin subspace-farmer --no-default-features + if: runner.os == 'macOS' - name: Build node uses: actions-rs/cargo@ae10961054e4aa8b4aa7dffede299aaf087aa33b # @v1.0.1 diff --git a/Cargo.lock b/Cargo.lock index 5f4d994a95..edc949fdb2 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1044,6 +1044,15 @@ version = "1.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d468802bab17cbc0cc575e9b053f41e72aa36bfa6b7f55e3529ffa43161b97fa" +[[package]] +name = "autotools" +version = "0.2.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "aef8da1805e028a172334c3b680f93e71126f2327622faef2ec3d893c0a4ad77" +dependencies = [ + "cc", +] + [[package]] name = "backoff" version = "0.4.0" @@ -1662,6 +1671,15 @@ version = "0.5.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "cd7cc57abe963c6d3b9d8be5b06ba7c8957a930305ca90304f24ef040aa6f961" +[[package]] +name = "cmake" +version = "0.1.50" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a31c789563b815f77f4250caee12365734369f942439b7defd71e18a48197130" +dependencies = [ + "cc", +] + [[package]] name = "codespan-reporting" version = "0.11.1" @@ -2119,6 +2137,12 @@ dependencies = [ "cipher 0.4.4", ] +[[package]] +name = "cty" +version = "0.2.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b365fabc795046672053e29c954733ec3b05e4be654ab130fe8f1f94d7051f35" + [[package]] name = "curve25519-dalek" version = "2.1.3" @@ -3773,7 +3797,7 @@ source = "git+https://github.com/subspace/frontier?rev=37ee45323120b21adc1d69ae7 dependencies = [ "evm", "frame-support", - "num_enum", + "num_enum 0.6.1", "parity-scale-codec", "scale-info", "serde", @@ -4762,6 +4786,35 @@ version = "2.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9a3a5bfb195931eeb336b2a7b4d761daec841b97f947d34394601737a7bba5e4" +[[package]] +name = "hwlocality" +version = "1.0.0-alpha.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6020affad7f95b46f12607a8714aa70bd02c8df3b3abf9ef5c8cd2f7ae57a033" +dependencies = [ + "arrayvec 0.7.4", + "bitflags 2.4.0", + "derive_more", + "errno", + "hwlocality-sys", + "libc", + "num_enum 0.7.1", + "thiserror", +] + +[[package]] +name = "hwlocality-sys" +version = "0.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "381b203a23b41c29be64454e9cee8d16360606d7e871f5d22532796b6095f164" +dependencies = [ + "autotools", + "cmake", + "libc", + "pkg-config", + "windows-sys 0.52.0", +] + [[package]] name = "hyper" version = "0.14.27" @@ -5358,6 +5411,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3979b5c37ece694f1f5e51e7ecc871fdb0f517ed04ee45f88d15d6d553cb9664" dependencies = [ "cc", + "cty", "libc", ] @@ -7103,7 +7157,16 @@ version = "0.6.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7a015b430d3c108a207fd776d2e2196aaf8b1cf8cf93253e3a097ff3085076a1" dependencies = [ - "num_enum_derive", + "num_enum_derive 0.6.1", +] + +[[package]] 
+name = "num_enum" +version = "0.7.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "683751d591e6d81200c39fb0d1032608b77724f34114db54f571ff1317b337c0" +dependencies = [ + "num_enum_derive 0.7.1", ] [[package]] @@ -7118,6 +7181,17 @@ dependencies = [ "syn 2.0.39", ] +[[package]] +name = "num_enum_derive" +version = "0.7.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6c11e44798ad209ccdd91fc192f0526a369a01234f7373e1b141c96d7cee4f0e" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.39", +] + [[package]] name = "object" version = "0.30.4" @@ -11705,7 +11779,9 @@ dependencies = [ "fs4 0.7.0", "futures", "hex", + "hwlocality", "jsonrpsee", + "libmimalloc-sys", "lru 0.11.1", "mimalloc", "num_cpus", @@ -13897,6 +13973,15 @@ dependencies = [ "windows-targets 0.48.5", ] +[[package]] +name = "windows-sys" +version = "0.52.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "282be5f36a8ce781fad8c8ae18fa3f9beff57ec1b52cb3de0789201425d9a33d" +dependencies = [ + "windows-targets 0.52.0", +] + [[package]] name = "windows-targets" version = "0.42.2" @@ -13927,6 +14012,21 @@ dependencies = [ "windows_x86_64_msvc 0.48.5", ] +[[package]] +name = "windows-targets" +version = "0.52.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8a18201040b24831fbb9e4eb208f8892e1f50a37feb53cc7ff887feb8f50e7cd" +dependencies = [ + "windows_aarch64_gnullvm 0.52.0", + "windows_aarch64_msvc 0.52.0", + "windows_i686_gnu 0.52.0", + "windows_i686_msvc 0.52.0", + "windows_x86_64_gnu 0.52.0", + "windows_x86_64_gnullvm 0.52.0", + "windows_x86_64_msvc 0.52.0", +] + [[package]] name = "windows_aarch64_gnullvm" version = "0.42.2" @@ -13939,6 +14039,12 @@ version = "0.48.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "2b38e32f0abccf9987a4e3079dfb67dcd799fb61361e53e2882c3cbaf0d905d8" +[[package]] +name = "windows_aarch64_gnullvm" +version = "0.52.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "cb7764e35d4db8a7921e09562a0304bf2f93e0a51bfccee0bd0bb0b666b015ea" + [[package]] name = "windows_aarch64_msvc" version = "0.42.2" @@ -13951,6 +14057,12 @@ version = "0.48.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "dc35310971f3b2dbbf3f0690a219f40e2d9afcf64f9ab7cc1be722937c26b4bc" +[[package]] +name = "windows_aarch64_msvc" +version = "0.52.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bbaa0368d4f1d2aaefc55b6fcfee13f41544ddf36801e793edbbfd7d7df075ef" + [[package]] name = "windows_i686_gnu" version = "0.42.2" @@ -13963,6 +14075,12 @@ version = "0.48.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a75915e7def60c94dcef72200b9a8e58e5091744960da64ec734a6c6e9b3743e" +[[package]] +name = "windows_i686_gnu" +version = "0.52.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a28637cb1fa3560a16915793afb20081aba2c92ee8af57b4d5f28e4b3e7df313" + [[package]] name = "windows_i686_msvc" version = "0.42.2" @@ -13975,6 +14093,12 @@ version = "0.48.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8f55c233f70c4b27f66c523580f78f1004e8b5a8b659e05a4eb49d4166cca406" +[[package]] +name = "windows_i686_msvc" +version = "0.52.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ffe5e8e31046ce6230cc7215707b816e339ff4d4d67c65dffa206fd0f7aa7b9a" + [[package]] name = "windows_x86_64_gnu" version 
= "0.42.2" @@ -13987,6 +14111,12 @@ version = "0.48.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "53d40abd2583d23e4718fddf1ebec84dbff8381c07cae67ff7768bbf19c6718e" +[[package]] +name = "windows_x86_64_gnu" +version = "0.52.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3d6fa32db2bc4a2f5abeacf2b69f7992cd09dca97498da74a151a3132c26befd" + [[package]] name = "windows_x86_64_gnullvm" version = "0.42.2" @@ -13999,6 +14129,12 @@ version = "0.48.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0b7b52767868a23d5bab768e390dc5f5c55825b6d30b86c844ff2dc7414044cc" +[[package]] +name = "windows_x86_64_gnullvm" +version = "0.52.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1a657e1e9d3f514745a572a6846d3c7aa7dbe1658c056ed9c3344c4109a6949e" + [[package]] name = "windows_x86_64_msvc" version = "0.42.2" @@ -14011,6 +14147,12 @@ version = "0.48.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ed94fce61571a4006852b7389a063ab983c02eb1bb37b47f8272ce92d06d9538" +[[package]] +name = "windows_x86_64_msvc" +version = "0.52.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "dff9641d1cd4be8d1a070daf9e3773c5f67e78b4d9d42263020c057706765c04" + [[package]] name = "winnow" version = "0.5.15" diff --git a/Dockerfile-bootstrap-node b/Dockerfile-bootstrap-node index b5c9e62f6b..85e5109e7f 100644 --- a/Dockerfile-bootstrap-node +++ b/Dockerfile-bootstrap-node @@ -19,7 +19,9 @@ RUN \ git \ llvm \ clang \ - cmake \ + automake \ + libtool \ + pkg-config \ make && \ curl --proto '=https' --tlsv1.2 -sSf https://sh.rustup.rs | sh -s -- -y --default-toolchain $RUSTC_VERSION diff --git a/Dockerfile-bootstrap-node.aarch64 b/Dockerfile-bootstrap-node.aarch64 index e338397b70..6b0902da88 100644 --- a/Dockerfile-bootstrap-node.aarch64 +++ b/Dockerfile-bootstrap-node.aarch64 @@ -19,7 +19,9 @@ RUN \ git \ llvm \ clang \ - cmake \ + automake \ + libtool \ + pkg-config \ make && \ curl --proto '=https' --tlsv1.2 -sSf https://sh.rustup.rs | sh -s -- -y --default-toolchain $RUSTC_VERSION @@ -39,6 +41,7 @@ COPY test /code/test # Up until this line all Rust images in this repo should be the same to share the same layers ENV RUSTFLAGS="${RUSTFLAGS} -C linker=aarch64-linux-gnu-gcc" +ENV PKG_CONFIG_ALLOW_CROSS=true # Dependencies necessary for successful cross-compilation RUN \ diff --git a/Dockerfile-farmer b/Dockerfile-farmer index 186be74565..dd32f45ab0 100644 --- a/Dockerfile-farmer +++ b/Dockerfile-farmer @@ -19,7 +19,9 @@ RUN \ git \ llvm \ clang \ - cmake \ + automake \ + libtool \ + pkg-config \ make && \ curl --proto '=https' --tlsv1.2 -sSf https://sh.rustup.rs | sh -s -- -y --default-toolchain $RUSTC_VERSION diff --git a/Dockerfile-farmer.aarch64 b/Dockerfile-farmer.aarch64 index 42a0dc0e33..11046baa2c 100644 --- a/Dockerfile-farmer.aarch64 +++ b/Dockerfile-farmer.aarch64 @@ -19,7 +19,9 @@ RUN \ git \ llvm \ clang \ - cmake \ + automake \ + libtool \ + pkg-config \ make && \ curl --proto '=https' --tlsv1.2 -sSf https://sh.rustup.rs | sh -s -- -y --default-toolchain $RUSTC_VERSION @@ -39,6 +41,7 @@ COPY test /code/test # Up until this line all Rust images in this repo should be the same to share the same layers ENV RUSTFLAGS="${RUSTFLAGS} -C linker=aarch64-linux-gnu-gcc" +ENV PKG_CONFIG_ALLOW_CROSS=true # Dependencies necessary for successful cross-compilation RUN \ diff --git a/Dockerfile-node b/Dockerfile-node index dd50db59ae..a3775ac1fa 100644 
--- a/Dockerfile-node +++ b/Dockerfile-node @@ -19,7 +19,9 @@ RUN \ git \ llvm \ clang \ - cmake \ + automake \ + libtool \ + pkg-config \ make && \ curl --proto '=https' --tlsv1.2 -sSf https://sh.rustup.rs | sh -s -- -y --default-toolchain $RUSTC_VERSION diff --git a/Dockerfile-node.aarch64 b/Dockerfile-node.aarch64 index 55693d3816..0dd70f8822 100644 --- a/Dockerfile-node.aarch64 +++ b/Dockerfile-node.aarch64 @@ -19,7 +19,9 @@ RUN \ git \ llvm \ clang \ - cmake \ + automake \ + libtool \ + pkg-config \ make && \ curl --proto '=https' --tlsv1.2 -sSf https://sh.rustup.rs | sh -s -- -y --default-toolchain $RUSTC_VERSION @@ -39,6 +41,7 @@ COPY test /code/test # Up until this line all Rust images in this repo should be the same to share the same layers ENV RUSTFLAGS="${RUSTFLAGS} -C linker=aarch64-linux-gnu-gcc" +ENV PKG_CONFIG_ALLOW_CROSS=true # Dependencies necessary for successful cross-compilation RUN \ diff --git a/Dockerfile-runtime b/Dockerfile-runtime index 6be061e62c..3bd02e65fa 100644 --- a/Dockerfile-runtime +++ b/Dockerfile-runtime @@ -14,11 +14,14 @@ RUN \ apt-get update && \ DEBIAN_FRONTEND=noninteractive apt-get install -y --no-install-recommends \ ca-certificates \ + protobuf-compiler \ curl \ git \ llvm \ clang \ - cmake \ + automake \ + libtool \ + pkg-config \ make && \ curl --proto '=https' --tlsv1.2 -sSf https://sh.rustup.rs | sh -s -- -y --default-toolchain $RUSTC_VERSION diff --git a/crates/subspace-farmer/Cargo.toml b/crates/subspace-farmer/Cargo.toml index 0e5f0a7b84..3584f557d0 100644 --- a/crates/subspace-farmer/Cargo.toml +++ b/crates/subspace-farmer/Cargo.toml @@ -29,9 +29,11 @@ fdlimit = "0.3.0" fs4 = "0.7.0" futures = "0.3.29" hex = { version = "0.4.3", features = ["serde"] } +hwlocality = { version = "1.0.0-alpha.1", features = ["vendored"], optional = true } jsonrpsee = { version = "0.16.3", features = ["client"] } lru = "0.11.0" mimalloc = "0.1.39" +libmimalloc-sys = { version = "0.1.35", features = ["extended"] } num_cpus = "1.16.0" parity-scale-codec = "3.6.5" parking_lot = "0.12.1" @@ -60,3 +62,7 @@ tracing = "0.1.37" tracing-subscriber = { version = "0.3.16", features = ["env-filter"] } ulid = { version = "1.0.0", features = ["serde"] } zeroize = "1.6.0" + +[features] +default = ["numa"] +numa = ["dep:hwlocality"] diff --git a/crates/subspace-farmer/README.md b/crates/subspace-farmer/README.md index fa4d013fdb..be6e8c2f53 100644 --- a/crates/subspace-farmer/README.md +++ b/crates/subspace-farmer/README.md @@ -10,11 +10,42 @@ It is recommended to follow general farming instructions that explain how to run Rust toolchain is expected to be installed for anything in this repository to compile, but there are some extra dependencies for farmer specifically. -Prost library from libp2p dependency needs CMake, also LLVM/Clang is necessary: +`automake`,`libtool` and `pkg-config` on Linux/macOS or CMake on Windows for `hwlocality-sys` (if `numa` features is enabled, it is by default), also LLVM/Clang is necessary. + +### Ubuntu + +```bash +sudo apt-get install automake libtool pkg-config llvm clang +``` + +### macOS + +1. Install via Homebrew: + +```bash +brew install automake libtool llvm@15 clang +``` + +2. Add `llvm` to your `~/.zshrc` or `~/.bashrc`: + +```bash +export PATH="/opt/homebrew/opt/llvm@15/bin:$PATH" +``` + +3. Activate the changes: + ```bash -sudo apt-get install llvm clang cmake +source ~/.zshrc ``` +4. 
Verify that `llvm` is installed: + +```bash +llvm-config --version +``` + +### Build + Then build the farmer using Cargo: ``` cargo build --profile production --bin subspace-farmer diff --git a/crates/subspace-farmer/src/bin/subspace-farmer/commands/farm.rs b/crates/subspace-farmer/src/bin/subspace-farmer/commands/farm.rs index ca3536b73d..79b45b30c7 100644 --- a/crates/subspace-farmer/src/bin/subspace-farmer/commands/farm.rs +++ b/crates/subspace-farmer/src/bin/subspace-farmer/commands/farm.rs @@ -453,6 +453,7 @@ where .unwrap_or(plotting_thread_pool_core_indices.len() + 1), )); + let all_cpus = all_cpus(); let plotting_thread_pool_manager = create_tokio_thread_pool_manager_for_pinned_cores( "plotting", plotting_thread_pool_core_indices, @@ -464,12 +465,22 @@ where let farming_thread_pool_size = farming_thread_pool_size .map(|farming_thread_pool_size| farming_thread_pool_size.get()) .unwrap_or_else(|| { - all_cpus() + all_cpus .first() .expect("Not empty according to `all_cpus` function description; qed") .len() }); + // TODO: Remove code or environment variable once identified whether it helps or not + if std::env::var("NUMA_ALLOCATOR").is_ok() && all_cpus.len() > 1 { + unsafe { + libmimalloc_sys::mi_option_set( + libmimalloc_sys::mi_option_use_numa_nodes, + all_cpus.len() as std::ffi::c_long, + ); + } + } + let mut plotting_delay_senders = Vec::with_capacity(disk_farms.len()); for (disk_farm_index, disk_farm) in disk_farms.into_iter().enumerate() { diff --git a/crates/subspace-farmer/src/utils.rs b/crates/subspace-farmer/src/utils.rs index 9456b0409e..40729f8c07 100644 --- a/crates/subspace-farmer/src/utils.rs +++ b/crates/subspace-farmer/src/utils.rs @@ -142,7 +142,29 @@ where /// /// Returned vector is guaranteed to have at least one non-empty element. pub fn all_cpus() -> Vec> { - // TODO: NUMA support + #[cfg(feature = "numa")] + match hwlocality::Topology::new() { + Ok(topology) => { + let cpus = topology + // Iterate over NUMA nodes + .objects_at_depth(hwlocality::object::depth::Depth::NUMANode) + // For each NUMA nodes get CPU set + .filter_map(|node| node.cpuset()) + // For each CPU set extract individual cores + .map(|cpuset| cpuset.iter_set().map(usize::from).collect::>()) + .filter(|cores| !cores.is_empty()) + .collect::>(); + + if !cpus.is_empty() { + return cpus; + } else { + warn!("No CPU cores found in NUMA nodes"); + } + } + Err(error) => { + warn!(%error, "Failed to get CPU topology"); + } + } vec![(0..num_cpus::get()).collect()] } diff --git a/crates/subspace-node/README.md b/crates/subspace-node/README.md index af9d1988c5..dc8667bbe7 100644 --- a/crates/subspace-node/README.md +++ b/crates/subspace-node/README.md @@ -14,12 +14,42 @@ It is recommended to follow general farming instructions that explain how to run Rust toolchain is expected to be installed for anything in this repository to compile, but there are some extra dependencies for farmer specifically. -Prost library from libp2p dependency needs CMake, also LLVM/Clang and `make` are necessary: +### Ubuntu + +LLVM/Clang and `make` are necessary: ```bash sudo apt-get install llvm clang cmake make ``` -Then build the farmer using Cargo: +### macOS + +1. Install via Homebrew: + +```bash +brew install llvm@15 clang cmake make +``` + +2. Add `llvm` to your `~/.zshrc` or `~/.bashrc`: + +```bash +export PATH="/opt/homebrew/opt/llvm@15/bin:$PATH" +``` + +3. Activate the changes: + +```bash +source ~/.zshrc +``` + +4. 
Verify that `llvm` is installed: + +```bash +llvm-config --version +``` + +### Build + +Then build the node using Cargo: ``` cargo build --profile production --bin subspace-node target/production/subspace-node --version diff --git a/docs/development.md b/docs/development.md index f55fd3ef33..596dfb54c9 100644 --- a/docs/development.md +++ b/docs/development.md @@ -2,39 +2,7 @@ You'll have to have [Rust toolchain](https://rustup.rs/) installed as well as LLVM, Clang and CMake in addition to usual developer tooling. -Below are some examples of how to install these dependencies on different operating systems. - -### Ubuntu - -```bash -sudo apt-get install llvm clang cmake -``` - -### macOS - -1. Install via Homebrew: - -```bash -brew install llvm@15 clang cmake -``` - -2. Add `llvm` to your `~/.zshrc` or `~/.bashrc`: - -```bash -export PATH="/opt/homebrew/opt/llvm@15/bin:$PATH" -``` - -3. Activate the changes: - -```bash -source ~/.zshrc -``` - -4. Verify that `llvm` is installed: - -```bash -llvm-config --version -``` +Check [crates/subspace-node](../crates/subspace-node/README.md) and [crates/subspace-farmer](../crates/subspace-farmer/README.md) for required dependencies. ## To Farm By Yourself (Offline) @@ -43,7 +11,7 @@ llvm-config --version **Linux/MacOS:** -1. Make them executable: `chmod +x subspace-farmer-x86_64-*-snapshot subspace-node-x86_64-*-snapshot` +1. Make files executable: `chmod +x subspace-farmer-x86_64-*-snapshot subspace-node-x86_64-*-snapshot` 2. Run the node: `./subspace-node-x86_64-*-snapshot --dev --tmp` 3. In macOS, it may prompt that this app is not verified. Click on `cancel` instead of moving it to trash. To allow execution, go to `System Preferences -> Security & Privacy -> General`, and click on `allow`. diff --git a/docs/farming.md b/docs/farming.md index 3c65bf98af..6c9c3c6045 100644 --- a/docs/farming.md +++ b/docs/farming.md @@ -349,10 +349,7 @@ If you're running unsupported Linux distribution or CPU architecture, you may tr NOTE: This is primarily targeted at tech-savvy users and not recommended unless you know what you're doing. Please try to find answer to your question online before reaching out to maintainers. -You'll have to have [Rust toolchain](https://rustup.rs/) installed as well as LLVM, Clang and CMake in addition to usual developer tooling (Ubuntu example): -```bash -sudo apt-get install llvm clang cmake -``` +Check [crates/subspace-node](../crates/subspace-node/README.md) and [crates/subspace-farmer](../crates/subspace-farmer/README.md) for required dependencies. 
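For illustration, a minimal sketch of how the optional `numa` feature introduced in the farmer's `Cargo.toml` above can gate NUMA-specific code while keeping a portable fallback. `numa_node_count` is a hypothetical helper invented for this sketch and is not part of the patch; only the `hwlocality` calls mirror the ones used in `utils.rs` above.

```rust
// Hypothetical helper, not part of the patch: count NUMA nodes when the `numa`
// feature (and therefore the optional `hwlocality` dependency) is enabled.
#[cfg(feature = "numa")]
fn numa_node_count() -> usize {
    hwlocality::Topology::new()
        .map(|topology| {
            topology
                .objects_at_depth(hwlocality::object::depth::Depth::NUMANode)
                .count()
                .max(1)
        })
        .unwrap_or(1)
}

// Without hwloc, conservatively treat the machine as a single NUMA node.
#[cfg(not(feature = "numa"))]
fn numa_node_count() -> usize {
    1
}
```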
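Relatedly, since `all_cpus()` above now returns one group of CPU core indices per NUMA node (falling back to a single group covering every core when topology information is unavailable), callers can inspect the machine layout directly. A small usage sketch, assuming the function is reachable as `subspace_farmer::utils::all_cpus`:

```rust
use subspace_farmer::utils::all_cpus;

// Print how CPU cores are grouped by NUMA node: a dual-socket machine typically
// yields two groups, while a laptop yields a single group containing all cores.
fn print_numa_layout() {
    for (node, cores) in all_cpus().iter().enumerate() {
        println!("NUMA node {node}: {} CPU cores", cores.len());
    }
}
```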
Now clone the source and build snapshot `snapshot-2022-apr-29` (replace occurrences with the snapshot you want to build): ```bash From 27e6bf7bb570739ad60b49ffc6a0de0aa45958c2 Mon Sep 17 00:00:00 2001 From: Nazar Mokrynskyi Date: Tue, 26 Dec 2023 06:58:50 +0200 Subject: [PATCH 7/8] Replace pinning threads to cores with pinning threads to cpu sets --- Cargo.lock | 3 +- crates/subspace-farmer/Cargo.toml | 7 +- .../src/bin/subspace-farmer/commands/farm.rs | 27 +-- .../src/thread_pool_manager.rs | 2 +- crates/subspace-farmer/src/utils.rs | 155 ++++++++++++------ 5 files changed, 129 insertions(+), 65 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index edc949fdb2..41b02a582e 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -11771,7 +11771,6 @@ dependencies = [ "blake3", "bytesize", "clap", - "core_affinity", "criterion", "derive_more", "event-listener-primitives", @@ -11781,6 +11780,7 @@ dependencies = [ "hex", "hwlocality", "jsonrpsee", + "libc", "libmimalloc-sys", "lru 0.11.1", "mimalloc", @@ -11811,6 +11811,7 @@ dependencies = [ "tracing", "tracing-subscriber 0.3.17", "ulid", + "windows-sys 0.52.0", "zeroize", ] diff --git a/crates/subspace-farmer/Cargo.toml b/crates/subspace-farmer/Cargo.toml index 3584f557d0..ef7317bfe3 100644 --- a/crates/subspace-farmer/Cargo.toml +++ b/crates/subspace-farmer/Cargo.toml @@ -21,7 +21,6 @@ blake2 = "0.10.6" blake3 = { version = "1.4.1", default-features = false } bytesize = "1.3.0" clap = { version = "4.4.3", features = ["color", "derive"] } -core_affinity = "0.8.1" criterion = { version = "0.5.1", default-features = false, features = ["rayon", "async"] } derive_more = "0.99.17" event-listener-primitives = "2.0.1" @@ -63,6 +62,12 @@ tracing-subscriber = { version = "0.3.16", features = ["env-filter"] } ulid = { version = "1.0.0", features = ["serde"] } zeroize = "1.6.0" +[target.'cfg(not(windows))'.dependencies] +libc = "0.2.146" + +[target.'cfg(windows)'.dependencies] +windows-sys = "0.52.0" + [features] default = ["numa"] numa = ["dep:hwlocality"] diff --git a/crates/subspace-farmer/src/bin/subspace-farmer/commands/farm.rs b/crates/subspace-farmer/src/bin/subspace-farmer/commands/farm.rs index 79b45b30c7..8213869768 100644 --- a/crates/subspace-farmer/src/bin/subspace-farmer/commands/farm.rs +++ b/crates/subspace-farmer/src/bin/subspace-farmer/commands/farm.rs @@ -29,8 +29,8 @@ use subspace_farmer::utils::piece_validator::SegmentCommitmentPieceValidator; use subspace_farmer::utils::readers_and_pieces::ReadersAndPieces; use subspace_farmer::utils::ss58::parse_ss58_reward_address; use subspace_farmer::utils::{ - all_cpus, create_tokio_thread_pool_manager_for_pinned_cores, run_future_in_dedicated_thread, - thread_pool_core_indices, AsyncJoinOnDrop, + all_cpu_cores, create_tokio_thread_pool_manager_for_pinned_nodes, + run_future_in_dedicated_thread, thread_pool_core_indices, AsyncJoinOnDrop, }; use subspace_farmer::{Identity, NodeClient, NodeRpcClient}; use subspace_farmer_components::plotting::PlottedSector; @@ -47,9 +47,9 @@ use zeroize::Zeroizing; const RECORDS_ROOTS_CACHE_SIZE: NonZeroUsize = NonZeroUsize::new(1_000_000).expect("Not zero; qed"); fn should_farm_during_initial_plotting() -> bool { - let total_cpu_cores = all_cpus() - .into_iter() - .flat_map(|cores| cores.into_iter()) + let total_cpu_cores = all_cpu_cores() + .iter() + .flat_map(|set| set.cpu_cores()) .count(); total_cpu_cores > 8 } @@ -442,7 +442,7 @@ where // The default behavior is to use all CPU cores, but for replotting we just want half replotting_thread_pool_core_indices .iter_mut() 
- .for_each(|cores| cores.truncate((cores.len() / 2).max(1))); + .for_each(|set| set.truncate(set.cpu_cores().len() / 2)); } replotting_thread_pool_core_indices }; @@ -453,30 +453,31 @@ where .unwrap_or(plotting_thread_pool_core_indices.len() + 1), )); - let all_cpus = all_cpus(); - let plotting_thread_pool_manager = create_tokio_thread_pool_manager_for_pinned_cores( + let all_cpu_cores = all_cpu_cores(); + let plotting_thread_pool_manager = create_tokio_thread_pool_manager_for_pinned_nodes( "plotting", plotting_thread_pool_core_indices, )?; - let replotting_thread_pool_manager = create_tokio_thread_pool_manager_for_pinned_cores( + let replotting_thread_pool_manager = create_tokio_thread_pool_manager_for_pinned_nodes( "replotting", replotting_thread_pool_core_indices, )?; let farming_thread_pool_size = farming_thread_pool_size .map(|farming_thread_pool_size| farming_thread_pool_size.get()) .unwrap_or_else(|| { - all_cpus + all_cpu_cores .first() - .expect("Not empty according to `all_cpus` function description; qed") + .expect("Not empty according to function description; qed") + .cpu_cores() .len() }); // TODO: Remove code or environment variable once identified whether it helps or not - if std::env::var("NUMA_ALLOCATOR").is_ok() && all_cpus.len() > 1 { + if std::env::var("NUMA_ALLOCATOR").is_ok() && all_cpu_cores.len() > 1 { unsafe { libmimalloc_sys::mi_option_set( libmimalloc_sys::mi_option_use_numa_nodes, - all_cpus.len() as std::ffi::c_long, + all_cpu_cores.len() as std::ffi::c_long, ); } } diff --git a/crates/subspace-farmer/src/thread_pool_manager.rs b/crates/subspace-farmer/src/thread_pool_manager.rs index 2de7decef4..1a6d6675d4 100644 --- a/crates/subspace-farmer/src/thread_pool_manager.rs +++ b/crates/subspace-farmer/src/thread_pool_manager.rs @@ -62,7 +62,7 @@ impl ThreadPoolManager { thread_pools: NonZeroUsize, ) -> Result where - C: Fn(usize) -> Result, + C: FnMut(usize) -> Result, { let inner = Inner { thread_pools: (0..thread_pools.get()) diff --git a/crates/subspace-farmer/src/utils.rs b/crates/subspace-farmer/src/utils.rs index 40729f8c07..c95e41d03f 100644 --- a/crates/subspace-farmer/src/utils.rs +++ b/crates/subspace-farmer/src/utils.rs @@ -138,14 +138,65 @@ where }) } -/// All CPU cores as numbers, grouped by NUMA nodes. +/// Abstraction for CPU core set +#[derive(Debug, Clone)] +pub struct CpuCoreSet { + /// CPU cores that belong to this set + cores: Vec, + #[cfg(feature = "numa")] + topology: Option>, +} + +impl CpuCoreSet { + pub fn cpu_cores(&self) -> &[usize] { + &self.cores + } + + /// Will truncate list of CPU cores to this number. + /// + /// If `cores` is zero, call will do nothing since zero number of cores is not allowed. + pub fn truncate(&mut self, cores: usize) { + self.cores.truncate(cores.max(1)); + } + + /// Pin current thread to this NUMA node (not just one CPU core) + pub fn pin_current_thread(&self) { + #[cfg(feature = "numa")] + if let Some(topology) = &self.topology { + use hwlocality::cpu::binding::CpuBindingFlags; + use hwlocality::cpu::cpuset::CpuSet; + use hwlocality::ffi::PositiveInt; + + #[cfg(not(windows))] + let thread_id = unsafe { libc::pthread_self() }; + #[cfg(windows)] + let thread_id = unsafe { windows_sys::Win32::System::Threading::GetCurrentThread() }; + + // load the cpuset for the given core index. 
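+                // Clarification: the cpuset built below contains every core index stored in this
+                // `CpuCoreSet`, and the current thread is bound to that whole set rather than to a
+                // single core, so the OS scheduler may still move it between cores of the same
+                // NUMA node. A binding failure is only logged, leaving the thread unpinned.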
+ let cpu_cores = CpuSet::from_iter( + self.cores + .iter() + .map(|&core| PositiveInt::try_from(core).expect("Valid CPU core")), + ); + + if let Err(error) = + topology.bind_thread_cpu(thread_id, &cpu_cores, CpuBindingFlags::empty()) + { + warn!(%error, ?cpu_cores, "Failed to pin thread to CPU cores") + } + } + } +} + +/// Get all cpu cores, grouped into sets according to NUMA nodes. /// -/// Returned vector is guaranteed to have at least one non-empty element. -pub fn all_cpus() -> Vec> { +/// Returned vector is guaranteed to have at least one element and have non-zero number of CPU cores +/// in each set. +pub fn all_cpu_cores() -> Vec { #[cfg(feature = "numa")] - match hwlocality::Topology::new() { + match hwlocality::Topology::new().map(std::sync::Arc::new) { Ok(topology) => { - let cpus = topology + let cpu_cores = topology // Iterate over NUMA nodes .objects_at_depth(hwlocality::object::depth::Depth::NUMANode) // For each NUMA nodes get CPU set @@ -153,43 +204,54 @@ pub fn all_cpus() -> Vec> { // For each CPU set extract individual cores .map(|cpuset| cpuset.iter_set().map(usize::from).collect::>()) .filter(|cores| !cores.is_empty()) + .map(|cores| CpuCoreSet { + cores, + topology: Some(std::sync::Arc::clone(&topology)), + }) .collect::>(); - if !cpus.is_empty() { - return cpus; - } else { - warn!("No CPU cores found in NUMA nodes"); + if !cpu_cores.is_empty() { + return cpu_cores; } } Err(error) => { warn!(%error, "Failed to get CPU topology"); } } - vec![(0..num_cpus::get()).collect()] + vec![CpuCoreSet { + cores: (0..num_cpus::get()).collect(), + #[cfg(feature = "numa")] + topology: None, + }] } /// Thread indices for each thread pool pub fn thread_pool_core_indices( thread_pool_size: Option, thread_pools: Option, -) -> Vec> { - let all_cpus = all_cpus(); +) -> Vec { + let all_numa_nodes = all_cpu_cores(); + #[cfg(feature = "numa")] + let topology = &all_numa_nodes + .first() + .expect("Not empty according to function description; qed") + .topology; if let Some(thread_pools) = thread_pools { - let mut thread_pool_core_indices = Vec::>::with_capacity(thread_pools.get()); + let mut thread_pool_core_indices = Vec::::with_capacity(thread_pools.get()); if let Some(thread_pool_size) = thread_pool_size { // If thread pool size is fixed, loop over all CPU cores as many times as necessary and // assign contiguous ranges of CPU cores to corresponding thread pools - let total_cpu_cores = all_cpus - .into_iter() - .flat_map(|cores| cores.into_iter()) + let total_cpu_cores = all_numa_nodes + .iter() + .flat_map(|set| set.cpu_cores()) .count(); for _ in 0..thread_pools.get() { let cpu_cores_range = if let Some(last_cpu_index) = thread_pool_core_indices .last() - .and_then(|thread_indices| thread_indices.last()) + .and_then(|thread_indices| thread_indices.cpu_cores().last()) .copied() { last_cpu_index + 1.. 
@@ -204,71 +266,66 @@ pub fn thread_pool_core_indices( .map(|core_index| core_index % total_cpu_cores) .collect(); - thread_pool_core_indices.push(cpu_cores); + thread_pool_core_indices.push(CpuCoreSet { + cores: cpu_cores, + #[cfg(feature = "numa")] + topology: topology.clone(), + }); } } else { // If thread pool size is not fixed, we iterate over all NUMA nodes as many times as // necessary for thread_pool_index in 0..thread_pools.get() { - thread_pool_core_indices.push(all_cpus[thread_pool_index % all_cpus.len()].clone()); + thread_pool_core_indices.push(CpuCoreSet { + cores: all_numa_nodes[thread_pool_index % all_numa_nodes.len()] + .cores + .clone(), + #[cfg(feature = "numa")] + topology: topology.clone(), + }); } } thread_pool_core_indices } else { // If everything is set to defaults, use physical layout of CPUs - all_cpus - } -} - -#[inline(never)] -fn pin_to_cpu_core( - thread_prefix: &'static str, - thread_pool_index: usize, - thread_index: usize, - core: usize, -) { - if !core_affinity::set_for_current(core_affinity::CoreId { id: core }) { - warn!( - %thread_prefix, - %thread_pool_index, - %thread_index, - %core, - "Failed to set core affinity, timekeeper will run on random \ - CPU core", - ); + all_numa_nodes } } -/// Creates thread pools for each of CPUs with number of threads corresponding to number of cores in -/// each CPU and pins threads to those CPU cores. Each thread will have Tokio context available. +/// Creates thread pools for each of CPU core set with number of threads corresponding to number of cores in +/// each set and pins threads to all of those CPU cores (all at once, not thread per core). Each thread will +/// also have Tokio context available. /// -/// The easiest way to obtain CPUs is using [`all_cpus`], but [`thread_pool_core_indices`] in case +/// The easiest way to obtain CPUs is using [`all_cpu_cores`], but [`thread_pool_core_indices`] in case /// support for user customizations is desired. 
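+/// # Example (sketch)
+///
+/// A hedged sketch of the intended call pattern, mirroring how `farm.rs` sets up the
+/// replotting pools above. It must run inside a Tokio runtime (the spawn handler captures
+/// `Handle::current()`), and the surrounding function is assumed to return a compatible
+/// `Result`.
+///
+/// ```ignore
+/// let mut replotting_core_indices = thread_pool_core_indices(None, None);
+/// // Replotting is lighter than initial plotting, so keep only half of each core set.
+/// replotting_core_indices
+///     .iter_mut()
+///     .for_each(|set| set.truncate(set.cpu_cores().len() / 2));
+/// let replotting_thread_pool_manager = create_tokio_thread_pool_manager_for_pinned_nodes(
+///     "replotting",
+///     replotting_core_indices,
+/// )?;
+/// ```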
-pub fn create_tokio_thread_pool_manager_for_pinned_cores( +pub fn create_tokio_thread_pool_manager_for_pinned_nodes( thread_prefix: &'static str, - cpus: Vec>, + mut cpu_core_sets: Vec, ) -> Result { - let total_thread_pools = cpus.len(); + let total_thread_pools = cpu_core_sets.len(); ThreadPoolManager::new( |thread_pool_index| { - let cores = cpus[thread_pool_index].clone(); + let cpu_core_set = cpu_core_sets + .pop() + .expect("Number of thread pools is the same as cpu core sets; qed"); ThreadPoolBuilder::new() .thread_name(move |thread_index| { format!("{thread_prefix}-{thread_pool_index}.{thread_index}") }) - .num_threads(cores.len()) + .num_threads(cpu_core_set.cpu_cores().len()) .spawn_handler({ let handle = Handle::current(); rayon_custom_spawn_handler(move |thread| { - let core = cores[thread.index()]; + let cpu_core_set = cpu_core_set.clone(); let handle = handle.clone(); move || { - pin_to_cpu_core(thread_prefix, thread_pool_index, thread.index(), core); + cpu_core_set.pin_current_thread(); + drop(cpu_core_set); let _guard = handle.enter(); From 31de4f780ba4d13ec6bcae0b86731268efdeb08c Mon Sep 17 00:00:00 2001 From: Nazar Mokrynskyi Date: Mon, 1 Jan 2024 11:35:34 +0200 Subject: [PATCH 8/8] Log message about NUMA system and number of farms --- .../src/bin/subspace-farmer/commands/farm.rs | 13 +++++++++++++ 1 file changed, 13 insertions(+) diff --git a/crates/subspace-farmer/src/bin/subspace-farmer/commands/farm.rs b/crates/subspace-farmer/src/bin/subspace-farmer/commands/farm.rs index 8213869768..af1fea4333 100644 --- a/crates/subspace-farmer/src/bin/subspace-farmer/commands/farm.rs +++ b/crates/subspace-farmer/src/bin/subspace-farmer/commands/farm.rs @@ -472,6 +472,19 @@ where .len() }); + if all_cpu_cores.len() > 1 { + info!(numa_nodes = %all_cpu_cores.len(), "NUMA system detected"); + + if all_cpu_cores.len() < disk_farms.len() { + warn!( + numa_nodes = %all_cpu_cores.len(), + farms_count = %disk_farms.len(), + "Too few disk farms, CPU will not be utilized fully during plotting, same number of farms as NUMA \ + nodes or more is recommended" + ); + } + } + // TODO: Remove code or environment variable once identified whether it helps or not if std::env::var("NUMA_ALLOCATOR").is_ok() && all_cpu_cores.len() > 1 { unsafe {