Skip to content

Commit

Permalink
Add accounts hard-link files into the bank snapshot directory (#29496)
Browse files Browse the repository at this point in the history
* Add accounts hard-link files into the bank snapshot directory

* Small adjustments and fixes.

* Address some of the review issues

* Fix compilation issues

* Change the latest slot snapshot storage from VecDeque to Option

* IoWithSourceAndFile and expanded comments on accounts

* last_slot_snapshot_storages in return value

* Update comments following the review input

* rename dir_accounts_hard_links to hard_link_path

* Add dir_full_state flag for add_bank_snapshot

* Let appendvec files hardlinking work with multiple accounts paths across multiple partitions

* Fixes for rebasing

* fix tests which generates account_path without adding run/

* rebasing fixes

* fix account path test failures

* fix test test_concurrent_snapshot_packaging

* review comments.  renamed the path setup function

* Addressed most of the review comments

* update with more review comments

* handle error from create_accounts_run_and_snapshot_dirs

* fix rebasing duplicate

* minor accounts_dir path cleanup

* minor cleanup, remove commented code

* misc review comments

* build error fix

* Fix test_incremental_snapshot_download_with_crossing_full_snapshot_interval_at_startup

* fix build error on MAX_BANK_SNAPSHOTS_TO_RETAIN

* rebase fix, update hardlink filename

* minor comment spelling fix

* rebasing fixes

* fix rebase issues; with_extension

* comments changes for review

* misc minor review issues

* bank.fill_bank_with_ticks_for_tests

* error handling on appendvec path

* fix use_jit

* minor comments refining

* Remove type AccountStorages

* get_account_path_from_appendvec_path return changed to Option

* removed appendvec_path.to_path_buf in create_accounts_run_and_snapshot_dirs

* add test_get_snapshot_accounts_hardlink_dir

* update last_snapshot_storages comment

* update last_snapshot_storages comment

* symlink map_err

* simplify test_get_snapshot_accounts_hardlink_dir with fake paths

* log last_snapshot_storages at the end of the loop
  • Loading branch information
xiangzhu70 authored Feb 15, 2023
1 parent eede50c commit 4909267
Show file tree
Hide file tree
Showing 7 changed files with 300 additions and 32 deletions.
3 changes: 2 additions & 1 deletion core/tests/snapshots.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ use {
fs_extra::dir::CopyOptions,
itertools::Itertools,
log::{info, trace},
snapshot_utils::MAX_BANK_SNAPSHOTS_TO_RETAIN,
solana_core::{
accounts_hash_verifier::AccountsHashVerifier,
snapshot_packager_service::SnapshotPackagerService,
Expand Down Expand Up @@ -495,7 +496,7 @@ fn test_concurrent_snapshot_packaging(

// Purge all the outdated snapshots, including the ones needed to generate the package
// currently sitting in the channel
snapshot_utils::purge_old_bank_snapshots(bank_snapshots_dir);
snapshot_utils::purge_old_bank_snapshots(bank_snapshots_dir, MAX_BANK_SNAPSHOTS_TO_RETAIN);

let mut bank_snapshots = snapshot_utils::get_bank_snapshots_pre(bank_snapshots_dir);
bank_snapshots.sort_unstable();
Expand Down
8 changes: 8 additions & 0 deletions local-cluster/tests/local_cluster.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1093,7 +1093,15 @@ fn test_incremental_snapshot_download_with_crossing_full_snapshot_interval_at_st
info!(
"Restarting the validator with full snapshot {validator_full_snapshot_slot_at_startup}..."
);

// Stop the test validator
let validator_info = cluster.exit_node(&validator_identity.pubkey());

// To restart, it is not enough to remove the old bank snapshot directories under snapshot/.
// The old hardlinks under <account_path>/snapshot/<slot> should also be removed.
// The purge call covers all of them.
snapshot_utils::purge_old_bank_snapshots(validator_snapshot_test_config.bank_snapshots_dir, 0);

cluster.restart_node(
&validator_identity.pubkey(),
validator_info,
Expand Down
58 changes: 44 additions & 14 deletions runtime/src/accounts_background_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
mod stats;
use {
crate::{
accounts_db::CalcAccountsHashDataSource,
accounts_db::{AccountStorageEntry, CalcAccountsHashDataSource},
accounts_hash::CalcAccountsHashConfig,
bank::{Bank, BankSlotDelta, DropCallback},
bank_forks::BankForks,
Expand All @@ -16,6 +16,7 @@ use {
crossbeam_channel::{Receiver, SendError, Sender},
log::*,
rand::{thread_rng, Rng},
snapshot_utils::MAX_BANK_SNAPSHOTS_TO_RETAIN,
solana_measure::measure::Measure,
solana_sdk::clock::{BankId, Slot},
stats::StatsManager,
Expand Down Expand Up @@ -142,13 +143,14 @@ pub struct SnapshotRequestHandler {
}

impl SnapshotRequestHandler {
// Returns the latest requested snapshot slot, if one exists
// Returns the latest requested snapshot block height and storages
#[allow(clippy::type_complexity)]
pub fn handle_snapshot_requests(
&self,
test_hash_calculation: bool,
non_snapshot_time_us: u128,
last_full_snapshot_slot: &mut Option<Slot>,
) -> Option<Result<u64, SnapshotError>> {
) -> Option<Result<(u64, Vec<Arc<AccountStorageEntry>>), SnapshotError>> {
let (
snapshot_request,
accounts_package_type,
Expand Down Expand Up @@ -265,7 +267,7 @@ impl SnapshotRequestHandler {
last_full_snapshot_slot: &mut Option<Slot>,
snapshot_request: SnapshotRequest,
accounts_package_type: AccountsPackageType,
) -> Result<u64, SnapshotError> {
) -> Result<(u64, Vec<Arc<AccountStorageEntry>>), SnapshotError> {
debug!(
"handling snapshot request: {:?}, {:?}",
snapshot_request, accounts_package_type
Expand Down Expand Up @@ -367,7 +369,7 @@ impl SnapshotRequestHandler {
&self.snapshot_config.bank_snapshots_dir,
&self.snapshot_config.full_snapshot_archives_dir,
&self.snapshot_config.incremental_snapshot_archives_dir,
snapshot_storages,
snapshot_storages.clone(),
self.snapshot_config.archive_format,
self.snapshot_config.snapshot_version,
accounts_hash_for_testing,
Expand All @@ -379,7 +381,7 @@ impl SnapshotRequestHandler {
AccountsPackage::new_for_epoch_accounts_hash(
accounts_package_type,
&snapshot_root_bank,
snapshot_storages,
snapshot_storages.clone(),
accounts_hash_for_testing,
)
}
Expand All @@ -397,7 +399,10 @@ impl SnapshotRequestHandler {

// Cleanup outdated snapshots
let mut purge_old_snapshots_time = Measure::start("purge_old_snapshots_time");
snapshot_utils::purge_old_bank_snapshots(&self.snapshot_config.bank_snapshots_dir);
snapshot_utils::purge_old_bank_snapshots(
&self.snapshot_config.bank_snapshots_dir,
MAX_BANK_SNAPSHOTS_TO_RETAIN,
);
purge_old_snapshots_time.stop();
total_time.stop();

Expand All @@ -419,7 +424,7 @@ impl SnapshotRequestHandler {
("total_us", total_time.as_us(), i64),
("non_snapshot_time_us", non_snapshot_time_us, i64),
);
Ok(snapshot_root_bank.block_height())
Ok((snapshot_root_bank.block_height(), snapshot_storages))
}
}

Expand Down Expand Up @@ -501,12 +506,13 @@ pub struct AbsRequestHandlers {

impl AbsRequestHandlers {
// Returns the latest requested snapshot block height, if one exists
#[allow(clippy::type_complexity)]
pub fn handle_snapshot_requests(
&self,
test_hash_calculation: bool,
non_snapshot_time_us: u128,
last_full_snapshot_slot: &mut Option<Slot>,
) -> Option<Result<u64, SnapshotError>> {
) -> Option<Result<(u64, Vec<Arc<AccountStorageEntry>>), SnapshotError>> {
self.snapshot_request_handler.handle_snapshot_requests(
test_hash_calculation,
non_snapshot_time_us,
Expand Down Expand Up @@ -538,6 +544,11 @@ impl AccountsBackgroundService {
.spawn(move || {
let mut stats = StatsManager::new();
let mut last_snapshot_end_time = None;

// To support fastboot, we must ensure the storages used in the latest bank snapshot are
// not recycled nor removed early. Hold an Arc of their AppendVecs to prevent them from
// expiring.
let mut last_snapshot_storages: Option<Vec<Arc<AccountStorageEntry>>> = None;
loop {
if exit.load(Ordering::Relaxed) {
break;
Expand Down Expand Up @@ -586,7 +597,7 @@ impl AccountsBackgroundService {
// snapshot requests. This is because startup verification and snapshot
// request handling can both kick off accounts hash calculations in background
// threads, and these must not happen concurrently.
let snapshot_block_height_option_result = bank
let snapshot_handle_result = bank
.is_startup_verification_complete()
.then(|| {
request_handlers.handle_snapshot_requests(
Expand All @@ -596,7 +607,7 @@ impl AccountsBackgroundService {
)
})
.flatten();
if snapshot_block_height_option_result.is_some() {
if snapshot_handle_result.is_some() {
last_snapshot_end_time = Some(Instant::now());
}

Expand All @@ -606,12 +617,24 @@ impl AccountsBackgroundService {
// slots >= bank.slot()
bank.flush_accounts_cache_if_needed();

if let Some(snapshot_block_height_result) = snapshot_block_height_option_result
{
if let Some(snapshot_handle_result) = snapshot_handle_result {
// Safe, see proof above
if let Ok(snapshot_block_height) = snapshot_block_height_result {

if let Ok((snapshot_block_height, snapshot_storages)) =
snapshot_handle_result
{
assert!(last_cleaned_block_height <= snapshot_block_height);
last_cleaned_block_height = snapshot_block_height;
// Update the option, so the older one is released, causing the release of
// its reference counts of the appendvecs
last_snapshot_storages = Some(snapshot_storages);
debug!(
"Number of snapshot storages kept alive for fastboot: {}",
last_snapshot_storages
.as_ref()
.map(|storages| storages.len())
.unwrap_or(0)
);
} else {
exit.store(true, Ordering::Relaxed);
return;
Expand All @@ -633,8 +656,15 @@ impl AccountsBackgroundService {
stats.record_and_maybe_submit(start_time.elapsed());
sleep(Duration::from_millis(INTERVAL_MS));
}
info!(
"ABS loop done. Number of snapshot storages kept alive for fastboot: {}",
last_snapshot_storages
.map(|storages| storages.len())
.unwrap_or(0)
);
})
.unwrap();

Self { t_background }
}

Expand Down
8 changes: 6 additions & 2 deletions runtime/src/accounts_db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5487,11 +5487,15 @@ impl AccountsDb {
drop(recycle_stores);
let old_id = ret.append_vec_id();
ret.recycle(slot, self.next_id());
// This info show the appendvec history change history. It helps debugging
// the appendvec data corrupution issues related to recycling.
debug!(
"recycling store: {} {:?} old_id: {}",
"recycling store: old slot {}, old_id: {}, new slot {}, new id{}, path {:?} ",
slot,
old_id,
ret.slot(),
ret.append_vec_id(),
ret.get_path(),
old_id
);
self.stats
.recycle_store_count
Expand Down
3 changes: 1 addition & 2 deletions runtime/src/serde_snapshot.rs
Original file line number Diff line number Diff line change
Expand Up @@ -419,8 +419,7 @@ pub fn reserialize_bank_with_new_accounts_hash(
) -> bool {
let bank_post = snapshot_utils::get_bank_snapshots_dir(bank_snapshots_dir, slot);
let bank_post = bank_post.join(snapshot_utils::get_snapshot_file_name(slot));
let mut bank_pre = bank_post.clone();
bank_pre.set_extension(BANK_SNAPSHOT_PRE_FILENAME_EXTENSION);
let bank_pre = bank_post.with_extension(BANK_SNAPSHOT_PRE_FILENAME_EXTENSION);

let mut found = false;
{
Expand Down
5 changes: 2 additions & 3 deletions runtime/src/serde_snapshot/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -318,11 +318,10 @@ fn test_bank_serialize_style(
let temp_dir = TempDir::new().unwrap();
let slot_dir = temp_dir.path().join(slot.to_string());
let post_path = slot_dir.join(slot.to_string());
let mut pre_path = post_path.clone();
pre_path.set_extension(BANK_SNAPSHOT_PRE_FILENAME_EXTENSION);
let pre_path = post_path.with_extension(BANK_SNAPSHOT_PRE_FILENAME_EXTENSION);
std::fs::create_dir(&slot_dir).unwrap();
{
let mut f = std::fs::File::create(&pre_path).unwrap();
let mut f = std::fs::File::create(pre_path).unwrap();
f.write_all(&buf).unwrap();
}

Expand Down
Loading

0 comments on commit 4909267

Please sign in to comment.