Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Faster archiver restart #1980

Merged
merged 3 commits into from
Sep 20, 2023
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions crates/sc-consensus-subspace/Cargo.toml
Original file line number Diff line number Diff line change
@@ -25,6 +25,7 @@ parking_lot = "0.12.1"
prometheus-endpoint = { package = "substrate-prometheus-endpoint", git = "https://github.com/subspace/polkadot-sdk", rev = "20be5f33a3d2b3f4b31a894f9829184b29fba3ef", version = "0.10.0-dev" }
rand = "0.8.5"
rand_chacha = "0.3.1"
rayon = "1.7.0"
schnorrkel = "0.9.1"
sc-client-api = { version = "4.0.0-dev", git = "https://github.com/subspace/polkadot-sdk", rev = "20be5f33a3d2b3f4b31a894f9829184b29fba3ef" }
sc-consensus = { version = "0.10.0-dev", git = "https://github.com/subspace/polkadot-sdk", rev = "20be5f33a3d2b3f4b31a894f9829184b29fba3ef" }
321 changes: 125 additions & 196 deletions crates/sc-consensus-subspace/src/archiver.rs
Original file line number Diff line number Diff line change
@@ -25,10 +25,12 @@ use crate::{
};
use codec::{Decode, Encode};
use futures::StreamExt;
use log::{debug, error, info, warn};
use log::{debug, info, warn};
use parking_lot::Mutex;
use rand::prelude::*;
use rand_chacha::ChaCha8Rng;
use rayon::prelude::*;
use rayon::ThreadPoolBuilder;
use sc_client_api::{AuxStore, Backend as BackendT, BlockBackend, Finalizer, LockImportRun};
use sc_telemetry::{telemetry, TelemetryHandle, CONSENSUS_INFO};
use sc_utils::mpsc::tracing_unbounded;
@@ -48,6 +50,9 @@ use subspace_core_primitives::crypto::kzg::Kzg;
use subspace_core_primitives::objects::BlockObjectMapping;
use subspace_core_primitives::{BlockNumber, RecordedHistorySegment, SegmentHeader, SegmentIndex};

/// This corresponds to default value of `--max-runtime-instances` in Substrate
const BLOCKS_TO_ARCHIVE_CONCURRENCY: usize = 8;

