Skip to content

Commit

Permalink
Adds StartingSnapshotStorages to AccountsHashVerifier (#58)
Browse files Browse the repository at this point in the history
  • Loading branch information
brooksprumo authored and willhickey committed Mar 5, 2024
1 parent b63a9af commit f94752d
Show file tree
Hide file tree
Showing 8 changed files with 135 additions and 53 deletions.
1 change: 1 addition & 0 deletions accounts-db/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ pub mod secondary_index;
pub mod shared_buffer_reader;
pub mod sorted_storages;
pub mod stake_rewards;
pub mod starting_snapshot_storages;
pub mod storable_accounts;
pub mod tiered_storage;
pub mod utils;
Expand Down
19 changes: 19 additions & 0 deletions accounts-db/src/starting_snapshot_storages.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
use {crate::accounts_db::AccountStorageEntry, std::sync::Arc};

/// Snapshot storages that the node loaded from
///
/// This is used to support fastboot. Since fastboot reuses existing storages, we must carefully
/// handle the storages used to load at startup. If we do not handle these storages properly,
/// restarting from the same local state (i.e. bank snapshot) may fail.
#[derive(Debug)]
pub enum StartingSnapshotStorages {
/// Starting from genesis has no storages yet
Genesis,
/// Starting from a snapshot archive always extracts the storages from the archive, so no
/// special handling is necessary to preserve them.
Archive,
/// Starting from local state must preserve the loaded storages. These storages must *not* be
/// recycled or removed prior to taking the next snapshot, otherwise restarting from the same
/// bank snapshot may fail.
Fastboot(Vec<Arc<AccountStorageEntry>>),
}
8 changes: 7 additions & 1 deletion core/src/accounts_hash_verifier.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ use {
IncrementalAccountsHash,
},
sorted_storages::SortedStorages,
starting_snapshot_storages::StartingSnapshotStorages,
},
solana_measure::measure_us,
solana_runtime::{
Expand Down Expand Up @@ -42,6 +43,7 @@ impl AccountsHashVerifier {
accounts_package_sender: Sender<AccountsPackage>,
accounts_package_receiver: Receiver<AccountsPackage>,
snapshot_package_sender: Option<Sender<SnapshotPackage>>,
starting_snapshot_storages: StartingSnapshotStorages,
exit: Arc<AtomicBool>,
snapshot_config: SnapshotConfig,
) -> Self {
Expand All @@ -54,7 +56,11 @@ impl AccountsHashVerifier {
// To support fastboot, we must ensure the storages used in the latest POST snapshot are
// not recycled nor removed early. Hold an Arc of their AppendVecs to prevent them from
// expiring.
let mut fastboot_storages = None;
let mut fastboot_storages = match starting_snapshot_storages {
StartingSnapshotStorages::Genesis => None,
StartingSnapshotStorages::Archive => None,
StartingSnapshotStorages::Fastboot(storages) => Some(storages),
};
loop {
if exit.load(Ordering::Relaxed) {
break;
Expand Down
43 changes: 26 additions & 17 deletions core/src/validator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ use {
accounts_index::AccountSecondaryIndexes,
accounts_update_notifier_interface::AccountsUpdateNotifier,
hardened_unpack::{open_genesis_config, MAX_GENESIS_ARCHIVE_UNPACKED_SIZE},
starting_snapshot_storages::StartingSnapshotStorages,
utils::{move_and_async_delete_path, move_and_async_delete_path_contents},
},
solana_client::connection_cache::{ConnectionCache, Protocol},
Expand Down Expand Up @@ -690,6 +691,7 @@ impl Validator {
completed_slots_receiver,
leader_schedule_cache,
starting_snapshot_hashes,
starting_snapshot_storages,
TransactionHistoryServices {
transaction_status_sender,
transaction_status_service,
Expand Down Expand Up @@ -779,6 +781,7 @@ impl Validator {
accounts_package_sender.clone(),
accounts_package_receiver,
snapshot_package_sender,
starting_snapshot_storages,
exit.clone(),
config.snapshot_config.clone(),
);
Expand Down Expand Up @@ -1767,6 +1770,7 @@ fn load_blockstore(
CompletedSlotsReceiver,
LeaderScheduleCache,
Option<StartingSnapshotHashes>,
StartingSnapshotStorages,
TransactionHistoryServices,
blockstore_processor::ProcessOptions,
BlockstoreRootScan,
Expand Down Expand Up @@ -1856,23 +1860,27 @@ fn load_blockstore(
let entry_notifier_service = entry_notifier
.map(|entry_notifier| EntryNotifierService::new(entry_notifier, exit.clone()));

let (bank_forks, mut leader_schedule_cache, starting_snapshot_hashes) =
bank_forks_utils::load_bank_forks(
&genesis_config,
&blockstore,
config.account_paths.clone(),
Some(&config.snapshot_config),
&process_options,
transaction_history_services
.cache_block_meta_sender
.as_ref(),
entry_notifier_service
.as_ref()
.map(|service| service.sender()),
accounts_update_notifier,
exit,
)
.map_err(|err| err.to_string())?;
let (
bank_forks,
mut leader_schedule_cache,
starting_snapshot_hashes,
starting_snapshot_storages,
) = bank_forks_utils::load_bank_forks(
&genesis_config,
&blockstore,
config.account_paths.clone(),
Some(&config.snapshot_config),
&process_options,
transaction_history_services
.cache_block_meta_sender
.as_ref(),
entry_notifier_service
.as_ref()
.map(|service| service.sender()),
accounts_update_notifier,
exit,
)
.map_err(|err| err.to_string())?;

// Before replay starts, set the callbacks in each of the banks in BankForks so that
// all dropped banks come through the `pruned_banks_receiver` channel. This way all bank
Expand All @@ -1898,6 +1906,7 @@ fn load_blockstore(
completed_slots_receiver,
leader_schedule_cache,
starting_snapshot_hashes,
starting_snapshot_storages,
transaction_history_services,
process_options,
blockstore_root_scan,
Expand Down
2 changes: 2 additions & 0 deletions core/tests/epoch_accounts_hash.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ use {
accounts_hash::CalcAccountsHashConfig,
accounts_index::AccountSecondaryIndexes,
epoch_accounts_hash::EpochAccountsHash,
starting_snapshot_storages::StartingSnapshotStorages,
},
solana_core::{
accounts_hash_verifier::AccountsHashVerifier,
Expand Down Expand Up @@ -196,6 +197,7 @@ impl BackgroundServices {
accounts_package_sender.clone(),
accounts_package_receiver,
Some(snapshot_package_sender),
StartingSnapshotStorages::Genesis,
exit.clone(),
snapshot_config.clone(),
);
Expand Down
2 changes: 2 additions & 0 deletions core/tests/snapshots.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ use {
accounts_hash::AccountsHash,
accounts_index::AccountSecondaryIndexes,
epoch_accounts_hash::EpochAccountsHash,
starting_snapshot_storages::StartingSnapshotStorages,
},
solana_core::{
accounts_hash_verifier::AccountsHashVerifier,
Expand Down Expand Up @@ -1043,6 +1044,7 @@ fn test_snapshots_with_background_services(
accounts_package_sender,
accounts_package_receiver,
Some(snapshot_package_sender),
StartingSnapshotStorages::Genesis,
exit.clone(),
snapshot_test_config.snapshot_config.clone(),
);
Expand Down
32 changes: 19 additions & 13 deletions ledger-tool/src/ledger_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -268,19 +268,24 @@ pub fn load_and_process_ledger(
};

let exit = Arc::new(AtomicBool::new(false));
let (bank_forks, leader_schedule_cache, starting_snapshot_hashes, ..) =
bank_forks_utils::load_bank_forks(
genesis_config,
blockstore.as_ref(),
account_paths,
snapshot_config.as_ref(),
&process_options,
None,
None, // Maybe support this later, though
accounts_update_notifier,
exit.clone(),
)
.map_err(LoadAndProcessLedgerError::LoadBankForks)?;
let (
bank_forks,
leader_schedule_cache,
starting_snapshot_hashes,
starting_snapshot_storages,
..,
) = bank_forks_utils::load_bank_forks(
genesis_config,
blockstore.as_ref(),
account_paths,
snapshot_config.as_ref(),
&process_options,
None,
None, // Maybe support this later, though
accounts_update_notifier,
exit.clone(),
)
.map_err(LoadAndProcessLedgerError::LoadBankForks)?;
let block_verification_method = value_t!(
arg_matches,
"block_verification_method",
Expand Down Expand Up @@ -325,6 +330,7 @@ pub fn load_and_process_ledger(
accounts_package_sender.clone(),
accounts_package_receiver,
None,
starting_snapshot_storages,
exit.clone(),
SnapshotConfig::new_load_only(),
);
Expand Down
81 changes: 59 additions & 22 deletions ledger/src/bank_forks_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,10 @@ use {
use_snapshot_archives_at_startup::{self, UseSnapshotArchivesAtStartup},
},
log::*,
solana_accounts_db::accounts_update_notifier_interface::AccountsUpdateNotifier,
solana_accounts_db::{
accounts_update_notifier_interface::AccountsUpdateNotifier,
starting_snapshot_storages::StartingSnapshotStorages,
},
solana_runtime::{
accounts_background_service::AbsRequestSender,
bank_forks::BankForks,
Expand Down Expand Up @@ -67,6 +70,7 @@ pub type LoadResult = result::Result<
Arc<RwLock<BankForks>>,
LeaderScheduleCache,
Option<StartingSnapshotHashes>,
StartingSnapshotStorages,
),
BankForksUtilsError,
>;
Expand All @@ -88,7 +92,13 @@ pub fn load(
accounts_update_notifier: Option<AccountsUpdateNotifier>,
exit: Arc<AtomicBool>,
) -> LoadResult {
let (bank_forks, leader_schedule_cache, starting_snapshot_hashes, ..) = load_bank_forks(
let (
bank_forks,
leader_schedule_cache,
starting_snapshot_hashes,
starting_snapshot_storages,
..,
) = load_bank_forks(
genesis_config,
blockstore,
account_paths,
Expand All @@ -111,7 +121,12 @@ pub fn load(
)
.map_err(BankForksUtilsError::ProcessBlockstoreFromRoot)?;

Ok((bank_forks, leader_schedule_cache, starting_snapshot_hashes))
Ok((
bank_forks,
leader_schedule_cache,
starting_snapshot_hashes,
starting_snapshot_storages,
))
}

#[allow(clippy::too_many_arguments)]
Expand Down Expand Up @@ -161,7 +176,7 @@ pub fn load_bank_forks(
))
}

let (bank_forks, starting_snapshot_hashes) =
let (bank_forks, starting_snapshot_hashes, starting_snapshot_storages) =
if let Some((full_snapshot_archive_info, incremental_snapshot_archive_info)) =
get_snapshots_to_load(snapshot_config)
{
Expand All @@ -173,17 +188,22 @@ pub fn load_bank_forks(
);
std::fs::create_dir_all(&snapshot_config.bank_snapshots_dir)
.expect("create bank snapshots dir");
let (bank_forks, starting_snapshot_hashes) = bank_forks_from_snapshot(
full_snapshot_archive_info,
incremental_snapshot_archive_info,
genesis_config,
account_paths,
snapshot_config,
process_options,
accounts_update_notifier,
exit,
)?;
(bank_forks, Some(starting_snapshot_hashes))
let (bank_forks, starting_snapshot_hashes, starting_snapshot_storages) =
bank_forks_from_snapshot(
full_snapshot_archive_info,
incremental_snapshot_archive_info,
genesis_config,
account_paths,
snapshot_config,
process_options,
accounts_update_notifier,
exit,
)?;
(
bank_forks,
Some(starting_snapshot_hashes),
starting_snapshot_storages,
)
} else {
info!("Processing ledger from genesis");
let bank_forks = blockstore_processor::process_blockstore_for_bank_0(
Expand All @@ -202,7 +222,7 @@ pub fn load_bank_forks(
.root_bank()
.set_startup_verification_complete();

(bank_forks, None)
(bank_forks, None, StartingSnapshotStorages::Genesis)
};

let mut leader_schedule_cache =
Expand All @@ -218,7 +238,12 @@ pub fn load_bank_forks(
.for_each(|hard_fork_slot| root_bank.register_hard_fork(*hard_fork_slot));
}

Ok((bank_forks, leader_schedule_cache, starting_snapshot_hashes))
Ok((
bank_forks,
leader_schedule_cache,
starting_snapshot_hashes,
starting_snapshot_storages,
))
}

#[allow(clippy::too_many_arguments)]
Expand All @@ -231,7 +256,14 @@ fn bank_forks_from_snapshot(
process_options: &ProcessOptions,
accounts_update_notifier: Option<AccountsUpdateNotifier>,
exit: Arc<AtomicBool>,
) -> Result<(Arc<RwLock<BankForks>>, StartingSnapshotHashes), BankForksUtilsError> {
) -> Result<
(
Arc<RwLock<BankForks>>,
StartingSnapshotHashes,
StartingSnapshotStorages,
),
BankForksUtilsError,
> {
// Fail hard here if snapshot fails to load, don't silently continue
if account_paths.is_empty() {
return Err(BankForksUtilsError::AccountPathsNotPresent);
Expand All @@ -257,7 +289,7 @@ fn bank_forks_from_snapshot(
.unwrap_or(true),
};

let bank = if will_startup_from_snapshot_archives {
let (bank, starting_snapshot_storages) = if will_startup_from_snapshot_archives {
// Given that we are going to boot from an archive, the append vecs held in the snapshot dirs for fast-boot should
// be released. They will be released by the account_background_service anyway. But in the case of the account_paths
// using memory-mounted file system, they are not released early enough to give space for the new append-vecs from
Expand Down Expand Up @@ -292,7 +324,7 @@ fn bank_forks_from_snapshot(
.map(|archive| archive.path().display().to_string())
.unwrap_or("none".to_string()),
})?;
bank
(bank, StartingSnapshotStorages::Archive)
} else {
let bank_snapshot =
latest_bank_snapshot.ok_or_else(|| BankForksUtilsError::NoBankSnapshotDirectory {
Expand Down Expand Up @@ -346,7 +378,8 @@ fn bank_forks_from_snapshot(
// snapshot archive next time, which is safe.
snapshot_utils::purge_all_bank_snapshots(&snapshot_config.bank_snapshots_dir);

bank
let storages = bank.get_snapshot_storages(None);
(bank, StartingSnapshotStorages::Fastboot(storages))
};

let full_snapshot_hash = FullSnapshotHash((
Expand All @@ -365,5 +398,9 @@ fn bank_forks_from_snapshot(
incremental: incremental_snapshot_hash,
};

Ok((BankForks::new_rw_arc(bank), starting_snapshot_hashes))
Ok((
BankForks::new_rw_arc(bank),
starting_snapshot_hashes,
starting_snapshot_storages,
))
}

0 comments on commit f94752d

Please sign in to comment.