Skip to content

Commit

Permalink
Leverage SegmentHeadersStore to find last archived block faster on …
Browse files Browse the repository at this point in the history
…node restart, improve error handling by removing incorrect `.expect()` calls
  • Loading branch information
nazar-pc committed Sep 18, 2023
1 parent 9077e40 commit 5dbe20b
Show file tree
Hide file tree
Showing 2 changed files with 91 additions and 126 deletions.
204 changes: 80 additions & 124 deletions crates/sc-consensus-subspace/src/archiver.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ use crate::{
};
use codec::{Decode, Encode};
use futures::StreamExt;
use log::{debug, error, info, warn};
use log::{debug, info, warn};
use parking_lot::Mutex;
use rand::prelude::*;
use rand_chacha::ChaCha8Rng;
Expand Down Expand Up @@ -78,7 +78,7 @@ where
const INITIAL_CACHE_CAPACITY: usize = 1_000;

/// Create new instance
pub fn new(aux_store: Arc<AS>) -> Result<Self, sp_blockchain::Error> {
pub fn new(aux_store: Arc<AS>) -> sp_blockchain::Result<Self> {
let mut cache = Vec::with_capacity(Self::INITIAL_CACHE_CAPACITY);
let mut next_key_index = 0;

Expand Down Expand Up @@ -121,7 +121,7 @@ where
pub fn add_segment_headers(
&self,
segment_headers: &[SegmentHeader],
) -> Result<(), sp_blockchain::Error> {
) -> sp_blockchain::Result<()> {
let mut maybe_last_segment_index = self.max_segment_index();
let mut segment_headers_to_store = Vec::with_capacity(segment_headers.len());
for segment_header in segment_headers {
Expand Down Expand Up @@ -201,88 +201,63 @@ where
/// https://github.com/paritytech/substrate/discussions/14359
pub(crate) const FINALIZATION_DEPTH_IN_SEGMENTS: usize = 5;

fn find_last_archived_block<Block, Client>(
fn find_last_archived_block<Block, Client, AS>(
client: &Client,
best_block_hash: Block::Hash,
) -> Option<(SegmentHeader, Block, BlockObjectMapping)>
segment_headers_store: &SegmentHeadersStore<AS>,
) -> sp_blockchain::Result<Option<(SegmentHeader, Block, BlockObjectMapping)>>
where
Block: BlockT,
Client: ProvideRuntimeApi<Block> + BlockBackend<Block> + HeaderBackend<Block>,
Client::Api: SubspaceApi<Block, FarmerPublicKey> + ObjectsApi<Block>,
AS: AuxStore,
{
let mut block_to_check = best_block_hash;
let last_segment_header = 'outer: loop {
let block = client
.block(block_to_check)
.expect("Older blocks should always exist")
.expect("Older blocks should always exist");

for extrinsic in block.block.extrinsics() {
match client
.runtime_api()
.extract_segment_headers(block_to_check, extrinsic)
{
Ok(Some(segment_headers)) => {
break 'outer segment_headers.into_iter().last()?;
}
Ok(None) => {
// Some other extrinsic, ignore
}
Err(error) => {
// TODO: Probably light client, can this even happen?
panic!(
"Failed to make runtime API call during last archived block search: \
{error:?}"
);
}
}
}

let parent_block_hash = *block.block.header().parent_hash();

if parent_block_hash == Block::Hash::default() {
// Genesis block, nothing else to check
return None;
}

block_to_check = parent_block_hash;
let Some(max_segment_index) = segment_headers_store.max_segment_index() else {
return Ok(None);
};

let last_archived_block_number = last_segment_header.last_archived_block().number;
if max_segment_index == SegmentIndex::ZERO {
// Just genesis, nothing else to check
return Ok(None);
}

let last_archived_block = loop {
let block = client
.block(block_to_check)
.expect("Older blocks must always exist")
.expect("Older blocks must always exist")
.block;
for segment_header in (SegmentIndex::ZERO..=max_segment_index)
.rev()
.filter_map(|segment_index| segment_headers_store.get_segment_header(segment_index))
{
let last_archived_block_number = segment_header.last_archived_block().number;
let Some(last_archived_block_hash) = client.hash(last_archived_block_number.into())? else {
// This block number is not in our chain yet (segment headers store may know about more
// blocks in existence than is currently imported)
continue;
};

if *block.header().number() == last_archived_block_number.into() {
break block;
}
let last_segment_header = segment_header;

block_to_check = *block.header().parent_hash();
};
let last_archived_block = client
.block(last_archived_block_hash)?
.expect("Last archived block must always be retrievable; qed")
.block;

let last_archived_block_hash = block_to_check;
let block_object_mappings = client
.runtime_api()
.validated_object_call_hashes(last_archived_block_hash)
.and_then(|calls| {
client.runtime_api().extract_block_object_mapping(
*last_archived_block.header().parent_hash(),
last_archived_block.clone(),
calls,
)
})
.unwrap_or_default();

let block_object_mappings = client
.runtime_api()
.validated_object_call_hashes(last_archived_block_hash)
.and_then(|calls| {
client.runtime_api().extract_block_object_mapping(
*last_archived_block.header().parent_hash(),
last_archived_block.clone(),
calls,
)
})
.unwrap_or_default();
return Ok(Some((
last_segment_header,
last_archived_block,
block_object_mappings,
)));
}

Some((
last_segment_header,
last_archived_block,
block_object_mappings,
))
Ok(None)
}

struct BlockHashesToArchive<Block>
Expand All @@ -298,7 +273,7 @@ fn block_hashes_to_archive<Block, Client>(
best_block_hash: Block::Hash,
blocks_to_archive_from: NumberFor<Block>,
blocks_to_archive_to: NumberFor<Block>,
) -> BlockHashesToArchive<Block>
) -> sp_blockchain::Result<BlockHashesToArchive<Block>>
where
Block: BlockT,
Client: HeaderBackend<Block>,
Expand All @@ -311,8 +286,7 @@ where
loop {
// TODO: `Error` here must be handled instead
let header = client
.header(block_hash_to_check)
.expect("Parent block must exist; qed")
.header(block_hash_to_check)?
.expect("Parent block must exist; qed");

if block_range.contains(header.number()) {
Expand All @@ -330,10 +304,10 @@ where
block_hash_to_check = *header.parent_hash();
}

BlockHashesToArchive {
Ok(BlockHashesToArchive {
block_hashes,
best_archived,
}
})
}

/// Derive genesis segment on demand, returns `Ok(None)` in case genesis block was already pruned
Expand Down Expand Up @@ -419,7 +393,7 @@ fn initialize_archiver<Block, Client, AS>(
segment_headers_store: &SegmentHeadersStore<AS>,
subspace_link: &SubspaceLink<Block>,
client: &Client,
) -> InitializedArchiver<Block>
) -> sp_blockchain::Result<InitializedArchiver<Block>>
where
Block: BlockT,
Client: ProvideRuntimeApi<Block> + BlockBackend<Block> + HeaderBackend<Block> + AuxStore,
Expand All @@ -430,7 +404,7 @@ where
.expect("Must always be able to get chain constants")
.confirmation_depth_k();

let maybe_last_archived_block = find_last_archived_block(client, best_block_hash);
let maybe_last_archived_block = find_last_archived_block(client, segment_headers_store)?;
let have_last_segment_header = maybe_last_archived_block.is_some();
let mut best_archived_block = None;

Expand Down Expand Up @@ -510,14 +484,13 @@ where
best_block_hash,
blocks_to_archive_from.into(),
blocks_to_archive_to.into(),
);
)?;
best_archived_block = block_hashes_to_archive.best_archived;
let block_hashes_to_archive = block_hashes_to_archive.block_hashes;

