Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improve archiver performance (part 2) #1346

Merged
merged 3 commits into from
Apr 3, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ use crate::archiver::Segment;
use alloc::vec::Vec;
use core::ops::{Deref, DerefMut};
use parity_scale_codec::{Encode, Output};
#[cfg(feature = "rayon")]
use rayon::prelude::*;
use subspace_core_primitives::crypto::kzg::{Commitment, Kzg};
use subspace_core_primitives::crypto::Scalar;
use subspace_core_primitives::RawRecord;
Expand Down Expand Up @@ -62,49 +64,66 @@ pub(super) fn update_record_commitments(
/// Processor is hidden to not expose unnecessary implementation details (like `Output` trait
/// implementation)
///
/// NOTE(review): this listing shows `processed_bytes`/`raw_record_buffer` side by side with
/// `skip_bytes`/`buffer`; presumably one pair replaces the other in the surrounding diff — confirm
/// which fields are live before relying on either.
struct IncrementalRecordCommitmentsProcessor<'a> {
/// Processed bytes in the segment so far, advanced by the length of every slice passed to
/// `update_commitment_state()`
processed_bytes: usize,
/// Buffer where current (partial) record is written, cleared after scalars for a commitment were
/// derived from it
raw_record_buffer: Vec<u8>,
/// Number of bytes of recorded history segment for which commitments were already created,
/// initialized to the number of existing commitments multiplied by `RawRecord::SIZE` and reduced
/// as bytes are written
skip_bytes: usize,
/// Buffer where new bytes for which commitments need to be created are pushed, split into
/// `RawRecord::SIZE` chunks when the processor is dropped
buffer: Vec<u8>,
/// Record commitments already created, newly created commitments are appended here
incremental_record_commitments: &'a mut IncrementalRecordCommitmentsState,
/// Kzg instance used for commitments creation (both `poly()` and `commit()` calls)
kzg: &'a Kzg,
}

impl<'a> Drop for IncrementalRecordCommitmentsProcessor<'a> {
    /// Turns every complete raw record accumulated in `buffer` into a KZG commitment and appends
    /// the results to the shared incremental commitments state.
    ///
    /// Trailing bytes that do not fill a whole record are left out by `chunks_exact()`.
    fn drop(&mut self) {
        // Sequential or parallel iteration over records, depending on `rayon` feature
        #[cfg(not(feature = "rayon"))]
        let records = self.buffer.chunks_exact(RawRecord::SIZE);
        #[cfg(feature = "rayon")]
        let records = self.buffer.par_chunks_exact(RawRecord::SIZE);

        let commitments = records.map(|record_bytes| {
            let record_scalars = record_bytes
                .array_chunks::<{ Scalar::SAFE_BYTES }>()
                .map(Scalar::from);
            let padded_len = record_scalars.len().next_power_of_two();
            let mut scalars = Vec::with_capacity(padded_len);

            record_scalars.collect_into(&mut scalars);

            // KZG needs a power of two number of scalars, so pad with default scalars
            scalars.resize(scalars.capacity(), Scalar::default());

            let polynomial = self
                .kzg
                .poly(&scalars)
                .expect("KZG instance must be configured to support this many scalars; qed");

            self.kzg
                .commit(&polynomial)
                .expect("KZG instance must be configured to support this many scalars; qed")
        });

        #[cfg(not(feature = "rayon"))]
        commitments.collect_into(&mut self.incremental_record_commitments.state);
        // TODO: `collect_into_vec()`, unfortunately, truncates input, which is not what we want
        //  can be unified when https://github.com/rayon-rs/rayon/issues/1039 is resolved
        #[cfg(feature = "rayon")]
        self.incremental_record_commitments
            .extend(&commitments.collect::<Vec<_>>());
    }
}

