From 5dbe20b4807b1db18d0283d4b79ba9c20a549435 Mon Sep 17 00:00:00 2001
From: Nazar Mokrynskyi
Date: Tue, 19 Sep 2023 02:18:00 +0300
Subject: [PATCH] Leverage `SegmentHeadersStore` to find last archived block
 faster on node restart, improve error handling by removing incorrect
 `.expect()` calls

---
 crates/sc-consensus-subspace/src/archiver.rs | 204 ++++++++-----------
 crates/subspace-service/src/lib.rs           |  13 +-
 2 files changed, 91 insertions(+), 126 deletions(-)

diff --git a/crates/sc-consensus-subspace/src/archiver.rs b/crates/sc-consensus-subspace/src/archiver.rs
index 4e74a6ac37..225c14aca3 100644
--- a/crates/sc-consensus-subspace/src/archiver.rs
+++ b/crates/sc-consensus-subspace/src/archiver.rs
@@ -25,7 +25,7 @@ use crate::{
 };
 use codec::{Decode, Encode};
 use futures::StreamExt;
-use log::{debug, error, info, warn};
+use log::{debug, info, warn};
 use parking_lot::Mutex;
 use rand::prelude::*;
 use rand_chacha::ChaCha8Rng;
@@ -78,7 +78,7 @@ where
     const INITIAL_CACHE_CAPACITY: usize = 1_000;
 
     /// Create new instance