for block_hash_to_archive in block_hashes_to_archive.into_iter().rev() {
let block = client
.block(block_hash_to_archive)
.expect("Older block by number must always exist")
.block(block_hash_to_archive)?
.expect("Older block by number must always exist")
.block;
let block_number_to_archive = *block.header().number();
Expand Down Expand Up @@ -556,11 +529,7 @@ where
older_archived_segments.extend(archived_segments);

if !new_segment_headers.is_empty() {
if let Err(error) =
segment_headers_store.add_segment_headers(&new_segment_headers)
{
panic!("Failed to store segment headers: {error}");
}
segment_headers_store.add_segment_headers(&new_segment_headers)?;
// Set list of expected segment headers for the block where we expect segment
// header extrinsic to be included
subspace_link.segment_headers.lock().put(
Expand All @@ -578,13 +547,13 @@ where
}
}

InitializedArchiver {
Ok(InitializedArchiver {
confirmation_depth_k,
archiver,
older_archived_segments,
best_archived_block: best_archived_block
.expect("Must always set if there is no logical error; qed"),
}
})
}

fn finalize_block<Block, Backend, Client>(
Expand All @@ -603,7 +572,7 @@ fn finalize_block<Block, Backend, Client>(
}
// We don't have anything useful to do with this result yet, the only source of errors was
// logged already inside
let _result: Result<_, sp_blockchain::Error> = client.lock_import_and_run(|import_op| {
let _result: sp_blockchain::Result<_> = client.lock_import_and_run(|import_op| {
// Ideally some handle to a synchronization oracle would be used to avoid unconditionally
// notifying.
client
Expand Down Expand Up @@ -637,7 +606,7 @@ pub fn create_subspace_archiver<Block, Backend, Client, AS, SO>(
client: Arc<Client>,
sync_oracle: SubspaceSyncOracle<SO>,
telemetry: Option<TelemetryHandle>,
) -> impl Future<Output = ()> + Send + 'static
) -> sp_blockchain::Result<impl Future<Output = sp_blockchain::Result<()>> + Send + 'static>
where
Block: BlockT,
Backend: BackendT<Block>,
Expand Down Expand Up @@ -669,7 +638,7 @@ where
&segment_headers_store,
subspace_link,
client.as_ref(),
);
)?;

let mut block_importing_notification_stream = subspace_link
.block_importing_notification_stream
Expand All @@ -678,7 +647,7 @@ where
subspace_link.archived_segment_notification_sender.clone();
let segment_headers = Arc::clone(&subspace_link.segment_headers);

async move {
Ok(async move {
// Farmers may have not received all previous segments, send them now.
for archived_segment in older_archived_segments {
send_archived_segment_notification(
Expand Down Expand Up @@ -714,11 +683,9 @@ where
let block = client
.block(
client
.hash(block_number_to_archive)
.expect("Older block by number must always exist")
.hash(block_number_to_archive)?
.expect("Older block by number must always exist"),
)
.expect("Older block by number must always exist")
)?
.expect("Older block by number must always exist")
.block;

Expand All @@ -733,19 +700,19 @@ where
);

if parent_block_hash != best_archived_block_hash {
error!(
target: "subspace",
let error = format!(
"Attempt to switch to a different fork beyond archiving depth, \
can't do it: parent block hash {}, best archived block hash {}",
parent_block_hash,
best_archived_block_hash
parent_block_hash, best_archived_block_hash
);
return;
return Err(sp_blockchain::Error::Consensus(sp_consensus::Error::Other(
error.into(),
)));
}

best_archived_block_hash = block_hash_to_archive;

let block_object_mappings = match client
let block_object_mappings = client
.runtime_api()
.validated_object_call_hashes(block_hash_to_archive)
.and_then(|calls| {
Expand All @@ -754,16 +721,12 @@ where
block.clone(),
calls,
)
}) {
Ok(block_object_mappings) => block_object_mappings,
Err(error) => {
error!(
target: "subspace",
"Failed to retrieve block object mappings: {error}"
);
return;
}
};
})
.map_err(|error| {
sp_blockchain::Error::Application(
format!("Failed to retrieve block object mappings: {error}").into(),
)
})?;

let encoded_block = block.encode();
debug!(
Expand All @@ -781,15 +744,7 @@ where
) {
let segment_header = archived_segment.segment_header;

if let Err(error) =
segment_headers_store.add_segment_headers(slice::from_ref(&segment_header))
{
error!(
target: "subspace",
"Failed to store segment headers: {error}"
);
return;
}
segment_headers_store.add_segment_headers(slice::from_ref(&segment_header))?;

send_archived_segment_notification(
&archived_segment_notification_sender,
Expand All @@ -815,8 +770,7 @@ where

if let Some(block_number_to_finalize) = maybe_block_number_to_finalize {
let block_hash_to_finalize = client
.hash(block_number_to_finalize.into())
.expect("Block about to be finalized must always exist")
.hash(block_number_to_finalize.into())?
.expect("Block about to be finalized must always exist");
finalize_block(
client.as_ref(),
Expand All @@ -827,7 +781,9 @@ where
}
}
}
}

Ok(())
})
}

async fn send_archived_segment_notification(
Expand Down
Loading

0 comments on commit 5dbe20b

Please sign in to comment.