impl<'a> Output for IncrementalRecordCommitmentsProcessor<'a> {
// NOTE(review): the body below appears to interleave two versions of `write()` taken from a diff
// view (one tracking `processed_bytes`, one tracking `skip_bytes`) and its braces do not balance
// as shown — confirm against the actual file before editing.
fn write(&mut self, mut bytes: &[u8]) {
// Try to finish last partial record if possible

let record_offset = self.processed_bytes % RawRecord::SIZE;
let bytes_left_in_record = RawRecord::SIZE - record_offset;
if bytes_left_in_record > 0 {
let remaining_record_bytes;
// Take at most `bytes_left_in_record` bytes off the front of the input, the rest stays in
// `bytes`
(remaining_record_bytes, bytes) =
bytes.split_at(if bytes.len() >= bytes_left_in_record {
bytes_left_in_record
} else {
bytes.len()
});

self.update_commitment_state(remaining_record_bytes);

// Commitment is only created when exactly the missing number of bytes was supplied
if remaining_record_bytes.len() == bytes_left_in_record {
self.create_commitment();
}
// While `skip_bytes` covers the whole input nothing is buffered; otherwise the first
// `skip_bytes` bytes are dropped and the remainder is appended to `buffer`
if self.skip_bytes >= bytes.len() {
self.skip_bytes -= bytes.len();
} else {
bytes = &bytes[self.skip_bytes..];
self.skip_bytes = 0;
self.buffer.extend_from_slice(bytes);
}

// Continue processing records (full and partial) from remaining data, at this point we have
// processed some number of full records, so can simply chunk the remaining bytes into
// record sizes
bytes.chunks(RawRecord::SIZE).for_each(|record| {
self.update_commitment_state(record);

// Store hashes of full records
if record.len() == RawRecord::SIZE {
self.create_commitment();
}
});
}
}

Expand All @@ -114,64 +133,11 @@ impl<'a> IncrementalRecordCommitmentsProcessor<'a> {
kzg: &'a Kzg,
) -> Self {
Self {
// TODO: Remove `processed_bytes`, `raw_record_buffer` should be sufficient
processed_bytes: 0,
raw_record_buffer: Vec::with_capacity(RawRecord::SIZE),
skip_bytes: incremental_record_commitments.len() * RawRecord::SIZE,
// Default to record size, may grow if necessary
buffer: Vec::with_capacity(RawRecord::SIZE),
incremental_record_commitments,
kzg,
}
}

/// Tells whether the record at `record_position` still lacks a commitment in the shared state.
fn should_commit_to_record(&self, record_position: usize) -> bool {
    let existing_commitment = self
        .incremental_record_commitments
        .state
        .get(record_position);

    existing_commitment.is_none()
}

/// Accounts for `bytes` as processed and, if the record they belong to has no commitment yet,
/// appends them to the raw record buffer.
///
/// NOTE: Callers are expected to pass bytes that either cover part of the record or stop at the
/// edge of the record.
fn update_commitment_state(&mut self, bytes: &[u8]) {
    let current_record = self.processed_bytes / RawRecord::SIZE;
    let commitment_missing = self.should_commit_to_record(current_record);

    if commitment_missing {
        self.raw_record_buffer.extend_from_slice(bytes);
    }

    self.processed_bytes += bytes.len();
}

/// Creates a commitment for the record that was just completed and stores it in shared state,
/// unless a commitment for that record already exists, in which case nothing happens.
fn create_commitment(&mut self) {
    let finished_record = self.processed_bytes / RawRecord::SIZE - 1;
    if !self.should_commit_to_record(finished_record) {
        return;
    }

    let chunks = self
        .raw_record_buffer
        .array_chunks::<{ Scalar::SAFE_BYTES }>();
    let mut scalars = Vec::with_capacity(chunks.len().next_power_of_two());

    chunks.map(Scalar::from).collect_into(&mut scalars);

    // KZG needs a power of two number of scalars, so pad with default scalars
    scalars.resize(scalars.capacity(), Scalar::default());

    // Scalars were derived, raw bytes can be dropped to make space for the next record
    self.raw_record_buffer.clear();

    let polynomial = self
        .kzg
        .poly(&scalars)
        .expect("KZG instance must be configured to support this many scalars; qed");
    let commitment = self
        .kzg
        .commit(&polynomial)
        .expect("KZG instance must be configured to support this many scalars; qed");

    self.incremental_record_commitments.state.push(commitment);
}
}
93 changes: 52 additions & 41 deletions crates/subspace-archiving/src/piece_reconstructor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,11 @@ extern crate alloc;
use alloc::string::String;
use alloc::vec::Vec;
use core::num::NonZeroUsize;
#[cfg(feature = "rayon")]
use rayon::prelude::*;
use subspace_core_primitives::crypto::kzg::{Commitment, Kzg, Polynomial};
use subspace_core_primitives::crypto::{blake2b_256_254_hash_to_scalar, Scalar};
use subspace_core_primitives::{ArchivedHistorySegment, Piece, RawRecord, RecordedHistorySegment};
use subspace_core_primitives::{ArchivedHistorySegment, Piece, RawRecord};
use subspace_erasure_coding::ErasureCoding;

