From f209412b781f722ace7eafea7fc2a8e15a146423 Mon Sep 17 00:00:00 2001
From: Nazar Mokrynskyi
Date: Sun, 2 Apr 2023 18:12:03 +0300
Subject: [PATCH 1/3] Make incremental record commitments able to parallelize commitment creation by using more RAM

---
 .../incremental_record_commitments.rs         | 146 +++++++-----------
 1 file changed, 56 insertions(+), 90 deletions(-)

diff --git a/crates/subspace-archiving/src/archiver/incremental_record_commitments.rs b/crates/subspace-archiving/src/archiver/incremental_record_commitments.rs
index 8ee12bd601..e75ba86589 100644
--- a/crates/subspace-archiving/src/archiver/incremental_record_commitments.rs
+++ b/crates/subspace-archiving/src/archiver/incremental_record_commitments.rs
@@ -4,6 +4,8 @@ use crate::archiver::Segment;
 use alloc::vec::Vec;
 use core::ops::{Deref, DerefMut};
 use parity_scale_codec::{Encode, Output};
+#[cfg(feature = "rayon")]
+use rayon::prelude::*;
 use subspace_core_primitives::crypto::kzg::{Commitment, Kzg};
 use subspace_core_primitives::crypto::Scalar;
 use subspace_core_primitives::RawRecord;
@@ -62,49 +64,66 @@ pub(super) fn update_record_commitments(
 /// Processor is hidden to not expose unnecessary implementation details (like `Output` trait
 /// implementation)
 struct IncrementalRecordCommitmentsProcessor<'a> {
-    /// Processed bytes in the segment so far
-    processed_bytes: usize,
-    /// Buffer where current (partial) record is written
-    raw_record_buffer: Vec<u8>,
+    /// Number of bytes of the recorded history segment for which commitments were already created
+    skip_bytes: usize,
+    /// Buffer where new bytes for which commitments need to be created are pushed
+    buffer: Vec<u8>,
     /// Record commitments already created
     incremental_record_commitments: &'a mut IncrementalRecordCommitmentsState,
     /// Kzg instance used for commitments creation
     kzg: &'a Kzg,
 }
 
+impl<'a> Drop for IncrementalRecordCommitmentsProcessor<'a> {
+    fn drop(&mut self) {
+        #[cfg(not(feature = "rayon"))]
+        let raw_records_bytes = self.buffer.chunks_exact(RawRecord::SIZE);
+        #[cfg(feature = "rayon")]
+        let raw_records_bytes = self.buffer.par_chunks_exact(RawRecord::SIZE);
+
+        let iter = raw_records_bytes
+            .map(|raw_record_bytes| {
+                raw_record_bytes
+                    .array_chunks::<{ Scalar::SAFE_BYTES }>()
+                    .map(Scalar::from)
+            })
+            .map(|record_chunks| {
+                let number_of_chunks = record_chunks.len();
+                let mut scalars = Vec::with_capacity(number_of_chunks.next_power_of_two());
+
+                record_chunks.collect_into(&mut scalars);
+
+                // Number of scalars for KZG must be a power of two elements
+                scalars.resize(scalars.capacity(), Scalar::default());
+
+                let polynomial = self
+                    .kzg
+                    .poly(&scalars)
+                    .expect("KZG instance must be configured to support this many scalars; qed");
+                self.kzg
+                    .commit(&polynomial)
+                    .expect("KZG instance must be configured to support this many scalars; qed")
+            });
+
+        #[cfg(not(feature = "rayon"))]
+        iter.collect_into(&mut self.incremental_record_commitments.state);
+        // TODO: `collect_into_vec()`, unfortunately, truncates its input, which is not what we
+        //  want; this can be unified once https://github.com/rayon-rs/rayon/issues/1039 is
+        //  resolved
+        #[cfg(feature = "rayon")]
+        self.incremental_record_commitments
+            .extend(&iter.collect::<Vec<_>>());
+    }
+}
+
 impl<'a> Output for IncrementalRecordCommitmentsProcessor<'a> {
     fn write(&mut self, mut bytes: &[u8]) {
-        // Try to finish last partial record if possible
-
-        let record_offset = self.processed_bytes % RawRecord::SIZE;
-        let bytes_left_in_record = RawRecord::SIZE - record_offset;
-        if bytes_left_in_record > 0 {
-            let remaining_record_bytes;
-            (remaining_record_bytes, bytes) =
-                bytes.split_at(if bytes.len() >= bytes_left_in_record {
-                    bytes_left_in_record
-                } else {
-                    bytes.len()
-                });
-
-            self.update_commitment_state(remaining_record_bytes);
-
-            if remaining_record_bytes.len() == bytes_left_in_record {
-                self.create_commitment();
-            }
+        if self.skip_bytes >= bytes.len() {
+            self.skip_bytes -= bytes.len();
+        } else {
+            bytes = &bytes[self.skip_bytes..];
+            self.skip_bytes = 0;
+            self.buffer.extend_from_slice(bytes);
         }
-
-        // Continue processing records (full and partial) from remaining data, at this point we have
-        // processed some number of full records, so can simply chunk the remaining bytes into
-        // record sizes
-        bytes.chunks(RawRecord::SIZE).for_each(|record| {
-            self.update_commitment_state(record);
-
-            // Store hashes of full records
-            if record.len() == RawRecord::SIZE {
-                self.create_commitment();
-            }
-        });
     }
 }
 
@@ -114,64 +133,11 @@ impl<'a> IncrementalRecordCommitmentsProcessor<'a> {
         kzg: &'a Kzg,
     ) -> Self {
         Self {
-            // TODO: Remove `processed_bytes`, `raw_record_buffer` should be sufficient
-            processed_bytes: 0,
-            raw_record_buffer: Vec::with_capacity(RawRecord::SIZE),
+            skip_bytes: incremental_record_commitments.len() * RawRecord::SIZE,
+            // Default to record size, may grow if necessary
+            buffer: Vec::with_capacity(RawRecord::SIZE),
             incremental_record_commitments,
             kzg,
         }
     }
-
-    /// Whether commitment for current record needs to be created
-    fn should_commit_to_record(&self, record_position: usize) -> bool {
-        self.incremental_record_commitments
-            .state
-            .get(record_position)
-            .is_none()
-    }
-
-    /// In case commitment is necessary for currently processed record, internal commitment state
-    /// will be updated with provided bytes.
-    ///
-    /// NOTE: This method is called with bytes that either cover part of the record or stop at the
-    /// edge of the record.
-    fn update_commitment_state(&mut self, bytes: &[u8]) {
-        if self.should_commit_to_record(self.processed_bytes / RawRecord::SIZE) {
-            self.raw_record_buffer.extend_from_slice(bytes);
-        }
-        self.processed_bytes += bytes.len();
-    }
-
-    /// In case commitment is necessary for currently processed record, internal hashing state will
-    /// be finalized and commitment will be stored in shared state.
-    fn create_commitment(&mut self) {
-        if self.should_commit_to_record(self.processed_bytes / RawRecord::SIZE - 1) {
-            let scalars = {
-                let record_chunks = self
-                    .raw_record_buffer
-                    .array_chunks::<{ Scalar::SAFE_BYTES }>();
-                let number_of_chunks = record_chunks.len();
-                let mut scalars = Vec::with_capacity(number_of_chunks.next_power_of_two());
-
-                record_chunks.map(Scalar::from).collect_into(&mut scalars);
-
-                // Number of scalars for KZG must be a power of two elements
-                scalars.resize(scalars.capacity(), Scalar::default());
-
-                scalars
-            };
-            self.raw_record_buffer.clear();
-
-            let polynomial = self
-                .kzg
-                .poly(&scalars)
-                .expect("KZG instance must be configured to support this many scalars; qed");
-            let commitment = self
-                .kzg
-                .commit(&polynomial)
-                .expect("KZG instance must be configured to support this many scalars; qed");
-
-            self.incremental_record_commitments.state.push(commitment);
-        }
-    }
 }
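A note on the pattern introduced above, since it recurs throughout the rest of
this series: the sequential and parallel code paths differ only in how the
iterator is constructed (gated on the `rayon` cargo feature), so the
processing closure itself is written once. A minimal self-contained sketch of
that technique follows; `RECORD_SIZE`, `process_records` and the checksum body
are illustrative placeholders, not part of the crate:

    #[cfg(feature = "rayon")]
    use rayon::prelude::*;

    // Stand-in for RawRecord::SIZE, purely illustrative
    const RECORD_SIZE: usize = 4;

    // Sums every fixed-size record; with the `rayon` feature the records are
    // processed in parallel, otherwise sequentially, with an identical body
    fn process_records(buffer: &[u8]) -> Vec<u64> {
        #[cfg(not(feature = "rayon"))]
        let records = buffer.chunks_exact(RECORD_SIZE);
        #[cfg(feature = "rayon")]
        let records = buffer.par_chunks_exact(RECORD_SIZE);

        records
            .map(|record| record.iter().map(|&byte| u64::from(byte)).sum())
            .collect()
    }

This is also where the "more RAM" trade-off in the subject line comes from:
bytes are buffered until `drop` runs, so peak memory grows with the amount of
not-yet-committed data in exchange for committing to whole records in
parallel.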
From be5ee1f32695347b8cbacb9059aeb50807a6c881 Mon Sep 17 00:00:00 2001
From: Nazar Mokrynskyi
Date: Sun, 2 Apr 2023 18:38:29 +0300
Subject: [PATCH 2/3] Parallelize piece reconstruction and some of the tests

---
 .../src/piece_reconstructor.rs                | 93 +++++++++++--------
 .../tests/integration/piece_reconstruction.rs | 22 ++++-
 2 files changed, 69 insertions(+), 46 deletions(-)

diff --git a/crates/subspace-archiving/src/piece_reconstructor.rs b/crates/subspace-archiving/src/piece_reconstructor.rs
index f51f3eefce..78de2ae5f4 100644
--- a/crates/subspace-archiving/src/piece_reconstructor.rs
+++ b/crates/subspace-archiving/src/piece_reconstructor.rs
@@ -3,9 +3,11 @@
 extern crate alloc;
 
 use alloc::string::String;
 use alloc::vec::Vec;
 use core::num::NonZeroUsize;
+#[cfg(feature = "rayon")]
+use rayon::prelude::*;
 use subspace_core_primitives::crypto::kzg::{Commitment, Kzg, Polynomial};
 use subspace_core_primitives::crypto::{blake2b_256_254_hash_to_scalar, Scalar};
-use subspace_core_primitives::{ArchivedHistorySegment, Piece, RawRecord, RecordedHistorySegment};
+use subspace_core_primitives::{ArchivedHistorySegment, Piece, RawRecord};
 use subspace_erasure_coding::ErasureCoding;
 
 /// Reconstructor-related instantiation error.
@@ -143,46 +145,50 @@ impl PiecesReconstructor {
             }
         }
 
-        let mut source_record_commitments =
-            Vec::with_capacity(RecordedHistorySegment::NUM_RAW_RECORDS);
-        for (piece, maybe_input_piece) in
-            reconstructed_pieces.iter_mut().zip(input_pieces).step_by(2)
-        {
-            if let Some(input_piece) = maybe_input_piece {
-                source_record_commitments.push(
+        let source_record_commitments = {
+            #[cfg(not(feature = "rayon"))]
+            let iter = reconstructed_pieces.iter_mut().zip(input_pieces).step_by(2);
+            #[cfg(feature = "rayon")]
+            let iter = reconstructed_pieces
+                .par_iter_mut()
+                .zip_eq(input_pieces)
+                .step_by(2);
+
+            iter.map(|(piece, maybe_input_piece)| {
+                if let Some(input_piece) = maybe_input_piece {
                     Commitment::try_from_bytes(input_piece.commitment())
-                        .map_err(|_error| ReconstructorError::InvalidInputPieceCommitment)?,
-                );
-            } else {
-                let scalars = {
-                    let record_chunks = piece.record().full_scalar_arrays();
-                    let number_of_chunks = record_chunks.len();
-                    let mut scalars = Vec::with_capacity(number_of_chunks.next_power_of_two());
-
-                    for record_chunk in record_chunks {
-                        scalars.push(
-                            Scalar::try_from(record_chunk)
-                                .map_err(ReconstructorError::DataShardsReconstruction)?,
-                        );
-                    }
-
-                    // Number of scalars for KZG must be a power of two elements
-                    scalars.resize(scalars.capacity(), Scalar::default());
-
-                    scalars
-                };
-
-                let polynomial = self
-                    .kzg
-                    .poly(&scalars)
-                    .expect("KZG instance must be configured to support this many scalars; qed");
-                let commitment = self
-                    .kzg
-                    .commit(&polynomial)
-                    .expect("KZG instance must be configured to support this many scalars; qed");
-                source_record_commitments.push(commitment);
-            }
-        }
+                        .map_err(|_error| ReconstructorError::InvalidInputPieceCommitment)
+                } else {
+                    let scalars = {
+                        let record_chunks = piece.record().full_scalar_arrays();
+                        let number_of_chunks = record_chunks.len();
+                        let mut scalars = Vec::with_capacity(number_of_chunks.next_power_of_two());
+
+                        for record_chunk in record_chunks {
+                            scalars.push(
+                                Scalar::try_from(record_chunk)
+                                    .map_err(ReconstructorError::DataShardsReconstruction)?,
+                            );
+                        }
+
+                        // Number of scalars for KZG must be a power of two elements
+                        scalars.resize(scalars.capacity(), Scalar::default());
+
+                        scalars
+                    };
+
+                    let polynomial = self.kzg.poly(&scalars).expect(
+                        "KZG instance must be configured to support this many scalars; qed",
+                    );
+                    let commitment = self.kzg.commit(&polynomial).expect(
+                        "KZG instance must be configured to support this many scalars; qed",
+                    );
+
+                    Ok(commitment)
+                }
+            })
+            .collect::<Result<Vec<_>, _>>()?
+        };
 
         let record_commitments = self
             .erasure_coding
             .extend_commitments(&source_record_commitments)
@@ -220,7 +226,12 @@ impl PiecesReconstructor {
     ) -> Result<Piece, ReconstructorError> {
         let (mut pieces, polynomial) = self.reconstruct_shards(segment_pieces)?;
 
-        pieces.iter_mut().enumerate().for_each(|(position, piece)| {
+        #[cfg(not(feature = "rayon"))]
+        let iter = pieces.iter_mut().enumerate();
+        #[cfg(feature = "rayon")]
+        let iter = pieces.par_iter_mut().enumerate();
+
+        iter.for_each(|(position, piece)| {
             piece.witness_mut().copy_from_slice(
                 &self
                     .kzg
diff --git a/crates/subspace-archiving/tests/integration/piece_reconstruction.rs b/crates/subspace-archiving/tests/integration/piece_reconstruction.rs
index 6f38e8e059..f486d84111 100644
--- a/crates/subspace-archiving/tests/integration/piece_reconstruction.rs
+++ b/crates/subspace-archiving/tests/integration/piece_reconstruction.rs
@@ -1,4 +1,6 @@
 use rand::Rng;
+#[cfg(feature = "rayon")]
+use rayon::prelude::*;
 use subspace_archiving::archiver::Archiver;
 use subspace_archiving::piece_reconstructor::{PiecesReconstructor, ReconstructorError};
 use subspace_core_primitives::crypto::kzg::{embedded_kzg_settings, Kzg};
@@ -83,12 +85,22 @@ fn piece_reconstruction_works() {
 
     let reconstructor = PiecesReconstructor::new(kzg).unwrap();
 
-    for (missing_piece_position, missing_piece) in missing_pieces {
-        let recovered_piece = reconstructor
-            .reconstruct_piece(&maybe_pieces, missing_piece_position)
-            .unwrap();
+    #[cfg(not(feature = "rayon"))]
+    let iter = missing_pieces.iter();
+    #[cfg(feature = "rayon")]
+    let iter = missing_pieces.par_iter();
+    let reconstructed_pieces = iter
+        .map(|(missing_piece_position, _missing_piece)| {
+            reconstructor
+                .reconstruct_piece(&maybe_pieces, *missing_piece_position)
+                .unwrap()
+        })
+        .collect::<Vec<_>>();
 
-        assert_eq!(missing_piece, recovered_piece);
+    for ((_, missing_piece), reconstructed_piece) in
+        missing_pieces.iter().zip(&reconstructed_pieces)
+    {
+        assert_eq!(missing_piece, reconstructed_piece);
     }
 }
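One detail worth calling out in the reconstruction change above: the loop body
became a fallible closure (the early `?` returns were replaced by `Ok`/`Err`
values), and both code paths end in `collect::<Result<Vec<_>, _>>()`. Standard
iterators collect a stream of `Result`s into a single `Result` that stops at
the first error; rayon implements the same collection for parallel iterators,
though when several errors occur in parallel, which one is returned is
unspecified. A standalone sketch under made-up names (`halve_all` and the
odd-value error are purely illustrative):

    #[cfg(feature = "rayon")]
    use rayon::prelude::*;

    // Halves every value, failing if any value is odd; the same fallible
    // closure serves both the sequential and the parallel iterator
    fn halve_all(values: &[u32]) -> Result<Vec<u32>, String> {
        #[cfg(not(feature = "rayon"))]
        let iter = values.iter();
        #[cfg(feature = "rayon")]
        let iter = values.par_iter();

        iter.map(|&value| {
            if value % 2 == 0 {
                Ok(value / 2)
            } else {
                Err(format!("{value} is odd"))
            }
        })
        .collect::<Result<Vec<_>, _>>()
    }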
From c2bbc11ce4f359c55d6abc93c3ff8969594d8262 Mon Sep 17 00:00:00 2001
From: Nazar Mokrynskyi
Date: Sun, 2 Apr 2023 18:50:31 +0300
Subject: [PATCH 3/3] Parallelize archiver tests

---
 .../tests/integration/archiver.rs | 81 ++++++++++++++-----
 1 file changed, 59 insertions(+), 22 deletions(-)

diff --git a/crates/subspace-archiving/tests/integration/archiver.rs b/crates/subspace-archiving/tests/integration/archiver.rs
index 29b78644c8..e35e74ac6f 100644
--- a/crates/subspace-archiving/tests/integration/archiver.rs
+++ b/crates/subspace-archiving/tests/integration/archiver.rs
@@ -1,5 +1,7 @@
 use parity_scale_codec::{Compact, CompactLen, Decode, Encode};
 use rand::{thread_rng, Rng};
+#[cfg(feature = "rayon")]
+use rayon::prelude::*;
 use std::assert_matches::assert_matches;
 use std::io::Write;
 use std::iter;
@@ -181,14 +183,25 @@ fn archiver() {
         compare_block_objects_to_piece_objects(block_objects, piece_objects);
     }
 
-    // Check that all pieces are valid
-    for (position, piece) in first_archived_segment.pieces.iter().enumerate() {
-        assert!(archiver::is_piece_valid(
-            &kzg,
-            piece,
-            &first_archived_segment.segment_header.segment_commitment(),
-            position as u32,
-        ));
+    #[cfg(not(feature = "rayon"))]
+    let iter = first_archived_segment.pieces.iter().enumerate();
+    #[cfg(feature = "rayon")]
+    let iter = first_archived_segment.pieces.par_iter().enumerate();
+    let results = iter
+        .map(|(position, piece)| {
+            (
+                position,
+                archiver::is_piece_valid(
+                    &kzg,
+                    piece,
+                    &first_archived_segment.segment_header.segment_commitment(),
+                    position as u32,
+                ),
+            )
+        })
+        .collect::<Vec<_>>();
+    for (position, valid) in results {
+        assert!(valid, "Piece at position {position} must be valid");
     }
 
     let block_2 = {
@@ -287,13 +300,25 @@ fn archiver() {
         previous_segment_header_hash
     );
 
-    for (position, piece) in archived_segment.pieces.iter().enumerate() {
-        assert!(archiver::is_piece_valid(
-            &kzg,
-            piece,
-            &archived_segment.segment_header.segment_commitment(),
-            position as u32,
-        ));
+    #[cfg(not(feature = "rayon"))]
+    let iter = archived_segment.pieces.iter().enumerate();
+    #[cfg(feature = "rayon")]
+    let iter = archived_segment.pieces.par_iter().enumerate();
+    let results = iter
+        .map(|(position, piece)| {
+            (
+                position,
+                archiver::is_piece_valid(
+                    &kzg,
+                    piece,
+                    &archived_segment.segment_header.segment_commitment(),
+                    position as u32,
+                ),
+            )
+        })
+        .collect::<Vec<_>>();
+    for (position, valid) in results {
+        assert!(valid, "Piece at position {position} must be valid");
     }
 
     expected_segment_index += SegmentIndex::ONE;
@@ -333,13 +358,25 @@ fn archiver() {
     assert_eq!(last_archived_block.number, 3);
     assert_eq!(last_archived_block.partial_archived(), None);
 
-    for (position, piece) in archived_segment.pieces.iter().enumerate() {
-        assert!(archiver::is_piece_valid(
-            &kzg,
-            piece,
-            &archived_segment.segment_header.segment_commitment(),
-            position as u32,
-        ));
+    #[cfg(not(feature = "rayon"))]
+    let iter = archived_segment.pieces.iter().enumerate();
+    #[cfg(feature = "rayon")]
+    let iter = archived_segment.pieces.par_iter().enumerate();
+    let results = iter
+        .map(|(position, piece)| {
+            (
+                position,
+                archiver::is_piece_valid(
+                    &kzg,
+                    piece,
+                    &archived_segment.segment_header.segment_commitment(),
+                    position as u32,
+                ),
+            )
+        })
+        .collect::<Vec<_>>();
+    for (position, valid) in results {
+        assert!(valid, "Piece at position {position} must be valid");
     }
 }
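The test changes in this last patch all follow the same shape: validity is
computed in parallel into `(position, bool)` pairs, while the `assert!` stays
in a plain sequential loop, so a failure still reports the position of the
offending piece instead of panicking inside a rayon worker thread. A compact
sketch of that shape, with a hypothetical `is_even` standing in for
`archiver::is_piece_valid`:

    #[cfg(feature = "rayon")]
    use rayon::prelude::*;

    // Hypothetical stand-in for the real validity check
    fn is_even(value: u32) -> bool {
        value % 2 == 0
    }

    fn main() {
        let values: &[u32] = &[2, 4, 6, 8];

        #[cfg(not(feature = "rayon"))]
        let iter = values.iter().enumerate();
        #[cfg(feature = "rayon")]
        let iter = values.par_iter().enumerate();

        // Check in parallel, assert sequentially afterwards
        let results = iter
            .map(|(position, &value)| (position, is_even(value)))
            .collect::<Vec<_>>();

        for (position, valid) in results {
            assert!(valid, "value at position {position} must be even");
        }
    }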