Skip to content

Commit

Permalink
Parallelize retrieval of blocks from database as well as object mapping creation
Browse files Browse the repository at this point in the history
  • Loading branch information
nazar-pc committed Sep 19, 2023
1 parent 1d1030c commit 02a95e0
Showing 1 changed file with 49 additions and 29 deletions.
78 changes: 49 additions & 29 deletions crates/sc-consensus-subspace/src/archiver.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ use parking_lot::Mutex;
use rand::prelude::*;
use rand_chacha::ChaCha8Rng;
use rayon::prelude::*;
use rayon::ThreadPoolBuilder;
use sc_client_api::{AuxStore, Backend as BackendT, BlockBackend, Finalizer, LockImportRun};
use sc_telemetry::{telemetry, TelemetryHandle, CONSENSUS_INFO};
use sc_utils::mpsc::tracing_unbounded;
Expand All @@ -49,6 +50,9 @@ use subspace_core_primitives::crypto::kzg::Kzg;
use subspace_core_primitives::objects::BlockObjectMapping;
use subspace_core_primitives::{BlockNumber, RecordedHistorySegment, SegmentHeader, SegmentIndex};

/// This corresponds to default value of `--max-runtime-instances` in Substrate
const BLOCKS_TO_ARCHIVE_CONCURRENCY: usize = 8;

#[derive(Debug)]
struct SegmentHeadersStoreInner<AS> {
aux_store: Arc<AS>,
Expand Down Expand Up @@ -429,44 +433,60 @@ where
blocks_to_archive_to,
);

let block_hashes_to_archive = (blocks_to_archive_from..=blocks_to_archive_to)
.into_par_iter()
.map(|block_number| {
Ok(client
.hash(block_number.into())?
.expect("All blocks since last archived must be present; qed"))
})
.collect::<sp_blockchain::Result<Vec<_>>>()?;
let thread_pool = ThreadPoolBuilder::new()
.num_threads(BLOCKS_TO_ARCHIVE_CONCURRENCY)
.build()
.map_err(|error| {
sp_blockchain::Error::Backend(format!(
"Failed to create thread pool for archiver initialization: {error}"
))
})?;
// We need to limit number of threads to avoid running out of WASM instances
let blocks_to_archive = thread_pool.install(|| {
(blocks_to_archive_from..=blocks_to_archive_to)
.into_par_iter()
.map_init(
|| client.runtime_api(),
|runtime_api, block_number| {
let block_hash = client
.hash(block_number.into())?
.expect("All blocks since last archived must be present; qed");

let block = client
.block(block_hash)?
.expect("All blocks since last archived must be present; qed")
.block;

let block_object_mappings = runtime_api
.validated_object_call_hashes(block_hash)
.and_then(|calls| {
client.runtime_api().extract_block_object_mapping(
*block.header().parent_hash(),
block.clone(),
calls,
)
})
.unwrap_or_default();

Ok((block, block_object_mappings))
},
)
.collect::<sp_blockchain::Result<Vec<_>>>()
})?;

best_archived_block.replace((
*block_hashes_to_archive.last().expect("Not empty; qed"),
blocks_to_archive_to.into(),
));
best_archived_block = blocks_to_archive
.last()
.map(|(block, _block_object_mappings)| (block.hash(), *block.header().number()));

for block_hash_to_archive in block_hashes_to_archive {
let block = client
.block(block_hash_to_archive)?
.expect("Older block by number must always exist")
.block;
for (block, block_object_mappings) in blocks_to_archive {
let block_number_to_archive = *block.header().number();

let block_object_mappings = client
.runtime_api()
.validated_object_call_hashes(block_hash_to_archive)
.and_then(|calls| {
client.runtime_api().extract_block_object_mapping(
*block.header().parent_hash(),
block.clone(),
calls,
)
})
.unwrap_or_default();

let encoded_block = if block_number_to_archive.is_zero() {
encode_genesis_block(&block)
} else {
block.encode()
};

debug!(
target: "subspace",
"Encoded block {} has size of {:.2} kiB",
Expand Down

0 comments on commit 02a95e0

Please sign in to comment.