-    pub fn new(aux_store: Arc<AS>) -> Result<Self, sp_blockchain::Error> {
+    pub fn new(aux_store: Arc<AS>) -> sp_blockchain::Result<Self> {
         let mut cache = Vec::with_capacity(Self::INITIAL_CACHE_CAPACITY);
         let mut next_key_index = 0;
 
@@ -121,7 +121,7 @@ where
     pub fn add_segment_headers(
         &self,
         segment_headers: &[SegmentHeader],
-    ) -> Result<(), sp_blockchain::Error> {
+    ) -> sp_blockchain::Result<()> {
         let mut maybe_last_segment_index = self.max_segment_index();
         let mut segment_headers_to_store = Vec::with_capacity(segment_headers.len());
         for segment_header in segment_headers {
@@ -201,88 +201,63 @@ where
 /// https://github.com/paritytech/substrate/discussions/14359
 pub(crate) const FINALIZATION_DEPTH_IN_SEGMENTS: usize = 5;
 
-fn find_last_archived_block<Block, Client>(
+fn find_last_archived_block<Block, Client, AS>(
     client: &Client,
-    best_block_hash: Block::Hash,
-) -> Option<(SegmentHeader, Block, BlockObjectMapping)>
+    segment_headers_store: &SegmentHeadersStore<AS>,
+) -> sp_blockchain::Result<Option<(SegmentHeader, Block, BlockObjectMapping)>>
 where
     Block: BlockT,
     Client: ProvideRuntimeApi<Block> + BlockBackend<Block> + HeaderBackend<Block>,
     Client::Api: SubspaceApi<Block, FarmerPublicKey> + ObjectsApi<Block>,
+    AS: AuxStore,
 {
-    let mut block_to_check = best_block_hash;
-    let last_segment_header = 'outer: loop {
-        let block = client
-            .block(block_to_check)
-            .expect("Older blocks should always exist")
-            .expect("Older blocks should always exist");
-
-        for extrinsic in block.block.extrinsics() {
-            match client
-                .runtime_api()
-                .extract_segment_headers(block_to_check, extrinsic)
-            {
-                Ok(Some(segment_headers)) => {
-                    break 'outer segment_headers.into_iter().last()?;
-                }
-                Ok(None) => {
-                    // Some other extrinsic, ignore
-                }
-                Err(error) => {
-                    // TODO: Probably light client, can this even happen?
-                    panic!(
-                        "Failed to make runtime API call during last archived block search: \
-                        {error:?}"
-                    );
-                }
-            }
-        }
-
-        let parent_block_hash = *block.block.header().parent_hash();
-
-        if parent_block_hash == Block::Hash::default() {
-            // Genesis block, nothing else to check
-            return None;
-        }
-
-        block_to_check = parent_block_hash;
+    let Some(max_segment_index) = segment_headers_store.max_segment_index() else {
+        return Ok(None);
     };
 
-    let last_archived_block_number = last_segment_header.last_archived_block().number;
+    if max_segment_index == SegmentIndex::ZERO {
+        // Just genesis, nothing else to check
+        return Ok(None);
+    }
 
-    let last_archived_block = loop {
-        let block = client
-            .block(block_to_check)
-            .expect("Older blocks must always exist")
-            .expect("Older blocks must always exist")
-            .block;
+    for segment_header in (SegmentIndex::ZERO..=max_segment_index)
+        .rev()
+        .filter_map(|segment_index| segment_headers_store.get_segment_header(segment_index))
+    {
+        let last_archived_block_number = segment_header.last_archived_block().number;
+        let Some(last_archived_block_hash) = client.hash(last_archived_block_number.into())? else {
+            // This block number is not in our chain yet (segment headers store may know about more
+            // blocks in existence than is currently imported)
+            continue;
+        };
 
-        if *block.header().number() == last_archived_block_number.into() {
-            break block;
-        }
+        let last_segment_header = segment_header;
 
-        block_to_check = *block.header().parent_hash();
-    };
+        let last_archived_block = client
+            .block(last_archived_block_hash)?
+            .expect("Last archived block must always be retrievable; qed")
+            .block;
 
-    let last_archived_block_hash = block_to_check;
+        let block_object_mappings = client
+            .runtime_api()
+            .validated_object_call_hashes(last_archived_block_hash)
+            .and_then(|calls| {
+                client.runtime_api().extract_block_object_mapping(
+                    *last_archived_block.header().parent_hash(),
+                    last_archived_block.clone(),
+                    calls,
+                )
+            })
+            .unwrap_or_default();
 
-    let block_object_mappings = client
-        .runtime_api()
-        .validated_object_call_hashes(last_archived_block_hash)
-        .and_then(|calls| {
-            client.runtime_api().extract_block_object_mapping(
-                *last_archived_block.header().parent_hash(),
-                last_archived_block.clone(),
-                calls,
-            )
-        })
-        .unwrap_or_default();
+        return Ok(Some((
+            last_segment_header,
+            last_archived_block,
+            block_object_mappings,
+        )));
+    }
 
-    Some((
-        last_segment_header,
-        last_archived_block,
-        block_object_mappings,
-    ))
+    Ok(None)
 }
 
 struct BlockHashesToArchive<Block>
 where
     Block: BlockT,
 {
@@ -298,7 +273,7 @@ fn block_hashes_to_archive<Block, Client>(
     best_block_hash: Block::Hash,
     blocks_to_archive_from: NumberFor<Block>,
     blocks_to_archive_to: NumberFor<Block>,
-) -> BlockHashesToArchive<Block>
+) -> sp_blockchain::Result<BlockHashesToArchive<Block>>
 where
     Block: BlockT,
     Client: HeaderBackend<Block>,
@@ -311,8 +286,7 @@ where
     loop {
         // TODO: `Error` here must be handled instead
         let header = client
-            .header(block_hash_to_check)
-            .expect("Parent block must exist; qed")
+            .header(block_hash_to_check)?
.expect("Parent block must exist; qed"); if block_range.contains(header.number()) { @@ -330,10 +304,10 @@ where block_hash_to_check = *header.parent_hash(); } - BlockHashesToArchive { + Ok(BlockHashesToArchive { block_hashes, best_archived, - } + }) } /// Derive genesis segment on demand, returns `Ok(None)` in case genesis block was already pruned @@ -419,7 +393,7 @@ fn initialize_archiver( segment_headers_store: &SegmentHeadersStore, subspace_link: &SubspaceLink, client: &Client, -) -> InitializedArchiver +) -> sp_blockchain::Result> where Block: BlockT, Client: ProvideRuntimeApi + BlockBackend + HeaderBackend + AuxStore, @@ -430,7 +404,7 @@ where .expect("Must always be able to get chain constants") .confirmation_depth_k(); - let maybe_last_archived_block = find_last_archived_block(client, best_block_hash); + let maybe_last_archived_block = find_last_archived_block(client, segment_headers_store)?; let have_last_segment_header = maybe_last_archived_block.is_some(); let mut best_archived_block = None; @@ -510,14 +484,13 @@ where best_block_hash, blocks_to_archive_from.into(), blocks_to_archive_to.into(), - ); + )?; best_archived_block = block_hashes_to_archive.best_archived; let block_hashes_to_archive = block_hashes_to_archive.block_hashes; for block_hash_to_archive in block_hashes_to_archive.into_iter().rev() { let block = client - .block(block_hash_to_archive) - .expect("Older block by number must always exist") + .block(block_hash_to_archive)? .expect("Older block by number must always exist") .block; let block_number_to_archive = *block.header().number(); @@ -556,11 +529,7 @@ where older_archived_segments.extend(archived_segments); if !new_segment_headers.is_empty() { - if let Err(error) = - segment_headers_store.add_segment_headers(&new_segment_headers) - { - panic!("Failed to store segment headers: {error}"); - } + segment_headers_store.add_segment_headers(&new_segment_headers)?; // Set list of expected segment headers for the block where we expect segment // header extrinsic to be included subspace_link.segment_headers.lock().put( @@ -578,13 +547,13 @@ where } } - InitializedArchiver { + Ok(InitializedArchiver { confirmation_depth_k, archiver, older_archived_segments, best_archived_block: best_archived_block .expect("Must always set if there is no logical error; qed"), - } + }) } fn finalize_block( @@ -603,7 +572,7 @@ fn finalize_block( } // We don't have anything useful to do with this result yet, the only source of errors was // logged already inside - let _result: Result<_, sp_blockchain::Error> = client.lock_import_and_run(|import_op| { + let _result: sp_blockchain::Result<_> = client.lock_import_and_run(|import_op| { // Ideally some handle to a synchronization oracle would be used to avoid unconditionally // notifying. client @@ -637,7 +606,7 @@ pub fn create_subspace_archiver( client: Arc, sync_oracle: SubspaceSyncOracle, telemetry: Option, -) -> impl Future + Send + 'static +) -> sp_blockchain::Result> + Send + 'static> where Block: BlockT, Backend: BackendT, @@ -669,7 +638,7 @@ where &segment_headers_store, subspace_link, client.as_ref(), - ); + )?; let mut block_importing_notification_stream = subspace_link .block_importing_notification_stream @@ -678,7 +647,7 @@ where subspace_link.archived_segment_notification_sender.clone(); let segment_headers = Arc::clone(&subspace_link.segment_headers); - async move { + Ok(async move { // Farmers may have not received all previous segments, send them now. 
         for archived_segment in older_archived_segments {
             send_archived_segment_notification(
@@ -714,11 +683,9 @@ where
             let block = client
                 .block(
                     client
-                        .hash(block_number_to_archive)
-                        .expect("Older block by number must always exist")
+                        .hash(block_number_to_archive)?
                         .expect("Older block by number must always exist"),
-                )
-                .expect("Older block by number must always exist")
+                )?
                 .expect("Older block by number must always exist")
                 .block;
 
@@ -733,19 +700,19 @@ where
             );
 
             if parent_block_hash != best_archived_block_hash {
-                error!(
-                    target: "subspace",
+                let error = format!(
                     "Attempt to switch to a different fork beyond archiving depth, \
                     can't do it: parent block hash {}, best archived block hash {}",
-                    parent_block_hash,
-                    best_archived_block_hash
+                    parent_block_hash, best_archived_block_hash
                 );
-                return;
+                return Err(sp_blockchain::Error::Consensus(sp_consensus::Error::Other(
+                    error.into(),
+                )));
             }
 
             best_archived_block_hash = block_hash_to_archive;
 
-            let block_object_mappings = match client
+            let block_object_mappings = client
                 .runtime_api()
                 .validated_object_call_hashes(block_hash_to_archive)
                 .and_then(|calls| {
@@ -754,16 +721,12 @@ where
                         block.clone(),
                         calls,
                     )
-                }) {
-                    Ok(block_object_mappings) => block_object_mappings,
-                    Err(error) => {
-                        error!(
-                            target: "subspace",
-                            "Failed to retrieve block object mappings: {error}"
-                        );
-                        return;
-                    }
-                };
+                })
+                .map_err(|error| {
+                    sp_blockchain::Error::Application(
+                        format!("Failed to retrieve block object mappings: {error}").into(),
+                    )
+                })?;
 
             let encoded_block = block.encode();
             debug!(
@@ -781,15 +744,7 @@ where
             ) {
                 let segment_header = archived_segment.segment_header;
 
-                if let Err(error) =
-                    segment_headers_store.add_segment_headers(slice::from_ref(&segment_header))
-                {
-                    error!(
-                        target: "subspace",
-                        "Failed to store segment headers: {error}"
-                    );
-                    return;
-                }
+                segment_headers_store.add_segment_headers(slice::from_ref(&segment_header))?;
 
                 send_archived_segment_notification(
                     &archived_segment_notification_sender,
@@ -815,8 +770,7 @@ where
 
                 if let Some(block_number_to_finalize) = maybe_block_number_to_finalize {
                     let block_hash_to_finalize = client
-                        .hash(block_number_to_finalize.into())
-                        .expect("Block about to be finalized must always exist")
+                        .hash(block_number_to_finalize.into())?
                         .expect("Block about to be finalized must always exist");
                     finalize_block(
                         client.as_ref(),
@@ -827,7 +781,9 @@ where
                 }
             }
         }
-    }
+
+        Ok(())
+    })
 }
 
 async fn send_archived_segment_notification(
diff --git a/crates/subspace-service/src/lib.rs b/crates/subspace-service/src/lib.rs
index 819f24832e..a62065b49c 100644
--- a/crates/subspace-service/src/lib.rs
+++ b/crates/subspace-service/src/lib.rs
@@ -698,11 +698,20 @@ where
         client.clone(),
         subspace_sync_oracle.clone(),
         telemetry.as_ref().map(|telemetry| telemetry.handle()),
-    );
+    )
+    .map_err(ServiceError::Client)?;
 
     task_manager
         .spawn_essential_handle()
-        .spawn_essential_blocking("subspace-archiver", None, Box::pin(subspace_archiver));
+        .spawn_essential_blocking(
+            "subspace-archiver",
+            None,
+            Box::pin(async move {
+                if let Err(error) = subspace_archiver.await {
+                    error!(%error, "Archiver exited with error");
+                }
+            }),
+        );
 
     if config.enable_subspace_block_relay {
         network_wrapper.set(network_service.clone());