diff --git a/crates/sc-consensus-subspace/src/archiver.rs b/crates/sc-consensus-subspace/src/archiver.rs
index bf0427ecd2..815c0c8472 100644
--- a/crates/sc-consensus-subspace/src/archiver.rs
+++ b/crates/sc-consensus-subspace/src/archiver.rs
@@ -30,6 +30,7 @@ use parking_lot::Mutex;
 use rand::prelude::*;
 use rand_chacha::ChaCha8Rng;
 use rayon::prelude::*;
+use rayon::ThreadPoolBuilder;
 use sc_client_api::{AuxStore, Backend as BackendT, BlockBackend, Finalizer, LockImportRun};
 use sc_telemetry::{telemetry, TelemetryHandle, CONSENSUS_INFO};
 use sc_utils::mpsc::tracing_unbounded;
@@ -49,6 +50,9 @@ use subspace_core_primitives::crypto::kzg::Kzg;
 use subspace_core_primitives::objects::BlockObjectMapping;
 use subspace_core_primitives::{BlockNumber, RecordedHistorySegment, SegmentHeader, SegmentIndex};
 
+/// This corresponds to the default value of `--max-runtime-instances` in Substrate
+const BLOCKS_TO_ARCHIVE_CONCURRENCY: usize = 8;
+
 #[derive(Debug)]
 struct SegmentHeadersStoreInner<AS> {
     aux_store: Arc<AS>,
@@ -429,44 +433,60 @@ where
             blocks_to_archive_to,
         );
 
-        let block_hashes_to_archive = (blocks_to_archive_from..=blocks_to_archive_to)
-            .into_par_iter()
-            .map(|block_number| {
-                Ok(client
-                    .hash(block_number.into())?
-                    .expect("All blocks since last archived must be present; qed"))
-            })
-            .collect::<sp_blockchain::Result<Vec<_>>>()?;
+        let thread_pool = ThreadPoolBuilder::new()
+            .num_threads(BLOCKS_TO_ARCHIVE_CONCURRENCY)
+            .build()
+            .map_err(|error| {
+                sp_blockchain::Error::Backend(format!(
+                    "Failed to create thread pool for archiver initialization: {error}"
+                ))
+            })?;
+        // We need to limit the number of threads to avoid running out of WASM instances
+        let blocks_to_archive = thread_pool.install(|| {
+            (blocks_to_archive_from..=blocks_to_archive_to)
+                .into_par_iter()
+                .map_init(
+                    || client.runtime_api(),
+                    |runtime_api, block_number| {
+                        let block_hash = client
+                            .hash(block_number.into())?
+                            .expect("All blocks since last archived must be present; qed");
+
+                        let block = client
+                            .block(block_hash)?
+                            .expect("All blocks since last archived must be present; qed")
+                            .block;
+
+                        let block_object_mappings = runtime_api
+                            .validated_object_call_hashes(block_hash)
+                            .and_then(|calls| {
+                                client.runtime_api().extract_block_object_mapping(
+                                    *block.header().parent_hash(),
+                                    block.clone(),
+                                    calls,
+                                )
+                            })
+                            .unwrap_or_default();
+
+                        Ok((block, block_object_mappings))
+                    },
+                )
+                .collect::<sp_blockchain::Result<Vec<_>>>()
+        })?;
 
-        best_archived_block.replace((
-            *block_hashes_to_archive.last().expect("Not empty; qed"),
-            blocks_to_archive_to.into(),
-        ));
+        best_archived_block = blocks_to_archive
+            .last()
+            .map(|(block, _block_object_mappings)| (block.hash(), *block.header().number()));
 
-        for block_hash_to_archive in block_hashes_to_archive {
-            let block = client
-                .block(block_hash_to_archive)?
-                .expect("Older block by number must always exist")
-                .block;
+        for (block, block_object_mappings) in blocks_to_archive {
             let block_number_to_archive = *block.header().number();
 
-            let block_object_mappings = client
-                .runtime_api()
-                .validated_object_call_hashes(block_hash_to_archive)
-                .and_then(|calls| {
-                    client.runtime_api().extract_block_object_mapping(
-                        *block.header().parent_hash(),
-                        block.clone(),
-                        calls,
-                    )
-                })
-                .unwrap_or_default();
-
             let encoded_block = if block_number_to_archive.is_zero() {
                 encode_genesis_block(&block)
             } else {
                 block.encode()
            };
+
             debug!(
                 target: "subspace",
                 "Encoded block {} has size of {:.2} kiB",
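
Note on the pattern: the change above bounds rayon's parallelism so that at most BLOCKS_TO_ARCHIVE_CONCURRENCY blocks are fetched and mapped concurrently, matching Substrate's default pool of 8 WASM runtime instances. The sketch below is a minimal, self-contained illustration of the same rayon APIs used in the diff (a bounded ThreadPool, install, and map_init); ExpensiveContext is a hypothetical stand-in for the runtime-api handle and is not code from this PR.

use rayon::prelude::*;
use rayon::ThreadPoolBuilder;

/// Hypothetical stand-in for a per-thread resource that is costly to create
/// (the diff reuses a runtime-api handle the same way).
struct ExpensiveContext;

impl ExpensiveContext {
    fn new() -> Self {
        ExpensiveContext
    }

    fn process(&self, n: u32) -> u32 {
        n * 2
    }
}

fn main() -> Result<(), rayon::ThreadPoolBuildError> {
    // Cap parallelism at 8 workers; rayon's global pool defaults to one
    // thread per core, which could exceed the available runtime instances.
    let pool = ThreadPoolBuilder::new().num_threads(8).build()?;

    // `install` runs the closure inside the bounded pool, so the parallel
    // iterator below only ever uses threads from that pool.
    let results: Vec<u32> = pool.install(|| {
        (0u32..100)
            .into_par_iter()
            // `map_init` creates the init value roughly once per worker
            // split rather than once per item, so an expensive context can
            // be reused across many items.
            .map_init(ExpensiveContext::new, |ctx, n| ctx.process(n))
            .collect()
    });

    assert_eq!(results[3], 6);
    Ok(())
}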