/// Reconstructor-related instantiation error.
Expand Down Expand Up @@ -143,46 +145,50 @@ impl PiecesReconstructor {
}
}

let mut source_record_commitments =
Vec::with_capacity(RecordedHistorySegment::NUM_RAW_RECORDS);
for (piece, maybe_input_piece) in
reconstructed_pieces.iter_mut().zip(input_pieces).step_by(2)
{
if let Some(input_piece) = maybe_input_piece {
source_record_commitments.push(
let source_record_commitments = {
#[cfg(not(feature = "rayon"))]
let iter = reconstructed_pieces.iter_mut().zip(input_pieces).step_by(2);
#[cfg(feature = "rayon")]
let iter = reconstructed_pieces
.par_iter_mut()
.zip_eq(input_pieces)
.step_by(2);

iter.map(|(piece, maybe_input_piece)| {
if let Some(input_piece) = maybe_input_piece {
Commitment::try_from_bytes(input_piece.commitment())
.map_err(|_error| ReconstructorError::InvalidInputPieceCommitment)?,
);
} else {
let scalars = {
let record_chunks = piece.record().full_scalar_arrays();
let number_of_chunks = record_chunks.len();
let mut scalars = Vec::with_capacity(number_of_chunks.next_power_of_two());

for record_chunk in record_chunks {
scalars.push(
Scalar::try_from(record_chunk)
.map_err(ReconstructorError::DataShardsReconstruction)?,
);
}

// Number of scalars for KZG must be a power of two elements
scalars.resize(scalars.capacity(), Scalar::default());

scalars
};

let polynomial = self
.kzg
.poly(&scalars)
.expect("KZG instance must be configured to support this many scalars; qed");
let commitment = self
.kzg
.commit(&polynomial)
.expect("KZG instance must be configured to support this many scalars; qed");
source_record_commitments.push(commitment);
}
}
.map_err(|_error| ReconstructorError::InvalidInputPieceCommitment)
} else {
let scalars = {
let record_chunks = piece.record().full_scalar_arrays();
let number_of_chunks = record_chunks.len();
let mut scalars = Vec::with_capacity(number_of_chunks.next_power_of_two());

for record_chunk in record_chunks {
scalars.push(
Scalar::try_from(record_chunk)
.map_err(ReconstructorError::DataShardsReconstruction)?,
);
}

// Number of scalars for KZG must be a power of two elements
scalars.resize(scalars.capacity(), Scalar::default());

scalars
};

let polynomial = self.kzg.poly(&scalars).expect(
"KZG instance must be configured to support this many scalars; qed",
);
let commitment = self.kzg.commit(&polynomial).expect(
"KZG instance must be configured to support this many scalars; qed",
);

Ok(commitment)
}
})
.collect::<Result<Vec<_>, _>>()?
};
let record_commitments = self
.erasure_coding
.extend_commitments(&source_record_commitments)
Expand Down Expand Up @@ -220,7 +226,12 @@ impl PiecesReconstructor {
) -> Result<ArchivedHistorySegment, ReconstructorError> {
let (mut pieces, polynomial) = self.reconstruct_shards(segment_pieces)?;

pieces.iter_mut().enumerate().for_each(|(position, piece)| {
#[cfg(not(feature = "rayon"))]
let iter = pieces.iter_mut().enumerate();
#[cfg(feature = "rayon")]
let iter = pieces.par_iter_mut().enumerate();

iter.for_each(|(position, piece)| {
piece.witness_mut().copy_from_slice(
&self
.kzg
Expand Down
Loading