#[derive(Debug)]
struct SegmentHeadersStoreInner<AS> {
aux_store: Arc<AS>,
@@ -78,7 +83,7 @@ where
const INITIAL_CACHE_CAPACITY: usize = 1_000;

/// Create new instance
pub fn new(aux_store: Arc<AS>) -> Result<Self, sp_blockchain::Error> {
pub fn new(aux_store: Arc<AS>) -> sp_blockchain::Result<Self> {
let mut cache = Vec::with_capacity(Self::INITIAL_CACHE_CAPACITY);
let mut next_key_index = 0;

@@ -121,7 +126,7 @@ where
pub fn add_segment_headers(
&self,
segment_headers: &[SegmentHeader],
) -> Result<(), sp_blockchain::Error> {
) -> sp_blockchain::Result<()> {
let mut maybe_last_segment_index = self.max_segment_index();
let mut segment_headers_to_store = Vec::with_capacity(segment_headers.len());
for segment_header in segment_headers {
@@ -201,139 +206,63 @@ where
/// https://github.com/paritytech/substrate/discussions/14359
pub(crate) const FINALIZATION_DEPTH_IN_SEGMENTS: usize = 5;

fn find_last_archived_block<Block, Client>(
fn find_last_archived_block<Block, Client, AS>(
client: &Client,
best_block_hash: Block::Hash,
) -> Option<(SegmentHeader, Block, BlockObjectMapping)>
segment_headers_store: &SegmentHeadersStore<AS>,
) -> sp_blockchain::Result<Option<(SegmentHeader, Block, BlockObjectMapping)>>
where
Block: BlockT,
Client: ProvideRuntimeApi<Block> + BlockBackend<Block> + HeaderBackend<Block>,
Client::Api: SubspaceApi<Block, FarmerPublicKey> + ObjectsApi<Block>,
AS: AuxStore,
{
let mut block_to_check = best_block_hash;
let last_segment_header = 'outer: loop {
let block = client
.block(block_to_check)
.expect("Older blocks should always exist")
.expect("Older blocks should always exist");

for extrinsic in block.block.extrinsics() {
match client
.runtime_api()
.extract_segment_headers(block_to_check, extrinsic)
{
Ok(Some(segment_headers)) => {
break 'outer segment_headers.into_iter().last()?;
}
Ok(None) => {
// Some other extrinsic, ignore
}
Err(error) => {
// TODO: Probably light client, can this even happen?
panic!(
"Failed to make runtime API call during last archived block search: \
{error:?}"
);
}
}
}

let parent_block_hash = *block.block.header().parent_hash();

if parent_block_hash == Block::Hash::default() {
// Genesis block, nothing else to check
return None;
}

block_to_check = parent_block_hash;
};

let last_archived_block_number = last_segment_header.last_archived_block().number;

let last_archived_block = loop {
let block = client
.block(block_to_check)
.expect("Older blocks must always exist")
.expect("Older blocks must always exist")
.block;

if *block.header().number() == last_archived_block_number.into() {
break block;
}

block_to_check = *block.header().parent_hash();
let Some(max_segment_index) = segment_headers_store.max_segment_index() else {
return Ok(None);
};

let last_archived_block_hash = block_to_check;

let block_object_mappings = client
.runtime_api()
.validated_object_call_hashes(last_archived_block_hash)
.and_then(|calls| {
client.runtime_api().extract_block_object_mapping(
*last_archived_block.header().parent_hash(),
last_archived_block.clone(),
calls,
)
})
.unwrap_or_default();
if max_segment_index == SegmentIndex::ZERO {
// Just genesis, nothing else to check
return Ok(None);
}

Some((
last_segment_header,
last_archived_block,
block_object_mappings,
))
}
for segment_header in (SegmentIndex::ZERO..=max_segment_index)
.rev()
.filter_map(|segment_index| segment_headers_store.get_segment_header(segment_index))
{
let last_archived_block_number = segment_header.last_archived_block().number;
let Some(last_archived_block_hash) = client.hash(last_archived_block_number.into())? else {
// This block number is not in our chain yet (segment headers store may know about more
// blocks in existence than is currently imported)
continue;
};

struct BlockHashesToArchive<Block>
where
Block: BlockT,
{
block_hashes: Vec<Block::Hash>,
best_archived: Option<(Block::Hash, NumberFor<Block>)>,
}
let last_segment_header = segment_header;

fn block_hashes_to_archive<Block, Client>(
client: &Client,
best_block_hash: Block::Hash,
blocks_to_archive_from: NumberFor<Block>,
blocks_to_archive_to: NumberFor<Block>,
) -> BlockHashesToArchive<Block>
where
Block: BlockT,
Client: HeaderBackend<Block>,
{
let block_range = blocks_to_archive_from..=blocks_to_archive_to;
let mut block_hashes = Vec::new();
let mut block_hash_to_check = best_block_hash;
let mut best_archived = None;

loop {
// TODO: `Error` here must be handled instead
let header = client
.header(block_hash_to_check)
.expect("Parent block must exist; qed")
.expect("Parent block must exist; qed");

if block_range.contains(header.number()) {
block_hashes.push(block_hash_to_check);

if best_archived.is_none() {
best_archived.replace((block_hash_to_check, *header.number()));
}
}
let last_archived_block = client
.block(last_archived_block_hash)?
.expect("Last archived block must always be retrievable; qed")
.block;

if *header.number() == blocks_to_archive_from {
break;
}
let block_object_mappings = client
.runtime_api()
.validated_object_call_hashes(last_archived_block_hash)
.and_then(|calls| {
client.runtime_api().extract_block_object_mapping(
*last_archived_block.header().parent_hash(),
last_archived_block.clone(),
calls,
)
})
.unwrap_or_default();

block_hash_to_check = *header.parent_hash();
return Ok(Some((
last_segment_header,
last_archived_block,
block_object_mappings,
)));
}

BlockHashesToArchive {
block_hashes,
best_archived,
}
Ok(None)
}

/// Derive genesis segment on demand, returns `Ok(None)` in case genesis block was already pruned
@@ -414,12 +343,11 @@ where
}

fn initialize_archiver<Block, Client, AS>(
best_block_hash: Block::Hash,
best_block_number: NumberFor<Block>,
segment_headers_store: &SegmentHeadersStore<AS>,
subspace_link: &SubspaceLink<Block>,
client: &Client,
) -> InitializedArchiver<Block>
) -> sp_blockchain::Result<InitializedArchiver<Block>>
where
Block: BlockT,
Client: ProvideRuntimeApi<Block> + BlockBackend<Block> + HeaderBackend<Block> + AuxStore,
@@ -430,7 +358,7 @@ where
.expect("Must always be able to get chain constants")
.confirmation_depth_k();

let maybe_last_archived_block = find_last_archived_block(client, best_block_hash);
let maybe_last_archived_block = find_last_archived_block(client, segment_headers_store)?;
let have_last_segment_header = maybe_last_archived_block.is_some();
let mut best_archived_block = None;

@@ -505,40 +433,60 @@ where
blocks_to_archive_to,
);

let block_hashes_to_archive = block_hashes_to_archive(
client,
best_block_hash,
blocks_to_archive_from.into(),
blocks_to_archive_to.into(),
);
best_archived_block = block_hashes_to_archive.best_archived;
let block_hashes_to_archive = block_hashes_to_archive.block_hashes;

for block_hash_to_archive in block_hashes_to_archive.into_iter().rev() {
let block = client
.block(block_hash_to_archive)
.expect("Older block by number must always exist")
.expect("Older block by number must always exist")
.block;
let block_number_to_archive = *block.header().number();
let thread_pool = ThreadPoolBuilder::new()
.num_threads(BLOCKS_TO_ARCHIVE_CONCURRENCY)
.build()
.map_err(|error| {
sp_blockchain::Error::Backend(format!(
"Failed to create thread pool for archiver initialization: {error}"
))
})?;
// We need to limit number of threads to avoid running out of WASM instances
let blocks_to_archive = thread_pool.install(|| {
(blocks_to_archive_from..=blocks_to_archive_to)
.into_par_iter()
.map_init(
|| client.runtime_api(),
|runtime_api, block_number| {
let block_hash = client
.hash(block_number.into())?
.expect("All blocks since last archived must be present; qed");

let block = client
.block(block_hash)?
.expect("All blocks since last archived must be present; qed")
.block;

let block_object_mappings = runtime_api
.validated_object_call_hashes(block_hash)
.and_then(|calls| {
client.runtime_api().extract_block_object_mapping(
*block.header().parent_hash(),
block.clone(),
calls,
)
})
.unwrap_or_default();

Ok((block, block_object_mappings))
},
)
.collect::<sp_blockchain::Result<Vec<_>>>()
})?;

let block_object_mappings = client
.runtime_api()
.validated_object_call_hashes(block_hash_to_archive)
.and_then(|calls| {
client.runtime_api().extract_block_object_mapping(
*block.header().parent_hash(),
block.clone(),
calls,
)
})
.unwrap_or_default();
best_archived_block = blocks_to_archive
.last()
.map(|(block, _block_object_mappings)| (block.hash(), *block.header().number()));

for (block, block_object_mappings) in blocks_to_archive {
let block_number_to_archive = *block.header().number();

let encoded_block = if block_number_to_archive.is_zero() {
encode_genesis_block(&block)
} else {
block.encode()
};

debug!(
target: "subspace",
"Encoded block {} has size of {:.2} kiB",
@@ -556,11 +504,7 @@ where
older_archived_segments.extend(archived_segments);

if !new_segment_headers.is_empty() {
if let Err(error) =
segment_headers_store.add_segment_headers(&new_segment_headers)
{
panic!("Failed to store segment headers: {error}");
}
segment_headers_store.add_segment_headers(&new_segment_headers)?;
// Set list of expected segment headers for the block where we expect segment
// header extrinsic to be included
subspace_link.segment_headers.lock().put(
@@ -578,13 +522,13 @@ where
}
}

InitializedArchiver {
Ok(InitializedArchiver {
confirmation_depth_k,
archiver,
older_archived_segments,
best_archived_block: best_archived_block
.expect("Must always set if there is no logical error; qed"),
}
})
}

fn finalize_block<Block, Backend, Client>(
@@ -603,7 +547,7 @@ fn finalize_block<Block, Backend, Client>(
}
// We don't have anything useful to do with this result yet, the only source of errors was
// logged already inside
let _result: Result<_, sp_blockchain::Error> = client.lock_import_and_run(|import_op| {
let _result: sp_blockchain::Result<_> = client.lock_import_and_run(|import_op| {
// Ideally some handle to a synchronization oracle would be used to avoid unconditionally
// notifying.
client
@@ -637,7 +581,7 @@ pub fn create_subspace_archiver<Block, Backend, Client, AS, SO>(
client: Arc<Client>,
sync_oracle: SubspaceSyncOracle<SO>,
telemetry: Option<TelemetryHandle>,
) -> impl Future<Output = ()> + Send + 'static
) -> sp_blockchain::Result<impl Future<Output = sp_blockchain::Result<()>> + Send + 'static>
where
Block: BlockT,
Backend: BackendT<Block>,
@@ -655,7 +599,6 @@ where
SO: SyncOracle + Send + Sync + 'static,
{
let client_info = client.info();
let best_block_hash = client_info.best_hash;
let best_block_number = client_info.best_number;

let InitializedArchiver {
@@ -664,12 +607,11 @@ where
older_archived_segments,
best_archived_block: (mut best_archived_block_hash, mut best_archived_block_number),
} = initialize_archiver(
best_block_hash,
best_block_number,
&segment_headers_store,
subspace_link,
client.as_ref(),
);
)?;

let mut block_importing_notification_stream = subspace_link
.block_importing_notification_stream
@@ -678,7 +620,7 @@ where
subspace_link.archived_segment_notification_sender.clone();
let segment_headers = Arc::clone(&subspace_link.segment_headers);

async move {
Ok(async move {
// Farmers may have not received all previous segments, send them now.
for archived_segment in older_archived_segments {
send_archived_segment_notification(
@@ -714,11 +656,9 @@ where
let block = client
.block(
client
.hash(block_number_to_archive)
.expect("Older block by number must always exist")
.hash(block_number_to_archive)?
.expect("Older block by number must always exist"),
)
.expect("Older block by number must always exist")
)?
.expect("Older block by number must always exist")
.block;

@@ -733,19 +673,19 @@ where
);

if parent_block_hash != best_archived_block_hash {
error!(
target: "subspace",
let error = format!(
"Attempt to switch to a different fork beyond archiving depth, \
can't do it: parent block hash {}, best archived block hash {}",
parent_block_hash,
best_archived_block_hash
parent_block_hash, best_archived_block_hash
);
return;
return Err(sp_blockchain::Error::Consensus(sp_consensus::Error::Other(
error.into(),
)));
}

best_archived_block_hash = block_hash_to_archive;

let block_object_mappings = match client
let block_object_mappings = client
.runtime_api()
.validated_object_call_hashes(block_hash_to_archive)
.and_then(|calls| {
@@ -754,16 +694,12 @@ where
block.clone(),
calls,
)
}) {
Ok(block_object_mappings) => block_object_mappings,
Err(error) => {
error!(
target: "subspace",
"Failed to retrieve block object mappings: {error}"
);
return;
}
};
})
.map_err(|error| {
sp_blockchain::Error::Application(
format!("Failed to retrieve block object mappings: {error}").into(),
)
})?;

let encoded_block = block.encode();
debug!(
@@ -781,15 +717,7 @@ where
) {
let segment_header = archived_segment.segment_header;

if let Err(error) =
segment_headers_store.add_segment_headers(slice::from_ref(&segment_header))
{
error!(
target: "subspace",
"Failed to store segment headers: {error}"
);
return;
}
segment_headers_store.add_segment_headers(slice::from_ref(&segment_header))?;

send_archived_segment_notification(
&archived_segment_notification_sender,
@@ -815,8 +743,7 @@ where

if let Some(block_number_to_finalize) = maybe_block_number_to_finalize {
let block_hash_to_finalize = client
.hash(block_number_to_finalize.into())
.expect("Block about to be finalized must always exist")
.hash(block_number_to_finalize.into())?
.expect("Block about to be finalized must always exist");
finalize_block(
client.as_ref(),
@@ -827,7 +754,9 @@ where
}
}
}
}

Ok(())
})
}

async fn send_archived_segment_notification(
13 changes: 11 additions & 2 deletions crates/subspace-service/src/lib.rs
Original file line number Diff line number Diff line change
@@ -865,11 +865,20 @@ where
client.clone(),
sync_oracle.clone(),
telemetry.as_ref().map(|telemetry| telemetry.handle()),
);
)
.map_err(ServiceError::Client)?;

task_manager
.spawn_essential_handle()
.spawn_essential_blocking("subspace-archiver", None, Box::pin(subspace_archiver));
.spawn_essential_blocking(
"subspace-archiver",
None,
Box::pin(async move {
if let Err(error) = subspace_archiver.await {
error!(%error, "Archiver exited with error");
}
}),
);

if config.enable_subspace_block_relay {
network_wrapper.set(network_service.clone());