Skip to content
This repository has been archived by the owner on Jan 22, 2025. It is now read-only.

Commit

Permalink
Run real snapshot packager while processing blockstore at validator s…
Browse files Browse the repository at this point in the history
…tartup
  • Loading branch information
mvines committed Apr 23, 2022
1 parent b101e00 commit 83e0412
Show file tree
Hide file tree
Showing 5 changed files with 128 additions and 384 deletions.
177 changes: 92 additions & 85 deletions core/src/validator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,9 @@ use {
const MAX_COMPLETED_DATA_SETS_IN_CHANNEL: usize = 100_000;
const WAIT_FOR_SUPERMAJORITY_THRESHOLD_PERCENT: u64 = 80;

/// maximum drop bank signal queue length
const MAX_DROP_BANK_SIGNAL_QUEUE_SIZE: usize = 10_000;

pub struct ValidatorConfig {
pub halt_at_slot: Option<Slot>,
pub expected_genesis_hash: Option<Hash>,
Expand Down Expand Up @@ -529,66 +532,18 @@ impl Validator {
Some(poh_timing_point_sender.clone()),
);

let pending_accounts_package = PendingAccountsPackage::default();
let last_full_snapshot_slot = process_blockstore(
&blockstore,
&bank_forks,
&leader_schedule_cache,
&blockstore_process_options,
transaction_status_sender.as_ref(),
cache_block_meta_sender.as_ref(),
config.snapshot_config.as_ref(),
Arc::clone(&pending_accounts_package),
blockstore_root_scan,
pruned_banks_receiver,
&start_progress,
);
let last_full_snapshot_slot =
last_full_snapshot_slot.or_else(|| starting_snapshot_hashes.map(|x| x.full.hash.0));

maybe_warp_slot(config, ledger_path, &bank_forks, &leader_schedule_cache);

let tower = {
let restored_tower = Tower::restore(config.tower_storage.as_ref(), &id);
if let Ok(tower) = &restored_tower {
reconcile_blockstore_roots_with_tower(tower, &blockstore).unwrap_or_else(|err| {
error!("Failed to reconcile blockstore with tower: {:?}", err);
abort()
});
}

post_process_restored_tower(
restored_tower,
&id,
vote_account,
config,
&bank_forks.read().unwrap(),
)
};
info!("Tower state: {:?}", tower);

*start_progress.write().unwrap() = ValidatorStartProgress::StartingServices;

let leader_schedule_cache = Arc::new(leader_schedule_cache);

let sample_performance_service =
if config.rpc_addrs.is_some() && config.rpc_config.enable_rpc_transaction_history {
Some(SamplePerformanceService::new(
&bank_forks,
&blockstore,
&exit,
))
} else {
None
};

let bank = bank_forks.read().unwrap().working_bank();
info!("Starting validator with working bank slot {}", bank.slot());

node.info.wallclock = timestamp();
node.info.shred_version = compute_shred_version(
&genesis_config.hash(),
Some(&bank.hard_forks().read().unwrap()),
Some(
&bank_forks
.read()
.unwrap()
.working_bank()
.hard_forks()
.read()
.unwrap(),
),
);

Self::print_node_info(&node);
Expand All @@ -613,28 +568,13 @@ impl Validator {
cluster_info.restore_contact_info(ledger_path, config.contact_save_interval);
let cluster_info = Arc::new(cluster_info);

// Before replay starts, set the callbacks in each of the banks in BankForks
// Note after this callback is created, only the AccountsBackgroundService should be calling
// AccountsDb::purge_slot() to clean up dropped banks.
let (pruned_banks_sender, pruned_banks_receiver) = unbounded();
let callback = bank_forks
.read()
.unwrap()
.root_bank()
.rc
.accounts
.accounts_db
.create_drop_bank_callback(pruned_banks_sender);
for bank in bank_forks.read().unwrap().banks().values() {
bank.set_callback(Some(Box::new(callback.clone())));
}

let (
accounts_background_service,
accounts_hash_verifier,
snapshot_packager_service,
accounts_background_request_sender,
) = {
let pending_accounts_package = PendingAccountsPackage::default();
let (
accounts_background_request_sender,
snapshot_request_handler,
Expand Down Expand Up @@ -694,6 +634,7 @@ impl Validator {
config.snapshot_config.clone(),
);

let last_full_snapshot_slot = starting_snapshot_hashes.map(|x| x.full.hash.0);
let accounts_background_service = AccountsBackgroundService::new(
bank_forks.clone(),
&exit,
Expand All @@ -714,6 +655,57 @@ impl Validator {
)
};

process_blockstore(
&blockstore,
&bank_forks,
&leader_schedule_cache,
&blockstore_process_options,
transaction_status_sender.as_ref(),
cache_block_meta_sender.as_ref(),
blockstore_root_scan,
&accounts_background_request_sender,
&start_progress,
);

maybe_warp_slot(config, ledger_path, &bank_forks, &leader_schedule_cache);

let tower = {
let restored_tower = Tower::restore(config.tower_storage.as_ref(), &id);
if let Ok(tower) = &restored_tower {
reconcile_blockstore_roots_with_tower(tower, &blockstore).unwrap_or_else(|err| {
error!("Failed to reconcile blockstore with tower: {:?}", err);
abort()
});
}

post_process_restored_tower(
restored_tower,
&id,
vote_account,
config,
&bank_forks.read().unwrap(),
)
};
info!("Tower state: {:?}", tower);

*start_progress.write().unwrap() = ValidatorStartProgress::StartingServices;

let leader_schedule_cache = Arc::new(leader_schedule_cache);

let sample_performance_service =
if config.rpc_addrs.is_some() && config.rpc_config.enable_rpc_transaction_history {
Some(SamplePerformanceService::new(
&bank_forks,
&blockstore,
&exit,
))
} else {
None
};

let bank = bank_forks.read().unwrap().working_bank();
info!("Starting validator with working bank slot {}", bank.slot());

let mut block_commitment_cache = BlockCommitmentCache::default();
block_commitment_cache.initialize_slots(bank.slot(), bank_forks.read().unwrap().root());
let block_commitment_cache = Arc::new(RwLock::new(block_commitment_cache));
Expand Down Expand Up @@ -1420,7 +1412,7 @@ fn load_blockstore(
TransactionHistoryServices::default()
};

let (bank_forks, mut leader_schedule_cache, starting_snapshot_hashes, pruned_banks_receiver) =
let (bank_forks, mut leader_schedule_cache, starting_snapshot_hashes) =
bank_forks_utils::load_bank_forks(
&genesis_config,
&blockstore,
Expand All @@ -1434,6 +1426,28 @@ fn load_blockstore(
accounts_update_notifier,
);

// Before replay starts, set the callbacks in each of the banks in BankForks so that
// all dropped banks come through the `pruned_banks_receiver` channel. This way all bank
// drop behavior can be safely synchronized with any other ongoing accounts activity like
// cache flush, clean, shrink, as long as the same thread performing those activities also
// is processing the dropped banks from the `pruned_banks_receiver` channel.

// There should only be one bank, the root bank in BankForks. Thus all banks added to
// BankForks from now on will be descended from the root bank and thus will inherit
// the bank drop callback.
assert_eq!(bank_forks.read().unwrap().banks().len(), 1);
let (pruned_banks_sender, pruned_banks_receiver) = bounded(MAX_DROP_BANK_SIGNAL_QUEUE_SIZE);
{
let root_bank = bank_forks.read().unwrap().root_bank();
root_bank.set_callback(Some(Box::new(
root_bank
.rc
.accounts
.accounts_db
.create_drop_bank_callback(pruned_banks_sender),
)));
}

{
let hard_forks: Vec<_> = bank_forks
.read()
Expand Down Expand Up @@ -1508,12 +1522,10 @@ fn process_blockstore(
process_options: &blockstore_processor::ProcessOptions,
transaction_status_sender: Option<&TransactionStatusSender>,
cache_block_meta_sender: Option<&CacheBlockMetaSender>,
snapshot_config: Option<&SnapshotConfig>,
pending_accounts_package: PendingAccountsPackage,
blockstore_root_scan: BlockstoreRootScan,
pruned_banks_receiver: DroppedSlotsReceiver,
accounts_background_request_sender: &AbsRequestSender,
start_progress: &Arc<RwLock<ValidatorStartProgress>>,
) -> Option<Slot> {
) {
let exit = Arc::new(AtomicBool::new(false));
if let Some(max_slot) = highest_slot(blockstore) {
let bank_forks = bank_forks.clone();
Expand All @@ -1529,17 +1541,14 @@ fn process_blockstore(
}
});
}

let last_full_snapshot_slot = blockstore_processor::process_blockstore_from_root(
blockstore_processor::process_blockstore_from_root(
blockstore,
bank_forks,
leader_schedule_cache,
process_options,
transaction_status_sender,
cache_block_meta_sender,
snapshot_config,
pending_accounts_package,
pruned_banks_receiver,
accounts_background_request_sender,
)
.unwrap_or_else(|err| {
error!("Failed to load ledger: {:?}", err);
Expand All @@ -1549,8 +1558,6 @@ fn process_blockstore(
exit.store(true, Ordering::Relaxed);

blockstore_root_scan.join();

last_full_snapshot_slot
}

fn maybe_warp_slot(
Expand Down
2 changes: 0 additions & 2 deletions ledger-tool/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,6 @@ use {
snapshot_archive_info::SnapshotArchiveInfoGetter,
snapshot_config::SnapshotConfig,
snapshot_hash::StartingSnapshotHashes,
snapshot_package::PendingAccountsPackage,
snapshot_utils::{
self, ArchiveFormat, SnapshotVersion, DEFAULT_MAX_FULL_SNAPSHOT_ARCHIVES_TO_RETAIN,
DEFAULT_MAX_INCREMENTAL_SNAPSHOT_ARCHIVES_TO_RETAIN,
Expand Down Expand Up @@ -778,7 +777,6 @@ fn load_bank_forks(
process_options,
None,
None,
PendingAccountsPackage::default(),
None,
)
.map(|(bank_forks, .., starting_snapshot_hashes)| (bank_forks, starting_snapshot_hashes))
Expand Down
Loading

0 comments on commit 83e0412

Please sign in to comment.