From 954694a516d7e29c711de44cf2f3d7e96bbd0b40 Mon Sep 17 00:00:00 2001 From: Nazar Mokrynskyi Date: Wed, 27 Sep 2023 13:14:58 +0300 Subject: [PATCH 1/7] Small refactoring of piece retrieval --- .../src/single_disk_farm/piece_reader.rs | 64 +++++++++---------- 1 file changed, 31 insertions(+), 33 deletions(-) diff --git a/crates/subspace-farmer/src/single_disk_farm/piece_reader.rs b/crates/subspace-farmer/src/single_disk_farm/piece_reader.rs index dff3e20e59..c8e975e3b1 100644 --- a/crates/subspace-farmer/src/single_disk_farm/piece_reader.rs +++ b/crates/subspace-farmer/src/single_disk_farm/piece_reader.rs @@ -129,8 +129,7 @@ async fn read_pieces( error!( %sector_index, %sector_count, - "Tried to read piece from sector that is not yet \ - plotted" + "Tried to read piece from sector that is not yet plotted" ); continue; } @@ -139,13 +138,39 @@ async fn read_pieces( (sector_metadata, sector_count) }; + // Sector must be plotted + if sector_index >= sector_count { + warn!( + %sector_index, + %piece_offset, + %sector_count, + "Incorrect sector offset" + ); + // Doesn't matter if receiver still cares about it + let _ = response_sender.send(None); + continue; + } + // Piece must be within sector + if u16::from(piece_offset) >= pieces_in_sector { + warn!( + %sector_index, + %piece_offset, + %sector_count, + "Incorrect piece offset" + ); + // Doesn't matter if receiver still cares about it + let _ = response_sender.send(None); + continue; + } + + let sector_size = sector_size(pieces_in_sector); + let sector = &global_plot_mmap[sector_index as usize * sector_size..][..sector_size]; + let maybe_piece = read_piece::( &public_key, piece_offset, - pieces_in_sector, - sector_count, §or_metadata, - &global_plot_mmap, + sector, &erasure_coding, &mut table_generator, ); @@ -155,14 +180,11 @@ async fn read_pieces( } } -#[allow(clippy::too_many_arguments)] fn read_piece( public_key: &PublicKey, piece_offset: PieceOffset, - pieces_in_sector: u16, - sector_count: SectorIndex, sector_metadata: &SectorMetadataChecksummed, - global_plot: &[u8], + sector: &[u8], erasure_coding: &ErasureCoding, table_generator: &mut PosTable::Generator, ) -> Option @@ -170,31 +192,8 @@ where PosTable: Table, { let sector_index = sector_metadata.sector_index; - // Sector must be plotted - if sector_index >= sector_count { - warn!( - %sector_index, - %piece_offset, - %sector_count, - "Incorrect sector offset" - ); - return None; - } - // Piece must be within sector - if u16::from(piece_offset) >= pieces_in_sector { - warn!( - %sector_index, - %piece_offset, - %sector_count, - "Incorrect piece offset" - ); - return None; - } let sector_id = SectorId::new(public_key.hash(), sector_index); - let sector_size = sector_size(pieces_in_sector); - // TODO: Would be nicer to have list of plots here and just index it - let sector = &global_plot[sector_index as usize * sector_size..][..sector_size]; let piece = match reading::read_piece::( piece_offset, @@ -209,7 +208,6 @@ where error!( %sector_index, %piece_offset, - %sector_count, %error, "Failed to read piece from sector" ); From 28aea2f7a3db8c7ae0436f1a08d5239b51444da3 Mon Sep 17 00:00:00 2001 From: Nazar Mokrynskyi Date: Wed, 27 Sep 2023 13:05:11 +0300 Subject: [PATCH 2/7] Introduce `ReadAt` trait that is an abstraction for reading purposes in `subspace-farmer-components` --- .../benches/reading.rs | 4 +- .../src/auditing.rs | 33 ++++--- crates/subspace-farmer-components/src/lib.rs | 28 ++++++ .../subspace-farmer-components/src/proving.rs | 66 ++++++++++--- .../subspace-farmer-components/src/reading.rs | 97 +++++++++---------- .../src/single_disk_farm/piece_reader.rs | 2 +- 6 files changed, 148 insertions(+), 82 deletions(-) diff --git a/crates/subspace-farmer-components/benches/reading.rs b/crates/subspace-farmer-components/benches/reading.rs index cd9515f8f5..0badc6bae2 100644 --- a/crates/subspace-farmer-components/benches/reading.rs +++ b/crates/subspace-farmer-components/benches/reading.rs @@ -148,7 +148,7 @@ pub fn criterion_benchmark(c: &mut Criterion) { group.throughput(Throughput::Elements(1)); group.bench_function("piece/memory", |b| { b.iter(|| { - read_piece::( + read_piece::( black_box(piece_offset), black_box(&plotted_sector.sector_id), black_box(&plotted_sector.sector_metadata), @@ -196,7 +196,7 @@ pub fn criterion_benchmark(c: &mut Criterion) { let start = Instant::now(); for _i in 0..iters { for sector in plot_mmap.chunks_exact(sector_size) { - read_piece::( + read_piece::( black_box(piece_offset), black_box(&plotted_sector.sector_id), black_box(&plotted_sector.sector_metadata), diff --git a/crates/subspace-farmer-components/src/auditing.rs b/crates/subspace-farmer-components/src/auditing.rs index 3da71c4776..c419c273ce 100644 --- a/crates/subspace-farmer-components/src/auditing.rs +++ b/crates/subspace-farmer-components/src/auditing.rs @@ -1,10 +1,12 @@ use crate::proving::SolutionCandidates; use crate::sector::{SectorContentsMap, SectorMetadataChecksummed}; +use crate::ReadAt; use std::collections::VecDeque; use std::mem; use subspace_core_primitives::crypto::Scalar; use subspace_core_primitives::{Blake2b256Hash, PublicKey, SectorId, SectorIndex, SolutionRange}; use subspace_verification::is_within_solution_range; +use tracing::warn; #[derive(Debug, Clone)] pub(crate) struct ChunkCandidate { @@ -17,20 +19,23 @@ pub(crate) struct ChunkCandidate { /// Audit a single sector and generate a stream of solutions, where `sector` must be positioned /// correctly at the beginning of the sector (seek to desired offset before calling this function /// and seek back afterwards if necessary). -pub fn audit_sector<'a>( +pub fn audit_sector<'a, Sector>( public_key: &'a PublicKey, sector_index: SectorIndex, global_challenge: &Blake2b256Hash, solution_range: SolutionRange, - sector: &'a [u8], + sector: &'a Sector, sector_metadata: &'a SectorMetadataChecksummed, -) -> Option> { +) -> Option> +where + Sector: ReadAt + ?Sized, +{ let sector_id = SectorId::new(public_key.hash(), sector_index); let sector_slot_challenge = sector_id.derive_sector_slot_challenge(global_challenge); let s_bucket_audit_index = sector_slot_challenge.s_bucket_audit_index(); - let s_bucket_audit_size = Scalar::FULL_BYTES - * usize::from(sector_metadata.s_bucket_sizes[usize::from(s_bucket_audit_index)]); + let s_bucket_audit_size = + usize::from(sector_metadata.s_bucket_sizes[usize::from(s_bucket_audit_index)]); let s_bucket_audit_offset = Scalar::FULL_BYTES * sector_metadata .s_bucket_sizes @@ -43,15 +48,19 @@ pub fn audit_sector<'a>( let sector_contents_map_size = SectorContentsMap::encoded_size(sector_metadata.pieces_in_sector); - // Read s-bucket - let s_bucket = - §or[sector_contents_map_size + s_bucket_audit_offset..][..s_bucket_audit_size]; + let s_bucket_audit_offset_in_sector = sector_contents_map_size + s_bucket_audit_offset; // Map all winning chunks - let winning_chunks = s_bucket - .array_chunks::<{ Scalar::FULL_BYTES }>() - .enumerate() - .filter_map(|(chunk_offset, chunk)| { + let winning_chunks = (0..s_bucket_audit_size) + .filter_map(|chunk_offset| { + let mut chunk = [0; Scalar::FULL_BYTES]; + if let Err(error) = sector.read_at( + &mut chunk, + s_bucket_audit_offset_in_sector + chunk_offset * Scalar::FULL_BYTES, + ) { + warn!(%error, %sector_index, %chunk_offset, "Failed read chunk sector"); + return None; + } // Check all audit chunks within chunk, there might be more than one winning let winning_audit_chunk_offsets = chunk .array_chunks::<{ mem::size_of::() }>() diff --git a/crates/subspace-farmer-components/src/lib.rs b/crates/subspace-farmer-components/src/lib.rs index 931190ceec..bdc0655d01 100644 --- a/crates/subspace-farmer-components/src/lib.rs +++ b/crates/subspace-farmer-components/src/lib.rs @@ -24,8 +24,36 @@ mod segment_reconstruction; use serde::{Deserialize, Serialize}; use static_assertions::const_assert; +use std::io; use subspace_core_primitives::HistorySize; +/// Trait for reading data at specific offset +pub trait ReadAt: Send + Sync { + /// Fill the buffer by reading bytes at a specific offset + fn read_at(&self, buf: &mut [u8], offset: usize) -> io::Result<()>; +} + +impl ReadAt for [u8] { + fn read_at(&self, buf: &mut [u8], offset: usize) -> io::Result<()> { + if buf.len() + offset > self.len() { + return Err(io::Error::new( + io::ErrorKind::InvalidInput, + "Buffer length with offset exceeds own length", + )); + } + + buf.copy_from_slice(&self[offset..][..buf.len()]); + + Ok(()) + } +} + +impl ReadAt for Vec { + fn read_at(&self, buf: &mut [u8], offset: usize) -> io::Result<()> { + self.as_slice().read_at(buf, offset) + } +} + // Refuse to compile on non-64-bit platforms, offsets may fail on those when converting from u64 to // usize depending on chain parameters const_assert!(std::mem::size_of::() >= std::mem::size_of::()); diff --git a/crates/subspace-farmer-components/src/proving.rs b/crates/subspace-farmer-components/src/proving.rs index cb6af64b6d..0808175ac3 100644 --- a/crates/subspace-farmer-components/src/proving.rs +++ b/crates/subspace-farmer-components/src/proving.rs @@ -3,7 +3,9 @@ use crate::reading::{read_record_metadata, read_sector_record_chunks, ReadingErr use crate::sector::{ SectorContentsMap, SectorContentsMapFromBytesError, SectorMetadataChecksummed, }; +use crate::ReadAt; use std::collections::VecDeque; +use std::io; use subspace_core_primitives::crypto::kzg::{Commitment, Kzg, Witness}; use subspace_core_primitives::crypto::Scalar; use subspace_core_primitives::{ @@ -51,6 +53,9 @@ pub enum ProvingError { /// Failed to decode sector contents map #[error("Failed to decode sector contents map: {0}")] FailedToDecodeSectorContentsMap(#[from] SectorContentsMapFromBytesError), + /// I/O error occurred + #[error("I/O error: {0}")] + Io(#[from] io::Error), /// Record reading error #[error("Record reading error: {0}")] RecordReadingError(#[from] ReadingError), @@ -67,24 +72,47 @@ struct WinningChunk { } /// Container for solutions -#[derive(Debug, Clone)] -pub struct SolutionCandidates<'a> { +#[derive(Debug)] +pub struct SolutionCandidates<'a, Sector> +where + Sector: ?Sized, +{ public_key: &'a PublicKey, sector_index: SectorIndex, sector_id: SectorId, s_bucket: SBucket, - sector: &'a [u8], + sector: &'a Sector, sector_metadata: &'a SectorMetadataChecksummed, chunk_candidates: VecDeque, } -impl<'a> SolutionCandidates<'a> { +impl<'a, Sector> Clone for SolutionCandidates<'a, Sector> +where + Sector: ?Sized, +{ + fn clone(&self) -> Self { + Self { + public_key: self.public_key, + sector_index: self.sector_index, + sector_id: self.sector_id, + s_bucket: self.s_bucket, + sector: self.sector, + sector_metadata: self.sector_metadata, + chunk_candidates: self.chunk_candidates.clone(), + } + } +} + +impl<'a, Sector> SolutionCandidates<'a, Sector> +where + Sector: ReadAt + ?Sized, +{ pub(crate) fn new( public_key: &'a PublicKey, sector_index: SectorIndex, sector_id: SectorId, s_bucket: SBucket, - sector: &'a [u8], + sector: &'a Sector, sector_metadata: &'a SectorMetadataChecksummed, chunk_candidates: VecDeque, ) -> Self { @@ -126,7 +154,7 @@ impl<'a> SolutionCandidates<'a> { RewardAddress: Copy, PosTable: Table, { - SolutionCandidatesIterator::<'a, RewardAddress, PosTable>::new( + SolutionCandidatesIterator::<'a, RewardAddress, Sector, PosTable>::new( self.public_key, reward_address, self.sector_index, @@ -151,8 +179,9 @@ struct ChunkCache { proof_of_space: PosProof, } -struct SolutionCandidatesIterator<'a, RewardAddress, PosTable> +struct SolutionCandidatesIterator<'a, RewardAddress, Sector, PosTable> where + Sector: ?Sized, PosTable: Table, { public_key: &'a PublicKey, @@ -165,7 +194,7 @@ where kzg: &'a Kzg, erasure_coding: &'a ErasureCoding, sector_contents_map: SectorContentsMap, - sector: &'a [u8], + sector: &'a Sector, winning_chunks: VecDeque, count: usize, chunk_cache: Option, @@ -173,9 +202,11 @@ where } // TODO: This can be potentially parallelized with rayon -impl Iterator for SolutionCandidatesIterator<'_, RewardAddress, PosTable> +impl Iterator + for SolutionCandidatesIterator<'_, RewardAddress, Sector, PosTable> where RewardAddress: Copy, + Sector: ReadAt + ?Sized, PosTable: Table, { type Item = Result, ProvingError>; @@ -333,16 +364,19 @@ where } } -impl ExactSizeIterator - for SolutionCandidatesIterator<'_, RewardAddress, PosTable> +impl ExactSizeIterator + for SolutionCandidatesIterator<'_, RewardAddress, Sector, PosTable> where RewardAddress: Copy, + Sector: ReadAt + ?Sized, PosTable: Table, { } -impl<'a, RewardAddress, PosTable> SolutionCandidatesIterator<'a, RewardAddress, PosTable> +impl<'a, RewardAddress, Sector, PosTable> + SolutionCandidatesIterator<'a, RewardAddress, Sector, PosTable> where + Sector: ReadAt + ?Sized, PosTable: Table, { #[allow(clippy::too_many_arguments)] @@ -352,7 +386,7 @@ where sector_index: SectorIndex, sector_id: SectorId, s_bucket: SBucket, - sector: &'a [u8], + sector: &'a Sector, sector_metadata: &'a SectorMetadataChecksummed, kzg: &'a Kzg, erasure_coding: &'a ErasureCoding, @@ -364,8 +398,12 @@ where } let sector_contents_map = { + let mut sector_contents_map_bytes = + vec![0; SectorContentsMap::encoded_size(sector_metadata.pieces_in_sector)]; + sector.read_at(&mut sector_contents_map_bytes, 0)?; + SectorContentsMap::from_bytes( - §or[..SectorContentsMap::encoded_size(sector_metadata.pieces_in_sector)], + §or_contents_map_bytes, sector_metadata.pieces_in_sector, )? }; diff --git a/crates/subspace-farmer-components/src/reading.rs b/crates/subspace-farmer-components/src/reading.rs index 7889bd8db8..a513cf1ce6 100644 --- a/crates/subspace-farmer-components/src/reading.rs +++ b/crates/subspace-farmer-components/src/reading.rs @@ -1,9 +1,11 @@ use crate::sector::{ - sector_record_chunks_size, sector_size, RecordMetadata, SectorContentsMap, - SectorContentsMapFromBytesError, SectorMetadataChecksummed, + sector_record_chunks_size, RecordMetadata, SectorContentsMap, SectorContentsMapFromBytesError, + SectorMetadataChecksummed, }; +use crate::ReadAt; use parity_scale_codec::Decode; use rayon::prelude::*; +use std::io; use std::mem::ManuallyDrop; use std::simd::Simd; use subspace_core_primitives::crypto::{blake3_hash, Scalar}; @@ -18,14 +20,6 @@ use tracing::debug; /// Errors that happen during reading #[derive(Debug, Error)] pub enum ReadingError { - /// Wrong sector size - #[error("Wrong sector size: expected {expected}, actual {actual}")] - WrongSectorSize { - /// Expected size in bytes - expected: usize, - /// Actual size in bytes - actual: usize, - }, /// Failed to read chunk. /// /// This is an implementation bug, most likely due to mismatch between sector contents map and @@ -34,6 +28,8 @@ pub enum ReadingError { FailedToReadChunk { /// Chunk location chunk_location: usize, + /// Low-level error + error: io::Error, }, /// Invalid chunk, possible disk corruption #[error( @@ -69,6 +65,9 @@ pub enum ReadingError { /// Failed to decode sector contents map #[error("Failed to decode sector contents map: {0}")] FailedToDecodeSectorContentsMap(#[from] SectorContentsMapFromBytesError), + /// I/O error occurred + #[error("I/O error: {0}")] + Io(#[from] io::Error), /// Checksum mismatch #[error("Checksum mismatch")] ChecksumMismatch, @@ -86,24 +85,18 @@ pub struct PlotRecord { } /// Read sector record chunks, only plotted s-buckets are returned (in decoded form) -pub fn read_sector_record_chunks( +pub fn read_sector_record_chunks( piece_offset: PieceOffset, pieces_in_sector: u16, s_bucket_offsets: &[u32; Record::NUM_S_BUCKETS], sector_contents_map: &SectorContentsMap, pos_table: &PosTable, - sector: &[u8], + sector: &Sector, ) -> Result; Record::NUM_S_BUCKETS]>, ReadingError> where PosTable: Table, + Sector: ReadAt + ?Sized, { - if sector.len() != sector_size(pieces_in_sector) { - return Err(ReadingError::WrongSectorSize { - expected: sector_size(pieces_in_sector), - actual: sector.len(), - }); - } - let mut record_chunks = vec![None; Record::NUM_S_BUCKETS]; record_chunks @@ -126,11 +119,17 @@ where let chunk_location = chunk_offset + s_bucket_offset as usize; - let mut record_chunk = sector[SectorContentsMap::encoded_size(pieces_in_sector)..] - .array_chunks::<{ Scalar::FULL_BYTES }>() - .nth(chunk_location) - .copied() - .ok_or(ReadingError::FailedToReadChunk { chunk_location })?; + let mut record_chunk = [0; Scalar::FULL_BYTES]; + sector + .read_at( + &mut record_chunk, + SectorContentsMap::encoded_size(pieces_in_sector) + + chunk_location * Scalar::FULL_BYTES, + ) + .map_err(|error| ReadingError::FailedToReadChunk { + chunk_location, + error, + })?; // Decode chunk if necessary if encoded_chunk_used { @@ -224,57 +223,49 @@ pub fn recover_source_record_chunks( } /// Read metadata (commitment and witness) for record -pub(crate) fn read_record_metadata( +pub(crate) fn read_record_metadata( piece_offset: PieceOffset, pieces_in_sector: u16, - sector: &[u8], -) -> Result { - if sector.len() != sector_size(pieces_in_sector) { - return Err(ReadingError::WrongSectorSize { - expected: sector_size(pieces_in_sector), - actual: sector.len(), - }); - } - + sector: &Sector, +) -> Result +where + Sector: ReadAt + ?Sized, +{ let sector_metadata_start = SectorContentsMap::encoded_size(pieces_in_sector) + sector_record_chunks_size(pieces_in_sector); // Move to the beginning of the commitment and witness we care about - let record_metadata_bytes = §or[sector_metadata_start..] - [RecordMetadata::encoded_size() * usize::from(piece_offset)..]; - let record_metadata = RecordMetadata::decode(&mut &*record_metadata_bytes).expect( - "Length is correct and checked above, contents doesn't have specific structure to \ - it; qed", - ); + let record_metadata_offset = + sector_metadata_start + RecordMetadata::encoded_size() * usize::from(piece_offset); + + let mut record_metadata_bytes = [0; RecordMetadata::encoded_size()]; + sector.read_at(&mut record_metadata_bytes, record_metadata_offset)?; + let record_metadata = RecordMetadata::decode(&mut record_metadata_bytes.as_ref()) + .expect("Length is correct, contents doesn't have specific structure to it; qed"); Ok(record_metadata) } /// Read piece from sector -pub fn read_piece( +pub fn read_piece( piece_offset: PieceOffset, sector_id: &SectorId, sector_metadata: &SectorMetadataChecksummed, - sector: &[u8], + sector: &Sector, erasure_coding: &ErasureCoding, table_generator: &mut PosTable::Generator, ) -> Result where PosTable: Table, + Sector: ReadAt + ?Sized, { let pieces_in_sector = sector_metadata.pieces_in_sector; - if sector.len() != sector_size(pieces_in_sector) { - return Err(ReadingError::WrongSectorSize { - expected: sector_size(pieces_in_sector), - actual: sector.len(), - }); - } - let sector_contents_map = { - SectorContentsMap::from_bytes( - §or[..SectorContentsMap::encoded_size(pieces_in_sector)], - pieces_in_sector, - )? + let mut sector_contents_map_bytes = + vec![0; SectorContentsMap::encoded_size(pieces_in_sector)]; + sector.read_at(&mut sector_contents_map_bytes, 0)?; + + SectorContentsMap::from_bytes(§or_contents_map_bytes, pieces_in_sector)? }; // Restore source record scalars diff --git a/crates/subspace-farmer/src/single_disk_farm/piece_reader.rs b/crates/subspace-farmer/src/single_disk_farm/piece_reader.rs index c8e975e3b1..c97d3d6bcd 100644 --- a/crates/subspace-farmer/src/single_disk_farm/piece_reader.rs +++ b/crates/subspace-farmer/src/single_disk_farm/piece_reader.rs @@ -195,7 +195,7 @@ where let sector_id = SectorId::new(public_key.hash(), sector_index); - let piece = match reading::read_piece::( + let piece = match reading::read_piece::( piece_offset, §or_id, sector_metadata, From 122f16572c83ff1f096c8257dd2b621d3188a939 Mon Sep 17 00:00:00 2001 From: Nazar Mokrynskyi Date: Wed, 27 Sep 2023 15:36:48 +0300 Subject: [PATCH 3/7] Implement `ReadAt` for `File`, add `ReadAt::offset()` and remove `memmap2` usage from `subspace-farmer-components` --- Cargo.lock | 1 - crates/subspace-farmer-components/Cargo.toml | 1 - .../benches/auditing.rs | 23 +++-------- .../benches/proving.rs | 16 +++----- .../benches/reading.rs | 17 +++----- crates/subspace-farmer-components/src/lib.rs | 40 +++++++++++++++++++ 6 files changed, 57 insertions(+), 41 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 2e89f52022..0b3b775796 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -11187,7 +11187,6 @@ dependencies = [ "hex", "libc", "lru 0.10.0", - "memmap2 0.7.1", "parity-scale-codec", "parking_lot 0.12.1", "rand 0.8.5", diff --git a/crates/subspace-farmer-components/Cargo.toml b/crates/subspace-farmer-components/Cargo.toml index 64ce27812c..18d8b83e64 100644 --- a/crates/subspace-farmer-components/Cargo.toml +++ b/crates/subspace-farmer-components/Cargo.toml @@ -43,7 +43,6 @@ tracing = "0.1.37" [dev-dependencies] criterion = "0.5.1" futures = "0.3.28" -memmap2 = "0.7.1" subspace-archiving = { version = "0.1.0", path = "../subspace-archiving" } subspace-proof-of-space = { version = "0.1.0", path = "../subspace-proof-of-space", features = ["chia"] } diff --git a/crates/subspace-farmer-components/benches/auditing.rs b/crates/subspace-farmer-components/benches/auditing.rs index 749ad2e8b2..024caf3862 100644 --- a/crates/subspace-farmer-components/benches/auditing.rs +++ b/crates/subspace-farmer-components/benches/auditing.rs @@ -1,6 +1,5 @@ use criterion::{black_box, criterion_group, criterion_main, Criterion, Throughput}; use futures::executor::block_on; -use memmap2::Mmap; use rand::prelude::*; use std::fs::OpenOptions; use std::io::Write; @@ -21,7 +20,7 @@ use subspace_farmer_components::plotting::{plot_sector, PieceGetterRetryPolicy, use subspace_farmer_components::sector::{ sector_size, SectorContentsMap, SectorMetadata, SectorMetadataChecksummed, }; -use subspace_farmer_components::FarmerProtocolInfo; +use subspace_farmer_components::{FarmerProtocolInfo, ReadAt}; use subspace_proof_of_space::chia::ChiaTable; use subspace_proof_of_space::Table; @@ -183,29 +182,19 @@ pub fn criterion_benchmark(c: &mut Criterion) { .unwrap(); } - let plot_mmap = unsafe { Mmap::map(&plot_file).unwrap() }; - - #[cfg(unix)] - { - plot_mmap.advise(memmap2::Advice::Random).unwrap(); - } - group.throughput(Throughput::Elements(sectors_count)); - group.bench_function("disk", move |b| { + group.bench_function("disk", |b| { b.iter_custom(|iters| { let start = Instant::now(); for _i in 0..iters { - for (sector_index, sector) in plot_mmap - .chunks_exact(sector_size) - .enumerate() - .map(|(sector_index, sector)| (sector_index as SectorIndex, sector)) - { + for sector_index in 0..sectors_count as usize { + let sector = plot_file.offset(sector_index * sector_size); audit_sector( black_box(&public_key), - black_box(sector_index), + black_box(sector_index as SectorIndex), black_box(&global_challenge), black_box(solution_range), - black_box(sector), + black_box(§or), black_box(&plotted_sector.sector_metadata), ); } diff --git a/crates/subspace-farmer-components/benches/proving.rs b/crates/subspace-farmer-components/benches/proving.rs index 53c3c367a3..c48e8981ec 100644 --- a/crates/subspace-farmer-components/benches/proving.rs +++ b/crates/subspace-farmer-components/benches/proving.rs @@ -1,6 +1,5 @@ use criterion::{black_box, criterion_group, criterion_main, Criterion, Throughput}; use futures::executor::block_on; -use memmap2::Mmap; use rand::prelude::*; use schnorrkel::Keypair; use std::fs::OpenOptions; @@ -21,7 +20,7 @@ use subspace_farmer_components::plotting::{plot_sector, PieceGetterRetryPolicy, use subspace_farmer_components::sector::{ sector_size, SectorContentsMap, SectorMetadata, SectorMetadataChecksummed, }; -use subspace_farmer_components::FarmerProtocolInfo; +use subspace_farmer_components::{FarmerProtocolInfo, ReadAt}; use subspace_proof_of_space::chia::ChiaTable; use subspace_proof_of_space::Table; @@ -234,15 +233,12 @@ pub fn criterion_benchmark(c: &mut Criterion) { .unwrap(); } - let plot_mmap = unsafe { Mmap::map(&plot_file).unwrap() }; - - #[cfg(unix)] - { - plot_mmap.advise(memmap2::Advice::Random).unwrap(); - } + let sectors = (0..sectors_count as usize) + .map(|sector_offset| plot_file.offset(sector_offset * sector_size)) + .collect::>(); - let solution_candidates = plot_mmap - .chunks_exact(sector_size) + let solution_candidates = sectors + .iter() .map(|sector| { audit_sector( &public_key, diff --git a/crates/subspace-farmer-components/benches/reading.rs b/crates/subspace-farmer-components/benches/reading.rs index 0badc6bae2..baae0a350c 100644 --- a/crates/subspace-farmer-components/benches/reading.rs +++ b/crates/subspace-farmer-components/benches/reading.rs @@ -1,6 +1,5 @@ use criterion::{black_box, criterion_group, criterion_main, Criterion, Throughput}; use futures::executor::block_on; -use memmap2::Mmap; use rand::prelude::*; use std::fs::OpenOptions; use std::io::Write; @@ -20,7 +19,7 @@ use subspace_farmer_components::reading::read_piece; use subspace_farmer_components::sector::{ sector_size, SectorContentsMap, SectorMetadata, SectorMetadataChecksummed, }; -use subspace_farmer_components::FarmerProtocolInfo; +use subspace_farmer_components::{FarmerProtocolInfo, ReadAt}; use subspace_proof_of_space::chia::ChiaTable; use subspace_proof_of_space::Table; @@ -183,24 +182,18 @@ pub fn criterion_benchmark(c: &mut Criterion) { .unwrap(); } - let plot_mmap = unsafe { Mmap::map(&plot_file).unwrap() }; - - #[cfg(unix)] - { - plot_mmap.advise(memmap2::Advice::Random).unwrap(); - } - group.throughput(Throughput::Elements(sectors_count)); - group.bench_function("piece/disk", move |b| { + group.bench_function("piece/disk", |b| { b.iter_custom(|iters| { let start = Instant::now(); for _i in 0..iters { - for sector in plot_mmap.chunks_exact(sector_size) { + for sector_index in 0..sectors_count as usize { + let sector = plot_file.offset(sector_index * sector_size); read_piece::( black_box(piece_offset), black_box(&plotted_sector.sector_id), black_box(&plotted_sector.sector_metadata), - black_box(sector), + black_box(§or), black_box(&erasure_coding), black_box(&mut table_generator), ) diff --git a/crates/subspace-farmer-components/src/lib.rs b/crates/subspace-farmer-components/src/lib.rs index bdc0655d01..9ef2aaee8c 100644 --- a/crates/subspace-farmer-components/src/lib.rs +++ b/crates/subspace-farmer-components/src/lib.rs @@ -22,13 +22,26 @@ pub mod reading; pub mod sector; mod segment_reconstruction; +use crate::file_ext::FileExt; use serde::{Deserialize, Serialize}; use static_assertions::const_assert; +use std::fs::File; use std::io; use subspace_core_primitives::HistorySize; /// Trait for reading data at specific offset pub trait ReadAt: Send + Sync { + /// Get implementation of [`ReadAt`] that add specified offset to all attempted reads + fn offset(&self, offset: usize) -> ReadAtOffset<&Self> + where + Self: Sized, + { + ReadAtOffset { + inner: self, + offset, + } + } + /// Fill the buffer by reading bytes at a specific offset fn read_at(&self, buf: &mut [u8], offset: usize) -> io::Result<()>; } @@ -54,6 +67,33 @@ impl ReadAt for Vec { } } +impl ReadAt for File { + fn read_at(&self, buf: &mut [u8], offset: usize) -> io::Result<()> { + self.read_exact_at(buf, offset as u64) + } +} + +impl ReadAt for &File { + fn read_at(&self, buf: &mut [u8], offset: usize) -> io::Result<()> { + self.read_exact_at(buf, offset as u64) + } +} + +/// Reader with fixed offset added to all attempted reads +pub struct ReadAtOffset { + inner: T, + offset: usize, +} + +impl ReadAt for ReadAtOffset +where + T: ReadAt, +{ + fn read_at(&self, buf: &mut [u8], offset: usize) -> io::Result<()> { + self.inner.read_at(buf, offset + self.offset) + } +} + // Refuse to compile on non-64-bit platforms, offsets may fail on those when converting from u64 to // usize depending on chain parameters const_assert!(std::mem::size_of::() >= std::mem::size_of::()); From 253cd395f282c7db3b3af4c6b1291e1761119c24 Mon Sep 17 00:00:00 2001 From: Nazar Mokrynskyi Date: Thu, 28 Sep 2023 06:05:16 +0300 Subject: [PATCH 4/7] Remove `memmap2` usage from `subspace-farmer` --- Cargo.lock | 2 - crates/subspace-farmer/Cargo.toml | 2 - .../subspace-farmer/src/single_disk_farm.rs | 60 ++++------ .../src/single_disk_farm/farming.rs | 12 +- .../src/single_disk_farm/piece_cache.rs | 112 +++++++++--------- .../src/single_disk_farm/piece_reader.rs | 28 ++--- .../src/single_disk_farm/plotting.rs | 29 ++--- 7 files changed, 109 insertions(+), 136 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 0b3b775796..92fec52c62 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -11143,8 +11143,6 @@ dependencies = [ "jemallocator", "jsonrpsee", "lru 0.10.0", - "memmap2 0.7.1", - "parity-db", "parity-scale-codec", "parking_lot 0.12.1", "rand 0.8.5", diff --git a/crates/subspace-farmer/Cargo.toml b/crates/subspace-farmer/Cargo.toml index 228813cd89..8bc5583ee7 100644 --- a/crates/subspace-farmer/Cargo.toml +++ b/crates/subspace-farmer/Cargo.toml @@ -28,8 +28,6 @@ futures = "0.3.28" hex = { version = "0.4.3", features = ["serde"] } jsonrpsee = { version = "0.16.2", features = ["client"] } lru = "0.10.0" -memmap2 = "0.7.1" -parity-db = "0.4.6" parity-scale-codec = "3.6.3" parking_lot = "0.12.1" rand = "0.8.5" diff --git a/crates/subspace-farmer/src/single_disk_farm.rs b/crates/subspace-farmer/src/single_disk_farm.rs index 359d50f99c..902917a3ce 100644 --- a/crates/subspace-farmer/src/single_disk_farm.rs +++ b/crates/subspace-farmer/src/single_disk_farm.rs @@ -19,7 +19,6 @@ use futures::channel::{mpsc, oneshot}; use futures::future::{select, Either}; use futures::stream::FuturesUnordered; use futures::StreamExt; -use memmap2::{Mmap, MmapOptions}; use parity_scale_codec::{Decode, Encode}; use parking_lot::{Mutex, RwLock}; use rayon::prelude::*; @@ -737,10 +736,12 @@ impl SingleDiskFarm { .create(true) .open(directory.join(Self::METADATA_FILE))?; + metadata_file.advise_random_access()?; + let metadata_size = metadata_file.seek(SeekFrom::End(0))?; let expected_metadata_size = RESERVED_PLOT_METADATA + sector_metadata_size as u64 * u64::from(target_sector_count); - let (metadata_header, metadata_header_mmap) = if metadata_size == 0 { + let metadata_header = if metadata_size == 0 { let metadata_header = PlotMetadataHeader { version: 0, plotted_sector_count: 0, @@ -751,13 +752,7 @@ impl SingleDiskFarm { .map_err(SingleDiskFarmError::CantPreallocateMetadataFile)?; metadata_file.write_all_at(metadata_header.encode().as_slice(), 0)?; - let metadata_header_mmap = unsafe { - MmapOptions::new() - .len(PlotMetadataHeader::encoded_size()) - .map_mut(&metadata_file)? - }; - - (metadata_header, metadata_header_mmap) + metadata_header } else { if metadata_size != expected_metadata_size { // Allocating the whole file (`set_len` below can create a sparse file, which will @@ -768,14 +763,12 @@ impl SingleDiskFarm { // Truncating file (if necessary) metadata_file.set_len(expected_metadata_size)?; } - let mut metadata_header_mmap = unsafe { - MmapOptions::new() - .len(PlotMetadataHeader::encoded_size()) - .map_mut(&metadata_file)? - }; + + let mut metadata_header_bytes = vec![0; PlotMetadataHeader::encoded_size()]; + metadata_file.read_exact_at(&mut metadata_header_bytes, 0)?; let mut metadata_header = - PlotMetadataHeader::decode(&mut metadata_header_mmap.as_ref()) + PlotMetadataHeader::decode(&mut metadata_header_bytes.as_ref()) .map_err(SingleDiskFarmError::FailedToDecodeMetadataHeader)?; if metadata_header.version != Self::SUPPORTED_PLOT_VERSION { @@ -786,29 +779,24 @@ impl SingleDiskFarm { if metadata_header.plotted_sector_count > target_sector_count { metadata_header.plotted_sector_count = target_sector_count; - metadata_header.encode_to(&mut metadata_header_mmap.as_mut()); + metadata_file.write_all_at(&metadata_header.encode(), 0)?; } - (metadata_header, metadata_header_mmap) + metadata_header }; let sectors_metadata = { - let metadata_mmap = unsafe { - MmapOptions::new() - .offset(RESERVED_PLOT_METADATA) - .len(sector_metadata_size * usize::from(target_sector_count)) - .map(&metadata_file)? - }; - let mut sectors_metadata = Vec::::with_capacity(usize::from(target_sector_count)); - for mut sector_metadata_bytes in metadata_mmap - .chunks_exact(sector_metadata_size) - .take(metadata_header.plotted_sector_count as usize) - { + let mut sector_metadata_bytes = vec![0; sector_metadata_size]; + for sector_index in 0..metadata_header.plotted_sector_count { + metadata_file.read_exact_at( + &mut sector_metadata_bytes, + RESERVED_PLOT_METADATA + sector_metadata_size as u64 * u64::from(sector_index), + )?; sectors_metadata.push( - SectorMetadataChecksummed::decode(&mut sector_metadata_bytes) + SectorMetadataChecksummed::decode(&mut sector_metadata_bytes.as_ref()) .map_err(SingleDiskFarmError::FailedToDecodeSectorMetadata)?, ); } @@ -824,6 +812,8 @@ impl SingleDiskFarm { .open(directory.join(Self::PLOT_FILE))?, ); + plot_file.advise_random_access()?; + // Allocating the whole file (`set_len` below can create a sparse file, which will cause // writes to fail later) plot_file @@ -890,7 +880,6 @@ impl SingleDiskFarm { sector_size, sector_metadata_size, metadata_header, - metadata_header_mmap, plot_file, metadata_file, sectors_metadata, @@ -963,12 +952,7 @@ impl SingleDiskFarm { let farming_join_handle = thread::Builder::new() .name(format!("farming-{disk_farm_index}")) .spawn({ - let plot_mmap = unsafe { Mmap::map(&*plot_file)? }; - #[cfg(unix)] - { - plot_mmap.advise(memmap2::Advice::Random)?; - } - + let plot_file = Arc::clone(&plot_file); let handle = handle.clone(); let erasure_coding = erasure_coding.clone(); let handlers = Arc::clone(&handlers); @@ -995,7 +979,7 @@ impl SingleDiskFarm { reward_address, node_client, sector_size, - plot_mmap, + &plot_file, sectors_metadata, kzg, erasure_coding, @@ -1024,7 +1008,7 @@ impl SingleDiskFarm { let (piece_reader, reading_fut) = PieceReader::new::( public_key, pieces_in_sector, - unsafe { Mmap::map(&*plot_file)? }, + plot_file, Arc::clone(§ors_metadata), erasure_coding, modifying_sector_index, diff --git a/crates/subspace-farmer/src/single_disk_farm/farming.rs b/crates/subspace-farmer/src/single_disk_farm/farming.rs index e12bc659e0..58c92fa8d7 100644 --- a/crates/subspace-farmer/src/single_disk_farm/farming.rs +++ b/crates/subspace-farmer/src/single_disk_farm/farming.rs @@ -3,18 +3,18 @@ use crate::node_client::NodeClient; use crate::single_disk_farm::Handlers; use futures::channel::mpsc; use futures::StreamExt; -use memmap2::Mmap; use parking_lot::RwLock; use rayon::prelude::*; use rayon::{ThreadPoolBuildError, ThreadPoolBuilder}; +use std::fs::File; use std::io; use std::sync::Arc; use subspace_core_primitives::crypto::kzg::Kzg; use subspace_core_primitives::{PublicKey, SectorIndex, Solution}; use subspace_erasure_coding::ErasureCoding; use subspace_farmer_components::auditing::audit_sector; -use subspace_farmer_components::proving; use subspace_farmer_components::sector::SectorMetadataChecksummed; +use subspace_farmer_components::{proving, ReadAt}; use subspace_proof_of_space::Table; use subspace_rpc_primitives::{SlotInfo, SolutionResponse}; use thiserror::Error; @@ -77,7 +77,7 @@ pub(super) async fn farming( reward_address: PublicKey, node_client: NC, sector_size: usize, - plot_mmap: Mmap, + plot_file: &File, sectors_metadata: Arc>>, kzg: Kzg, erasure_coding: ErasureCoding, @@ -105,10 +105,14 @@ where let maybe_sector_being_modified = modifying_sector_guard.as_ref().copied(); let mut solutions = Vec::>::new(); + let sectors = (0..sector_count) + .map(|sector_index| plot_file.offset(sector_index * sector_size)) + .collect::>(); + let solution_candidates = thread_pool.install(|| { sectors_metadata .par_iter() - .zip(plot_mmap.par_chunks_exact(sector_size)) + .zip(§ors) .enumerate() .filter_map(|(sector_index, (sector_metadata, sector))| { let sector_index = sector_index as u16; diff --git a/crates/subspace-farmer/src/single_disk_farm/piece_cache.rs b/crates/subspace-farmer/src/single_disk_farm/piece_cache.rs index c83e13962d..ca9ac36579 100644 --- a/crates/subspace-farmer/src/single_disk_farm/piece_cache.rs +++ b/crates/subspace-farmer/src/single_disk_farm/piece_cache.rs @@ -1,5 +1,4 @@ use derive_more::Display; -use memmap2::{Mmap, MmapOptions}; use std::fs::{File, OpenOptions}; use std::path::Path; use std::sync::Arc; @@ -43,8 +42,7 @@ pub struct Offset(usize); #[derive(Debug)] struct Inner { file: File, - read_mmap: Mmap, - file_size: usize, + num_elements: usize, } /// Piece cache stored on one disk @@ -67,6 +65,8 @@ impl DiskPieceCache { .create(true) .open(directory.join(Self::FILE_NAME))?; + file.advise_random_access()?; + let expected_size = Self::element_size() * capacity; // Allocating the whole file (`set_len` below can create a sparse file, which will cause // writes to fail later) @@ -75,17 +75,10 @@ impl DiskPieceCache { // Truncating file (if necessary) file.set_len(expected_size as u64)?; - let read_mmap = unsafe { MmapOptions::new().len(expected_size).map(&file)? }; - #[cfg(unix)] - { - read_mmap.advise(memmap2::Advice::Random)?; - } - Ok(Self { inner: Arc::new(Inner { file, - read_mmap, - file_size: expected_size, + num_elements: expected_size / Self::element_size(), }), }) } @@ -101,28 +94,33 @@ impl DiskPieceCache { pub(crate) fn contents( &self, ) -> impl ExactSizeIterator)> + '_ { - self.inner - .read_mmap - .array_chunks::<{ Self::element_size() }>() - .enumerate() - .map(|(offset, chunk)| { - let (piece_index_bytes, piece_bytes) = chunk.split_at(PieceIndex::SIZE); - let piece_index = PieceIndex::from_bytes( - piece_index_bytes - .try_into() - .expect("Statically known to have correct size; qed"), - ); - // Piece index zero might mean we have piece index zero or just an empty space - let piece_index = if piece_index != PieceIndex::ZERO - || piece_bytes.iter().any(|&byte| byte != 0) - { + let file = &self.inner.file; + let mut element = vec![0; Self::element_size()]; + + (0..self.inner.num_elements).map(move |offset| { + if let Err(error) = + file.read_exact_at(&mut element, (offset * Self::element_size()) as u64) + { + warn!(%error, %offset, "Failed to read cache element #1"); + return (Offset(offset), None); + } + + let (piece_index_bytes, piece_bytes) = element.split_at(PieceIndex::SIZE); + let piece_index = PieceIndex::from_bytes( + piece_index_bytes + .try_into() + .expect("Statically known to have correct size; qed"), + ); + // Piece index zero might mean we have piece index zero or just an empty space + let piece_index = + if piece_index != PieceIndex::ZERO || piece_bytes.iter().any(|&byte| byte != 0) { Some(piece_index) } else { None }; - (Offset(offset), piece_index) - }) + (Offset(offset), piece_index) + }) } /// Store piece in cache at specified offset, replacing existing piece if there is any @@ -136,27 +134,26 @@ impl DiskPieceCache { piece: &Piece, ) -> Result<(), DiskPieceCacheError> { let Offset(offset) = offset; - if offset >= self.inner.file_size / Self::element_size() { + if offset >= self.inner.num_elements { return Err(DiskPieceCacheError::OffsetOutsideOfRange { provided: offset, - max: self.inner.file_size / Self::element_size() - 1, + max: self.inner.num_elements - 1, }); } - let mut write_mmap = unsafe { - MmapOptions::new() - .offset((offset * Self::element_size()) as u64) - .len(Self::element_size()) - .map_mut(&self.inner.file)? - }; + let element_offset = (offset * Self::element_size()) as u64; - let (piece_index_bytes, remaining_bytes) = write_mmap.split_at_mut(PieceIndex::SIZE); - piece_index_bytes.copy_from_slice(&piece_index.to_bytes()); - let (piece_bytes, checksum) = remaining_bytes.split_at_mut(Piece::SIZE); - piece_bytes.copy_from_slice(piece.as_ref()); - checksum.copy_from_slice(&blake3_hash_list(&[piece_index_bytes, piece.as_ref()])); - - write_mmap.flush()?; + let piece_index_bytes = piece_index.to_bytes(); + self.inner + .file + .write_all_at(&piece_index_bytes, element_offset)?; + self.inner + .file + .write_all_at(piece.as_ref(), element_offset + PieceIndex::SIZE as u64)?; + self.inner.file.write_all_at( + &blake3_hash_list(&[&piece_index_bytes, piece.as_ref()]), + element_offset + PieceIndex::SIZE as u64 + Piece::SIZE as u64, + )?; Ok(()) } @@ -169,16 +166,22 @@ impl DiskPieceCache { /// doesn't happen for the same piece being accessed! pub(crate) fn read_piece_index(&self, offset: Offset) -> Option { let Offset(offset) = offset; - if offset >= self.inner.file_size / Self::element_size() { + if offset >= self.inner.num_elements { warn!(%offset, "Trying to read piece out of range, this must be an implementation bug"); return None; } - Some(PieceIndex::from_bytes( - self.inner.read_mmap[Self::element_size() * offset..][..PieceIndex::SIZE] - .try_into() - .expect("Statically guaranteed to be correct size; qed"), - )) + let mut piece_index_bytes = [0; PieceIndex::SIZE]; + + if let Err(error) = self.inner.file.read_exact_at( + &mut piece_index_bytes, + (offset * Self::element_size()) as u64, + ) { + warn!(%error, %offset, "Failed to read cache piece index"); + return None; + } + + Some(PieceIndex::from_bytes(piece_index_bytes)) } /// Read piece from cache at specified offset. @@ -189,14 +192,17 @@ impl DiskPieceCache { /// doesn't happen for the same piece being accessed! pub(crate) fn read_piece(&self, offset: Offset) -> Result, DiskPieceCacheError> { let Offset(offset) = offset; - if offset >= self.inner.file_size / Self::element_size() { + if offset >= self.inner.num_elements { warn!(%offset, "Trying to read piece out of range, this must be an implementation bug"); return Ok(None); } - let piece_element_memory = - &self.inner.read_mmap[offset * Self::element_size()..][..Self::element_size()]; - let (piece_index_bytes, remaining_bytes) = piece_element_memory.split_at(PieceIndex::SIZE); + let mut element = vec![0; Self::element_size()]; + self.inner + .file + .read_exact_at(&mut element, (offset * Self::element_size()) as u64)?; + + let (piece_index_bytes, remaining_bytes) = element.split_at(PieceIndex::SIZE); let (piece_bytes, expected_checksum) = remaining_bytes.split_at(Piece::SIZE); let mut piece = Piece::default(); piece.copy_from_slice(piece_bytes); diff --git a/crates/subspace-farmer/src/single_disk_farm/piece_reader.rs b/crates/subspace-farmer/src/single_disk_farm/piece_reader.rs index c97d3d6bcd..9cd7cb1317 100644 --- a/crates/subspace-farmer/src/single_disk_farm/piece_reader.rs +++ b/crates/subspace-farmer/src/single_disk_farm/piece_reader.rs @@ -1,13 +1,13 @@ use futures::channel::{mpsc, oneshot}; use futures::{SinkExt, StreamExt}; -use memmap2::Mmap; use parking_lot::RwLock; +use std::fs::File; use std::future::Future; use std::sync::Arc; use subspace_core_primitives::{Piece, PieceOffset, PublicKey, SectorId, SectorIndex}; use subspace_erasure_coding::ErasureCoding; -use subspace_farmer_components::reading; use subspace_farmer_components::sector::{sector_size, SectorMetadataChecksummed}; +use subspace_farmer_components::{reading, ReadAt}; use subspace_proof_of_space::Table; use tracing::{error, warn}; @@ -32,7 +32,7 @@ impl PieceReader { pub(super) fn new( public_key: PublicKey, pieces_in_sector: u16, - global_plot_mmap: Mmap, + plot_file: Arc, sectors_metadata: Arc>>, erasure_coding: ErasureCoding, modifying_sector_index: Arc>>, @@ -45,7 +45,7 @@ impl PieceReader { let reading_fut = read_pieces::( public_key, pieces_in_sector, - global_plot_mmap, + plot_file, sectors_metadata, erasure_coding, modifying_sector_index, @@ -83,7 +83,7 @@ impl PieceReader { async fn read_pieces( public_key: PublicKey, pieces_in_sector: u16, - global_plot_mmap: Mmap, + plot_file: Arc, sectors_metadata: Arc>>, erasure_coding: ErasureCoding, modifying_sector_index: Arc>>, @@ -91,13 +91,6 @@ async fn read_pieces( ) where PosTable: Table, { - #[cfg(unix)] - { - if let Err(error) = global_plot_mmap.advise(memmap2::Advice::Random) { - error!(%error, "Failed to set random access on global plot mmap"); - } - } - let mut table_generator = PosTable::generator(); while let Some(read_piece_request) = read_piece_receiver.next().await { @@ -164,13 +157,13 @@ async fn read_pieces( } let sector_size = sector_size(pieces_in_sector); - let sector = &global_plot_mmap[sector_index as usize * sector_size..][..sector_size]; + let sector = plot_file.offset(sector_index as usize * sector_size); - let maybe_piece = read_piece::( + let maybe_piece = read_piece::( &public_key, piece_offset, §or_metadata, - sector, + §or, &erasure_coding, &mut table_generator, ); @@ -180,16 +173,17 @@ async fn read_pieces( } } -fn read_piece( +fn read_piece( public_key: &PublicKey, piece_offset: PieceOffset, sector_metadata: &SectorMetadataChecksummed, - sector: &[u8], + sector: &Sector, erasure_coding: &ErasureCoding, table_generator: &mut PosTable::Generator, ) -> Option where PosTable: Table, + Sector: ReadAt + ?Sized, { let sector_index = sector_metadata.sector_index; diff --git a/crates/subspace-farmer/src/single_disk_farm/plotting.rs b/crates/subspace-farmer/src/single_disk_farm/plotting.rs index 4971f6115d..1872555f32 100644 --- a/crates/subspace-farmer/src/single_disk_farm/plotting.rs +++ b/crates/subspace-farmer/src/single_disk_farm/plotting.rs @@ -6,7 +6,6 @@ use atomic::Atomic; use futures::channel::{mpsc, oneshot}; use futures::{select, FutureExt, SinkExt, StreamExt}; use lru::LruCache; -use memmap2::{MmapMut, MmapOptions}; use parity_scale_codec::Encode; use parking_lot::RwLock; use std::collections::HashMap; @@ -23,6 +22,7 @@ use subspace_core_primitives::{ SegmentIndex, }; use subspace_erasure_coding::ErasureCoding; +use subspace_farmer_components::file_ext::FileExt; use subspace_farmer_components::plotting; use subspace_farmer_components::plotting::{ plot_sector, PieceGetter, PieceGetterRetryPolicy, PlottedSector, @@ -85,7 +85,6 @@ pub(super) async fn plotting( sector_size: usize, sector_metadata_size: usize, mut metadata_header: PlotMetadataHeader, - mut metadata_header_mmap: MmapMut, plot_file: Arc, metadata_file: File, sectors_metadata: Arc>>, @@ -107,21 +106,8 @@ where while let Some((sector_index, _acknowledgement_sender)) = sectors_to_plot.next().await { trace!(%sector_index, "Preparing to plot sector"); - let mut sector = unsafe { - MmapOptions::new() - .offset((sector_index as usize * sector_size) as u64) - .len(sector_size) - .map_mut(&*plot_file)? - }; - let mut sector_metadata = unsafe { - MmapOptions::new() - .offset( - RESERVED_PLOT_METADATA - + (u64::from(sector_index) * sector_metadata_size as u64), - ) - .len(sector_metadata_size) - .map_mut(&metadata_file)? - }; + let mut sector = vec![0; sector_size]; + let mut sector_metadata = vec![0; sector_metadata_size]; let maybe_old_sector_metadata = sectors_metadata.read().get(sector_index as usize).cloned(); @@ -176,12 +162,15 @@ where modifying_sector_index.write().replace(sector_index); let plotted_sector = plot_sector_fut.await?; - sector.flush()?; - sector_metadata.flush()?; + plot_file.write_all_at(§or, (sector_index as usize * sector_size) as u64)?; + metadata_file.write_all_at( + §or_metadata, + RESERVED_PLOT_METADATA + (u64::from(sector_index) * sector_metadata_size as u64), + )?; if sector_index + 1 > metadata_header.plotted_sector_count { metadata_header.plotted_sector_count = sector_index + 1; - metadata_header.encode_to(&mut metadata_header_mmap.as_mut()); + metadata_file.write_all_at(&metadata_header.encode(), 0)?; } { let mut sectors_metadata = sectors_metadata.write(); From 1ca9549b1111d0df7cadff2da46ae1efbf240d6e Mon Sep 17 00:00:00 2001 From: Nazar Mokrynskyi Date: Thu, 28 Sep 2023 08:24:35 +0300 Subject: [PATCH 5/7] Create several `*Options` structs in farmer --- .../subspace-farmer/src/single_disk_farm.rs | 37 ++++---- .../src/single_disk_farm/farming.rs | 48 +++++++--- .../src/single_disk_farm/plotting.rs | 91 +++++++++++++------ 3 files changed, 117 insertions(+), 59 deletions(-) diff --git a/crates/subspace-farmer/src/single_disk_farm.rs b/crates/subspace-farmer/src/single_disk_farm.rs index 902917a3ce..76d501fb1a 100644 --- a/crates/subspace-farmer/src/single_disk_farm.rs +++ b/crates/subspace-farmer/src/single_disk_farm.rs @@ -6,12 +6,14 @@ mod plotting; use crate::identity::{Identity, IdentityError}; use crate::node_client::NodeClient; use crate::reward_signing::reward_signing; -use crate::single_disk_farm::farming::farming; pub use crate::single_disk_farm::farming::FarmingError; +use crate::single_disk_farm::farming::{farming, FarmingOptions}; use crate::single_disk_farm::piece_cache::{DiskPieceCache, DiskPieceCacheError}; use crate::single_disk_farm::piece_reader::PieceReader; pub use crate::single_disk_farm::plotting::PlottingError; -use crate::single_disk_farm::plotting::{plotting, plotting_scheduler}; +use crate::single_disk_farm::plotting::{ + plotting, plotting_scheduler, PlottingOptions, PlottingSchedulerOptions, +}; use crate::utils::JoinOnDrop; use derive_more::{Display, From}; use event_listener_primitives::{Bag, HandlerId}; @@ -873,7 +875,7 @@ impl SingleDiskFarm { return Ok(()); } - plotting::<_, _, PosTable>( + let plotting_options = PlottingOptions { public_key, node_client, pieces_in_sector, @@ -890,8 +892,8 @@ impl SingleDiskFarm { modifying_sector_index, target_sector_count, sectors_to_plot_receiver, - ) - .await + }; + plotting::<_, _, PosTable>(plotting_options).await }; let initial_plotting_result = handle.block_on(select( @@ -909,16 +911,17 @@ impl SingleDiskFarm { } })?; - tasks.push(Box::pin(plotting_scheduler( - public_key.hash(), + let plotting_scheduler_options = PlottingSchedulerOptions { + public_key_hash: public_key.hash(), sectors_indices_left_to_plot, target_sector_count, - farmer_app_info.protocol_info.history_size.segment_index(), - farmer_app_info.protocol_info.min_sector_lifetime, - node_client.clone(), - Arc::clone(§ors_metadata), + last_archived_segment_index: farmer_app_info.protocol_info.history_size.segment_index(), + min_sector_lifetime: farmer_app_info.protocol_info.min_sector_lifetime, + node_client: node_client.clone(), + sectors_metadata: Arc::clone(§ors_metadata), sectors_to_plot_sender, - ))); + }; + tasks.push(Box::pin(plotting_scheduler(plotting_scheduler_options))); let (mut slot_info_forwarder_sender, slot_info_forwarder_receiver) = mpsc::channel(0); @@ -973,21 +976,21 @@ impl SingleDiskFarm { return Ok(()); } - farming::<_, PosTable>( + let farming_options = FarmingOptions { disk_farm_index, public_key, reward_address, node_client, sector_size, - &plot_file, + plot_file: &plot_file, sectors_metadata, kzg, erasure_coding, handlers, modifying_sector_index, - slot_info_forwarder_receiver, - ) - .await + slot_info_notifications: slot_info_forwarder_receiver, + }; + farming::(farming_options).await }; let farming_result = handle.block_on(select( diff --git a/crates/subspace-farmer/src/single_disk_farm/farming.rs b/crates/subspace-farmer/src/single_disk_farm/farming.rs index 58c92fa8d7..5127483fdb 100644 --- a/crates/subspace-farmer/src/single_disk_farm/farming.rs +++ b/crates/subspace-farmer/src/single_disk_farm/farming.rs @@ -64,31 +64,49 @@ pub enum FarmingError { FailedToCreateThreadPool(#[from] ThreadPoolBuildError), } +pub(super) struct FarmingOptions<'a, NC> { + pub(super) disk_farm_index: usize, + pub(super) public_key: PublicKey, + pub(super) reward_address: PublicKey, + pub(super) node_client: NC, + pub(super) sector_size: usize, + pub(super) plot_file: &'a File, + pub(super) sectors_metadata: Arc>>, + pub(super) kzg: Kzg, + pub(super) erasure_coding: ErasureCoding, + pub(super) handlers: Arc, + pub(super) modifying_sector_index: Arc>>, + pub(super) slot_info_notifications: mpsc::Receiver, +} + /// Starts farming process. /// /// NOTE: Returned future is async, but does blocking operations and should be running in dedicated /// thread. // False-positive, we do drop lock before .await #[allow(clippy::await_holding_lock)] -#[allow(clippy::too_many_arguments)] -pub(super) async fn farming( - disk_farm_index: usize, - public_key: PublicKey, - reward_address: PublicKey, - node_client: NC, - sector_size: usize, - plot_file: &File, - sectors_metadata: Arc>>, - kzg: Kzg, - erasure_coding: ErasureCoding, - handlers: Arc, - modifying_sector_index: Arc>>, - mut slot_info_notifications: mpsc::Receiver, +pub(super) async fn farming( + farming_options: FarmingOptions<'_, NC>, ) -> Result<(), FarmingError> where - NC: NodeClient, PosTable: Table, + NC: NodeClient, { + let FarmingOptions { + disk_farm_index, + public_key, + reward_address, + node_client, + sector_size, + plot_file, + sectors_metadata, + kzg, + erasure_coding, + handlers, + modifying_sector_index, + mut slot_info_notifications, + } = farming_options; + let mut table_generator = PosTable::generator(); let thread_pool = ThreadPoolBuilder::new() .thread_name(move |thread_index| format!("farming-{disk_farm_index}.{thread_index}")) diff --git a/crates/subspace-farmer/src/single_disk_farm/plotting.rs b/crates/subspace-farmer/src/single_disk_farm/plotting.rs index 1872555f32..29482f8566 100644 --- a/crates/subspace-farmer/src/single_disk_farm/plotting.rs +++ b/crates/subspace-farmer/src/single_disk_farm/plotting.rs @@ -73,37 +73,60 @@ pub enum PlottingError { LowLevel(#[from] plotting::PlottingError), } +pub(super) struct PlottingOptions { + pub(super) public_key: PublicKey, + pub(super) node_client: NC, + pub(super) pieces_in_sector: u16, + pub(super) sector_size: usize, + pub(super) sector_metadata_size: usize, + pub(super) metadata_header: PlotMetadataHeader, + pub(super) plot_file: Arc, + pub(super) metadata_file: File, + pub(super) sectors_metadata: Arc>>, + pub(super) piece_getter: PG, + pub(super) kzg: Kzg, + pub(super) erasure_coding: ErasureCoding, + pub(super) handlers: Arc, + pub(super) modifying_sector_index: Arc>>, + pub(super) target_sector_count: u16, + pub(super) sectors_to_plot_receiver: mpsc::Receiver<(SectorIndex, oneshot::Sender<()>)>, +} + /// Starts plotting process. /// /// NOTE: Returned future is async, but does blocking operations and should be running in dedicated /// thread. -#[allow(clippy::too_many_arguments)] pub(super) async fn plotting( - public_key: PublicKey, - node_client: NC, - pieces_in_sector: u16, - sector_size: usize, - sector_metadata_size: usize, - mut metadata_header: PlotMetadataHeader, - plot_file: Arc, - metadata_file: File, - sectors_metadata: Arc>>, - piece_getter: PG, - kzg: Kzg, - erasure_coding: ErasureCoding, - handlers: Arc, - modifying_sector_index: Arc>>, - target_sector_count: u16, - mut sectors_to_plot: mpsc::Receiver<(SectorIndex, oneshot::Sender<()>)>, + plotting_options: PlottingOptions, ) -> Result<(), PlottingError> where NC: NodeClient, PG: PieceGetter + Send + 'static, PosTable: Table, { + let PlottingOptions { + public_key, + node_client, + pieces_in_sector, + sector_size, + sector_metadata_size, + mut metadata_header, + plot_file, + metadata_file, + sectors_metadata, + piece_getter, + kzg, + erasure_coding, + handlers, + modifying_sector_index, + target_sector_count, + mut sectors_to_plot_receiver, + } = plotting_options; + let mut table_generator = PosTable::generator(); // TODO: Concurrency - while let Some((sector_index, _acknowledgement_sender)) = sectors_to_plot.next().await { + while let Some((sector_index, _acknowledgement_sender)) = sectors_to_plot_receiver.next().await + { trace!(%sector_index, "Preparing to plot sector"); let mut sector = vec![0; sector_size]; @@ -230,20 +253,34 @@ where Ok(()) } -#[allow(clippy::too_many_arguments)] +pub(super) struct PlottingSchedulerOptions { + pub(super) public_key_hash: Blake2b256Hash, + pub(super) sectors_indices_left_to_plot: Range, + pub(super) target_sector_count: SectorIndex, + pub(super) last_archived_segment_index: SegmentIndex, + pub(super) min_sector_lifetime: HistorySize, + pub(super) node_client: NC, + pub(super) sectors_metadata: Arc>>, + pub(super) sectors_to_plot_sender: mpsc::Sender<(SectorIndex, oneshot::Sender<()>)>, +} + pub(super) async fn plotting_scheduler( - public_key_hash: Blake2b256Hash, - sectors_indices_left_to_plot: Range, - target_sector_count: SectorIndex, - last_archived_segment_index: SegmentIndex, - min_sector_lifetime: HistorySize, - node_client: NC, - sectors_metadata: Arc>>, - sectors_to_plot_sender: mpsc::Sender<(SectorIndex, oneshot::Sender<()>)>, + plotting_scheduler_options: PlottingSchedulerOptions, ) -> Result<(), BackgroundTaskError> where NC: NodeClient, { + let PlottingSchedulerOptions { + public_key_hash, + sectors_indices_left_to_plot, + target_sector_count, + last_archived_segment_index, + min_sector_lifetime, + node_client, + sectors_metadata, + sectors_to_plot_sender, + } = plotting_scheduler_options; + // Create a proxy channel with atomically updatable last archived segment that // allows to not buffer messages from RPC subscription, but also access the most // recent value at any time From d5901bc3383fe9494c152649eb0e0148bbe9509f Mon Sep 17 00:00:00 2001 From: Nazar Mokrynskyi Date: Thu, 28 Sep 2023 09:09:48 +0300 Subject: [PATCH 6/7] Configurable farmer thread pool sizes --- .../src/bin/subspace-farmer/commands/farm.rs | 26 ++++++ .../src/bin/subspace-farmer/main.rs | 30 ++++++- .../subspace-farmer/src/single_disk_farm.rs | 18 +++- .../src/single_disk_farm/farming.rs | 9 +- .../src/single_disk_farm/plotting.rs | 87 ++++++++++++++----- 5 files changed, 139 insertions(+), 31 deletions(-) diff --git a/crates/subspace-farmer/src/bin/subspace-farmer/commands/farm.rs b/crates/subspace-farmer/src/bin/subspace-farmer/commands/farm.rs index 64b2419281..db4109ab6a 100644 --- a/crates/subspace-farmer/src/bin/subspace-farmer/commands/farm.rs +++ b/crates/subspace-farmer/src/bin/subspace-farmer/commands/farm.rs @@ -9,6 +9,7 @@ use futures::stream::FuturesUnordered; use futures::{FutureExt, StreamExt}; use lru::LruCache; use parking_lot::Mutex; +use rayon::ThreadPoolBuilder; use std::fs; use std::num::NonZeroUsize; use std::sync::Arc; @@ -52,6 +53,9 @@ where dev, tmp, mut disk_farms, + farming_thread_pool_size, + plotting_thread_pool_size, + replotting_thread_pool_size, } = farming_args; // Override the `--enable_private_ips` flag with `--dev` @@ -173,6 +177,25 @@ where None => farmer_app_info.protocol_info.max_pieces_in_sector, }; + let farming_thread_pool = Arc::new( + ThreadPoolBuilder::new() + .thread_name(move |thread_index| format!("farming#{thread_index}")) + .num_threads(farming_thread_pool_size) + .build()?, + ); + let plotting_thread_pool = Arc::new( + ThreadPoolBuilder::new() + .thread_name(move |thread_index| format!("plotting#{thread_index}")) + .num_threads(plotting_thread_pool_size) + .build()?, + ); + let replotting_thread_pool = Arc::new( + ThreadPoolBuilder::new() + .thread_name(move |thread_index| format!("replotting#{thread_index}")) + .num_threads(replotting_thread_pool_size) + .build()?, + ); + // TODO: Check plot and metadata sizes to ensure there is enough space for farmer to not // fail later for (disk_farm_index, disk_farm) in disk_farms.into_iter().enumerate() { @@ -191,6 +214,9 @@ where erasure_coding: erasure_coding.clone(), piece_getter: piece_getter.clone(), cache_percentage, + farming_thread_pool: Arc::clone(&farming_thread_pool), + plotting_thread_pool: Arc::clone(&plotting_thread_pool), + replotting_thread_pool: Arc::clone(&replotting_thread_pool), }, disk_farm_index, ); diff --git a/crates/subspace-farmer/src/bin/subspace-farmer/main.rs b/crates/subspace-farmer/src/bin/subspace-farmer/main.rs index af56017038..5b683bb4be 100644 --- a/crates/subspace-farmer/src/bin/subspace-farmer/main.rs +++ b/crates/subspace-farmer/src/bin/subspace-farmer/main.rs @@ -15,7 +15,7 @@ use subspace_core_primitives::PublicKey; use subspace_farmer::single_disk_farm::SingleDiskFarm; use subspace_networking::libp2p::Multiaddr; use subspace_proof_of_space::chia::ChiaTable; -use tracing::info; +use tracing::{info, warn}; use tracing_subscriber::filter::LevelFilter; use tracing_subscriber::prelude::*; use tracing_subscriber::{fmt, EnvFilter}; @@ -31,6 +31,21 @@ type PosTable = ChiaTable; #[global_allocator] static GLOBAL: jemallocator::Jemalloc = jemallocator::Jemalloc; +fn available_parallelism() -> usize { + match std::thread::available_parallelism() { + Ok(parallelism) => parallelism.get(), + Err(error) => { + warn!( + %error, + "Unable to identify available parallelism, you might want to configure thread pool sizes with CLI \ + options manually" + ); + + 0 + } + } +} + /// Arguments for farmer #[derive(Debug, Parser)] struct FarmingArgs { @@ -77,9 +92,20 @@ struct FarmingArgs { /// DSN parameters #[clap(flatten)] dsn: DsnArgs, - /// Do not print info about configured farms on startup. + /// Do not print info about configured farms on startup #[arg(long)] no_info: bool, + /// Size of thread pool used for farming (mostly for blocking I/O, but also for some compute-intensive + /// operations during proving), defaults to number of CPU cores available in the system + #[arg(long, default_value_t = available_parallelism())] + farming_thread_pool_size: usize, + /// Size of thread pool used for plotting, defaults to number of CPU cores available in the system + #[arg(long, default_value_t = available_parallelism())] + plotting_thread_pool_size: usize, + /// Size of thread pool used for replotting, typically smaller pool than for plotting to not affect farming as much, + /// defaults to half of the number of CPU cores available in the system. + #[arg(long, default_value_t = available_parallelism() / 2)] + replotting_thread_pool_size: usize, } fn cache_percentage_parser(s: &str) -> anyhow::Result { diff --git a/crates/subspace-farmer/src/single_disk_farm.rs b/crates/subspace-farmer/src/single_disk_farm.rs index 76d501fb1a..1ccbd536fb 100644 --- a/crates/subspace-farmer/src/single_disk_farm.rs +++ b/crates/subspace-farmer/src/single_disk_farm.rs @@ -24,6 +24,7 @@ use futures::StreamExt; use parity_scale_codec::{Decode, Encode}; use parking_lot::{Mutex, RwLock}; use rayon::prelude::*; +use rayon::ThreadPool; use serde::{Deserialize, Serialize}; use static_assertions::const_assert; use std::fs::{File, OpenOptions}; @@ -284,6 +285,14 @@ pub struct SingleDiskFarmOptions { pub erasure_coding: ErasureCoding, /// Percentage of allocated space dedicated for caching purposes pub cache_percentage: NonZeroU8, + /// Thread pool used for farming (mostly for blocking I/O, but also for some compute-intensive + /// operations during proving) + pub farming_thread_pool: Arc, + /// Thread pool used for plotting + pub plotting_thread_pool: Arc, + /// Thread pool used for replotting, typically smaller pool than for plotting to not affect + /// farming as much + pub replotting_thread_pool: Arc, } /// Errors happening when trying to create/open single disk farm @@ -570,7 +579,7 @@ impl SingleDiskFarm { ) -> Result where NC: NodeClient, - PG: PieceGetter + Send + 'static, + PG: PieceGetter + Clone + Send + 'static, PosTable: Table, { let handle = Handle::current(); @@ -586,6 +595,9 @@ impl SingleDiskFarm { kzg, erasure_coding, cache_percentage, + farming_thread_pool, + plotting_thread_pool, + replotting_thread_pool, } = options; fs::create_dir_all(&directory)?; @@ -892,6 +904,8 @@ impl SingleDiskFarm { modifying_sector_index, target_sector_count, sectors_to_plot_receiver, + plotting_thread_pool, + replotting_thread_pool, }; plotting::<_, _, PosTable>(plotting_options).await }; @@ -977,7 +991,6 @@ impl SingleDiskFarm { } let farming_options = FarmingOptions { - disk_farm_index, public_key, reward_address, node_client, @@ -989,6 +1002,7 @@ impl SingleDiskFarm { handlers, modifying_sector_index, slot_info_notifications: slot_info_forwarder_receiver, + thread_pool: farming_thread_pool, }; farming::(farming_options).await }; diff --git a/crates/subspace-farmer/src/single_disk_farm/farming.rs b/crates/subspace-farmer/src/single_disk_farm/farming.rs index 5127483fdb..a783cedf49 100644 --- a/crates/subspace-farmer/src/single_disk_farm/farming.rs +++ b/crates/subspace-farmer/src/single_disk_farm/farming.rs @@ -5,7 +5,7 @@ use futures::channel::mpsc; use futures::StreamExt; use parking_lot::RwLock; use rayon::prelude::*; -use rayon::{ThreadPoolBuildError, ThreadPoolBuilder}; +use rayon::{ThreadPool, ThreadPoolBuildError}; use std::fs::File; use std::io; use std::sync::Arc; @@ -65,7 +65,6 @@ pub enum FarmingError { } pub(super) struct FarmingOptions<'a, NC> { - pub(super) disk_farm_index: usize, pub(super) public_key: PublicKey, pub(super) reward_address: PublicKey, pub(super) node_client: NC, @@ -77,6 +76,7 @@ pub(super) struct FarmingOptions<'a, NC> { pub(super) handlers: Arc, pub(super) modifying_sector_index: Arc>>, pub(super) slot_info_notifications: mpsc::Receiver, + pub(super) thread_pool: Arc, } /// Starts farming process. @@ -93,7 +93,6 @@ where NC: NodeClient, { let FarmingOptions { - disk_farm_index, public_key, reward_address, node_client, @@ -105,12 +104,10 @@ where handlers, modifying_sector_index, mut slot_info_notifications, + thread_pool, } = farming_options; let mut table_generator = PosTable::generator(); - let thread_pool = ThreadPoolBuilder::new() - .thread_name(move |thread_index| format!("farming-{disk_farm_index}.{thread_index}")) - .build()?; while let Some(slot_info) = slot_info_notifications.next().await { let slot = slot_info.slot_number; diff --git a/crates/subspace-farmer/src/single_disk_farm/plotting.rs b/crates/subspace-farmer/src/single_disk_farm/plotting.rs index 29482f8566..64f59de4fc 100644 --- a/crates/subspace-farmer/src/single_disk_farm/plotting.rs +++ b/crates/subspace-farmer/src/single_disk_farm/plotting.rs @@ -8,6 +8,7 @@ use futures::{select, FutureExt, SinkExt, StreamExt}; use lru::LruCache; use parity_scale_codec::Encode; use parking_lot::RwLock; +use rayon::ThreadPool; use std::collections::HashMap; use std::fs::File; use std::io; @@ -30,6 +31,7 @@ use subspace_farmer_components::plotting::{ use subspace_farmer_components::sector::SectorMetadataChecksummed; use subspace_proof_of_space::Table; use thiserror::Error; +use tokio::runtime::Handle; use tracing::{debug, info, trace, warn}; const FARMER_APP_INFO_RETRY_INTERVAL: Duration = Duration::from_millis(500); @@ -90,6 +92,8 @@ pub(super) struct PlottingOptions { pub(super) modifying_sector_index: Arc>>, pub(super) target_sector_count: u16, pub(super) sectors_to_plot_receiver: mpsc::Receiver<(SectorIndex, oneshot::Sender<()>)>, + pub(super) plotting_thread_pool: Arc, + pub(super) replotting_thread_pool: Arc, } /// Starts plotting process. @@ -101,7 +105,7 @@ pub(super) async fn plotting( ) -> Result<(), PlottingError> where NC: NodeClient, - PG: PieceGetter + Send + 'static, + PG: PieceGetter + Clone + Send + 'static, PosTable: Table, { let PlottingOptions { @@ -121,6 +125,8 @@ where modifying_sector_index, target_sector_count, mut sectors_to_plot_receiver, + plotting_thread_pool, + replotting_thread_pool, } = plotting_options; let mut table_generator = PosTable::generator(); @@ -129,12 +135,10 @@ where { trace!(%sector_index, "Preparing to plot sector"); - let mut sector = vec![0; sector_size]; - let mut sector_metadata = vec![0; sector_metadata_size]; - let maybe_old_sector_metadata = sectors_metadata.read().get(sector_index as usize).cloned(); + let replotting = maybe_old_sector_metadata.is_some(); - if maybe_old_sector_metadata.is_some() { + if replotting { debug!(%sector_index, "Replotting sector"); } else { debug!(%sector_index, "Plotting sector"); @@ -167,24 +171,65 @@ where break farmer_app_info; }; - let plot_sector_fut = plot_sector::<_, PosTable>( - &public_key, - sector_index, - &piece_getter, - PieceGetterRetryPolicy::Limited(PIECE_GETTER_RETRY_NUMBER.get()), - &farmer_app_info.protocol_info, - &kzg, - &erasure_coding, - pieces_in_sector, - &mut sector, - &mut sector_metadata, - &mut table_generator, - ); - // Inform others that this sector is being modified modifying_sector_index.write().replace(sector_index); - let plotted_sector = plot_sector_fut.await?; + let sector; + let sector_metadata; + let plotted_sector; + + (sector, sector_metadata, table_generator, plotted_sector) = { + let mut sector = vec![0; sector_size]; + let mut sector_metadata = vec![0; sector_metadata_size]; + + let piece_getter = piece_getter.clone(); + let kzg = kzg.clone(); + let erasure_coding = erasure_coding.clone(); + let handle = Handle::current(); + + if replotting { + replotting_thread_pool.install(move || { + let plot_sector_fut = plot_sector::<_, PosTable>( + &public_key, + sector_index, + &piece_getter, + PieceGetterRetryPolicy::Limited(PIECE_GETTER_RETRY_NUMBER.get()), + &farmer_app_info.protocol_info, + &kzg, + &erasure_coding, + pieces_in_sector, + &mut sector, + &mut sector_metadata, + &mut table_generator, + ); + + handle.block_on(plot_sector_fut).map(|plotted_sector| { + (sector, sector_metadata, table_generator, plotted_sector) + }) + })? + } else { + plotting_thread_pool.install(move || { + let plot_sector_fut = plot_sector::<_, PosTable>( + &public_key, + sector_index, + &piece_getter, + PieceGetterRetryPolicy::Limited(PIECE_GETTER_RETRY_NUMBER.get()), + &farmer_app_info.protocol_info, + &kzg, + &erasure_coding, + pieces_in_sector, + &mut sector, + &mut sector_metadata, + &mut table_generator, + ); + + handle.block_on(plot_sector_fut).map(|plotted_sector| { + (sector, sector_metadata, table_generator, plotted_sector) + }) + })? + } + }; + plot_file.write_all_at(§or, (sector_index as usize * sector_size) as u64)?; metadata_file.write_all_at( §or_metadata, @@ -235,7 +280,7 @@ where // Inform others that this sector is no longer being modified modifying_sector_index.write().take(); - if maybe_old_plotted_sector.is_some() { + if replotting { info!(%sector_index, "Sector replotted successfully"); } else { info!( From 4895b88ae6b047d30e00313f5ac4604288ad3c0d Mon Sep 17 00:00:00 2001 From: Nazar Mokrynskyi Date: Thu, 28 Sep 2023 09:42:13 +0300 Subject: [PATCH 7/7] Fix large enum variant warning --- crates/subspace-farmer/src/bin/subspace-farmer/main.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/crates/subspace-farmer/src/bin/subspace-farmer/main.rs b/crates/subspace-farmer/src/bin/subspace-farmer/main.rs index 5b683bb4be..154a174166 100644 --- a/crates/subspace-farmer/src/bin/subspace-farmer/main.rs +++ b/crates/subspace-farmer/src/bin/subspace-farmer/main.rs @@ -225,7 +225,7 @@ impl FromStr for DiskFarm { #[clap(about, version)] enum Command { /// Start a farmer, does plotting and farming - Farm(FarmingArgs), + Farm(Box), /// Print information about farm and its content Info { /// One or more farm located at specified path. @@ -292,7 +292,7 @@ async fn main() -> anyhow::Result<()> { info!("Done"); } Command::Farm(farming_args) => { - commands::farm::(farming_args).await?; + commands::farm::(*farming_args).await?; } Command::Info { disk_farms } => { commands::info(disk_farms);