Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Construct the initial bank state from the snapshot/ and accounts file… #28745

Closed
wants to merge 36 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
36 commits
Select commit Hold shift + click to select a range
3f4ced9
Construct the initial bank state from the snapshot/ and accounts file…
xiangzhu70 Sep 28, 2022
af85cd6
Fix get_slot_and_append_vec_id panic on status_cache files
Nov 16, 2022
4b6c910
Cleaned up
Nov 16, 2022
942a7e6
Resolve conflicts after rebasing
xiangzhu70 Nov 17, 2022
6ccc6cd
Remove TBD
xiangzhu70 Nov 17, 2022
d5eecbd
Fix auto-test errors
xiangzhu70 Nov 17, 2022
621d807
fix errors in cargo stable check --locked --tests --bins --examples
xiangzhu70 Nov 17, 2022
50239e1
Fix errors in cargo nightly clippy -Zunstable-options --all-targets -…
xiangzhu70 Nov 17, 2022
822bdc2
Remove the new builtins_exist assert. No need to change the existing…
xiangzhu70 Nov 17, 2022
75905ec
fix substraction overflow assertion
xiangzhu70 Nov 17, 2022
140aebf
Ignore the hash mismatch when constructing the bank from the files
xiangzhu70 Nov 18, 2022
100964c
Fix concurrent_snapshot_packaging test
xiangzhu70 Nov 19, 2022
dcc4947
fix ignore_mismatch compiler complain
xiangzhu70 Nov 19, 2022
8197c76
Rebase to the latest, resolve the conflicts
xiangzhu70 Nov 21, 2022
2eedd1a
Verified runs with validator on a devserver
xiangzhu70 Dec 9, 2022
f639061
Addressed most of the review concerns
xiangzhu70 Dec 9, 2022
3366cad
remove unnused error code
xiangzhu70 Dec 9, 2022
7fc8b23
fixes after rebasing
xiangzhu70 Dec 10, 2022
1463c12
Fix the snapshot_from_file arg
xiangzhu70 Dec 10, 2022
e98084b
fix auto tests failures
xiangzhu70 Dec 10, 2022
e755ae9
Fix snapshot test test_extra_fields_full_snapshot_archive
xiangzhu70 Dec 10, 2022
29a145d
Fix test_concurrent_snapshot_packaging
xiangzhu70 Dec 11, 2022
85eb058
remove parse_appendvec_filename
xiangzhu70 Dec 13, 2022
c8c5700
Fix compliation errors after rebasing
xiangzhu70 Dec 13, 2022
7078598
Addressed a few review comments
xiangzhu70 Dec 13, 2022
127b06d
Fix undefine VecDeque
xiangzhu70 Dec 13, 2022
40b99dc
Fix test_new_from_file_crafted_data_len failure
xiangzhu70 Dec 14, 2022
17d8ade
Fix build error on e.to_string()
xiangzhu70 Dec 14, 2022
c6be7ae
use move_and_async_delete_path for clearing the bank snapshot accounts/
xiangzhu70 Dec 14, 2022
53788c4
Fix test_new_from_file_crafted_zero_lamport_account
xiangzhu70 Dec 14, 2022
8da0667
Fix the flasky test_concurrent_snapshot_packaging caused by async remove
xiangzhu70 Dec 15, 2022
162bdda
Addressed review comments
xiangzhu70 Dec 15, 2022
15e968a
minor cleanup on a few comments changes
xiangzhu70 Dec 15, 2022
a6d624d
Close to final batch of changes addressing the review comments
xiangzhu70 Dec 16, 2022
84a3558
No need to change the snapshot and accounts path for the ledger-tool …
xiangzhu70 Dec 20, 2022
6033aab
Add archive info into from_file snapshot config
xiangzhu70 Dec 21, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion core/src/snapshot_packager_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -326,7 +326,6 @@ mod tests {
archive_format,
},
block_height: slot,
slot_deltas: vec![],
snapshot_links: link_snapshots_dir,
snapshot_storages: vec![storage_entries],
snapshot_version: SnapshotVersion::default(),
Expand Down
93 changes: 7 additions & 86 deletions core/src/validator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ use {
snapshot_config::SnapshotConfig,
snapshot_hash::StartingSnapshotHashes,
snapshot_package::PendingSnapshotPackage,
snapshot_utils,
snapshot_utils::{self, move_and_async_delete_path},
},
solana_sdk::{
clock::Slot,
Expand Down Expand Up @@ -2002,94 +2002,15 @@ fn get_stake_percent_in_gossip(bank: &Bank, cluster_info: &ClusterInfo, log: boo
online_stake_percentage as u64
}

/// Delete directories/files asynchronously to avoid blocking on it.
xiangzhu70 marked this conversation as resolved.
Show resolved Hide resolved
/// Fist, in sync context, rename the original path to *_deleted,
/// then spawn a thread to delete the renamed path.
/// If the process is killed and the deleting process is not done,
/// the leftover path will be deleted in the next process life, so
/// there is no file space leaking.
pub fn move_and_async_delete_path(path: impl AsRef<Path> + Copy) {
let mut path_delete = PathBuf::new();
path_delete.push(path);
path_delete.set_file_name(format!(
"{}{}",
path_delete.file_name().unwrap().to_str().unwrap(),
"_to_be_deleted"
));

if path_delete.exists() {
std::fs::remove_dir_all(&path_delete).unwrap();
}

if !path.as_ref().exists() {
return;
}

if let Err(err) = std::fs::rename(path, &path_delete) {
warn!(
"Path renaming failed: {}. Falling back to rm_dir in sync mode",
err.to_string()
);
delete_contents_of_path(path);
return;
}

Builder::new()
.name("solDeletePath".to_string())
.spawn(move || {
std::fs::remove_dir_all(path_delete).unwrap();
})
.unwrap();
}

/// Delete the files and subdirectories in a directory.
/// This is useful if the process does not have permission
/// to delete the top level directory it might be able to
/// delete the contents of that directory.
fn delete_contents_of_path(path: impl AsRef<Path> + Copy) {
if let Ok(dir_entries) = std::fs::read_dir(path) {
for entry in dir_entries.flatten() {
let sub_path = entry.path();
let metadata = match entry.metadata() {
Ok(metadata) => metadata,
Err(err) => {
warn!(
"Failed to get metadata for {}. Error: {}",
sub_path.display(),
err.to_string()
);
break;
}
};
if metadata.is_dir() {
if let Err(err) = std::fs::remove_dir_all(&sub_path) {
warn!(
"Failed to remove sub directory {}. Error: {}",
sub_path.display(),
err.to_string()
);
}
} else if metadata.is_file() {
if let Err(err) = std::fs::remove_file(&sub_path) {
warn!(
"Failed to remove file {}. Error: {}",
sub_path.display(),
err.to_string()
);
}
}
}
} else {
warn!(
"Failed to read the sub paths of {}",
path.as_ref().display()
);
}
}

fn cleanup_accounts_paths(config: &ValidatorConfig) {
for accounts_path in &config.account_paths {
move_and_async_delete_path(accounts_path);
// Let the empty top accounts/ dir exist. It was created at very early
// stage. It should not disappear after clearing its content.
if std::fs::metadata(accounts_path).is_err() {
// not the /mnt/account case
std::fs::create_dir_all(accounts_path).unwrap();
}
}
if let Some(ref shrink_paths) = config.account_shrink_paths {
for accounts_path in shrink_paths {
Expand Down
47 changes: 23 additions & 24 deletions core/tests/snapshots.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
#![allow(clippy::integer_arithmetic)]

use {
bincode::serialize_into,
crossbeam_channel::unbounded,
fs_extra::dir::CopyOptions,
itertools::Itertools,
Expand All @@ -16,10 +15,10 @@ use {
AbsRequestHandlers, AbsRequestSender, AccountsBackgroundService,
PrunedBanksRequestHandler, SnapshotRequestHandler,
},
accounts_db::{self, ACCOUNTS_DB_CONFIG_FOR_TESTING},
accounts_db::{self, SnapshotStorages, ACCOUNTS_DB_CONFIG_FOR_TESTING},
accounts_hash::AccountsHash,
accounts_index::AccountSecondaryIndexes,
bank::{Bank, BankSlotDelta},
bank::Bank,
bank_forks::BankForks,
epoch_accounts_hash::EpochAccountsHash,
genesis_utils::{create_genesis_config_with_leader, GenesisConfigInfo},
Expand Down Expand Up @@ -51,7 +50,7 @@ use {
},
solana_streamer::socket::SocketAddrSpace,
std::{
collections::HashSet,
collections::{HashSet, VecDeque},
fs,
io::{Error, ErrorKind},
path::PathBuf,
Expand Down Expand Up @@ -240,6 +239,7 @@ fn run_bank_forks_snapshot_n<F>(
snapshot_request_receiver,
accounts_package_sender,
};
let mut snapshot_slot_storages: VecDeque<SnapshotStorages> = VecDeque::new();
for slot in 1..=last_slot {
let mut bank = Bank::new_from_parent(&bank_forks[slot - 1], &Pubkey::default(), slot);
f(&mut bank, mint_keypair);
Expand All @@ -251,7 +251,12 @@ fn run_bank_forks_snapshot_n<F>(
// set_root should send a snapshot request
bank_forks.set_root(bank.slot(), &request_sender, None);
bank.update_accounts_hash_for_tests();
snapshot_request_handler.handle_snapshot_requests(false, 0, &mut None);
snapshot_request_handler.handle_snapshot_requests(
false,
0,
&mut None,
&mut snapshot_slot_storages,
);
}
}

Expand All @@ -261,13 +266,11 @@ fn run_bank_forks_snapshot_n<F>(
let bank_snapshots_dir = &snapshot_config.bank_snapshots_dir;
let last_bank_snapshot_info = snapshot_utils::get_highest_bank_snapshot_pre(bank_snapshots_dir)
.expect("no bank snapshots found in path");
let slot_deltas = last_bank.status_cache.read().unwrap().root_slot_deltas();
let accounts_package = AccountsPackage::new_for_snapshot(
AccountsPackageType::Snapshot(SnapshotType::FullSnapshot),
&last_bank,
&last_bank_snapshot_info,
bank_snapshots_dir,
slot_deltas,
&snapshot_config.full_snapshot_archives_dir,
&snapshot_config.incremental_snapshot_archives_dir,
last_bank.get_snapshot_storages(None),
Expand Down Expand Up @@ -366,8 +369,15 @@ fn test_concurrent_snapshot_packaging(
// Take snapshot of zeroth bank
let bank0 = bank_forks.get(0).unwrap();
let storages = bank0.get_snapshot_storages(None);
snapshot_utils::add_bank_snapshot(bank_snapshots_dir, &bank0, &storages, snapshot_version)
.unwrap();
let slot_deltas = bank0.status_cache.read().unwrap().root_slot_deltas();
snapshot_utils::add_bank_snapshot(
bank_snapshots_dir,
&bank0,
&storages,
snapshot_version,
slot_deltas,
)
.unwrap();

// Set up snapshotting channels
let (real_accounts_package_sender, real_accounts_package_receiver) =
Expand Down Expand Up @@ -411,19 +421,20 @@ fn test_concurrent_snapshot_packaging(
};

let snapshot_storages = bank.get_snapshot_storages(None);
let slot_deltas = bank.status_cache.read().unwrap().root_slot_deltas();
let bank_snapshot_info = snapshot_utils::add_bank_snapshot(
bank_snapshots_dir,
&bank,
&snapshot_storages,
snapshot_config.snapshot_version,
slot_deltas,
)
.unwrap();
let accounts_package = AccountsPackage::new_for_snapshot(
AccountsPackageType::Snapshot(SnapshotType::FullSnapshot),
&bank,
&bank_snapshot_info,
bank_snapshots_dir,
vec![],
full_snapshot_archives_dir,
incremental_snapshot_archives_dir,
snapshot_storages,
Expand Down Expand Up @@ -554,20 +565,6 @@ fn test_concurrent_snapshot_packaging(

// Check the archive we cached the state for earlier was generated correctly

// before we compare, stick an empty status_cache in this dir so that the package comparison works
// This is needed since the status_cache is added by the packager and is not collected from
// the source dir for snapshots
snapshot_utils::serialize_snapshot_data_file(
&saved_snapshots_dir
.path()
.join(snapshot_utils::SNAPSHOT_STATUS_CACHE_FILENAME),
|stream| {
serialize_into(stream, &[] as &[BankSlotDelta])?;
Ok(())
},
)
.unwrap();

// files were saved off before we reserialized the bank in the hacked up accounts_hash_verifier stand-in.
solana_runtime::serde_snapshot::reserialize_bank_with_new_accounts_hash(
saved_snapshots_dir.path(),
Expand Down Expand Up @@ -757,6 +754,7 @@ fn test_bank_forks_incremental_snapshot(
};

let mut last_full_snapshot_slot = None;
let mut snapshot_slot_storages: VecDeque<SnapshotStorages> = VecDeque::new();
for slot in 1..=LAST_SLOT {
// Make a new bank and perform some transactions
let bank = {
Expand Down Expand Up @@ -788,6 +786,7 @@ fn test_bank_forks_incremental_snapshot(
false,
0,
&mut last_full_snapshot_slot,
&mut snapshot_slot_storages,
);
}

Expand Down
37 changes: 29 additions & 8 deletions ledger-tool/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,7 @@ use {
},
},
solana_cli_output::{CliAccount, CliAccountNewConfig, OutputFormat},
solana_core::{
system_monitor_service::{SystemMonitorService, SystemMonitorStatsReportConfig},
validator::move_and_async_delete_path,
},
solana_core::system_monitor_service::{SystemMonitorService, SystemMonitorStatsReportConfig},
solana_entry::entry::Entry,
solana_geyser_plugin_manager::geyser_plugin_service::GeyserPluginService,
solana_ledger::{
Expand Down Expand Up @@ -63,8 +60,8 @@ use {
snapshot_hash::StartingSnapshotHashes,
snapshot_minimizer::SnapshotMinimizer,
snapshot_utils::{
self, ArchiveFormat, SnapshotVersion, DEFAULT_ARCHIVE_COMPRESSION,
SUPPORTED_ARCHIVE_COMPRESSION,
self, move_and_async_delete_path, ArchiveFormat, SnapshotFrom, SnapshotVersion,
DEFAULT_ARCHIVE_COMPRESSION, SUPPORTED_ARCHIVE_COMPRESSION,
},
},
solana_sdk::{
Expand Down Expand Up @@ -1028,6 +1025,7 @@ fn load_bank_forks(
snapshot_archive_path: Option<PathBuf>,
incremental_snapshot_archive_path: Option<PathBuf>,
) -> Result<(Arc<RwLock<BankForks>>, Option<StartingSnapshotHashes>), BlockstoreProcessorError> {
let arg_snapshot_from_file = arg_matches.is_present("snapshot_from_file");
let bank_snapshots_dir = blockstore
.ledger_path()
.join(if blockstore.is_primary_access() {
Expand Down Expand Up @@ -1056,12 +1054,23 @@ fn load_bank_forks(
starting_slot = std::cmp::max(full_snapshot_slot, incremental_snapshot_slot);
}

Some(SnapshotConfig {
let archive_config = SnapshotConfig {
full_snapshot_archives_dir,
incremental_snapshot_archives_dir,
bank_snapshots_dir,
..SnapshotConfig::new_load_only()
})
};
if !arg_snapshot_from_file {
Some(archive_config)
} else {
// Even in the from_file case, still get all the archive config info because they are
// needed if the fallback happens.
Some(SnapshotConfig {
snapshot_from: Some(SnapshotFrom::File),
archive_format: ArchiveFormat::None,
..archive_config
})
}
};

if let Some(halt_slot) = process_options.halt_at_slot {
Expand Down Expand Up @@ -1101,6 +1110,11 @@ fn load_bank_forks(
vec![non_primary_accounts_path]
};

// Ensure the accounts paths exist.
for account_path in &account_paths {
std::fs::create_dir_all(account_path).unwrap_or_default();
}

let mut accounts_update_notifier = Option::<AccountsUpdateNotifier>::default();
if arg_matches.is_present("geyser_plugin_config") {
let geyser_config_files = values_t_or_exit!(arg_matches, "geyser_plugin_config", String)
Expand Down Expand Up @@ -1304,6 +1318,10 @@ fn main() {
.long("no-snapshot")
.takes_value(false)
.help("Do not start from a local snapshot if present");
let snapshot_from_file = Arg::with_name("snapshot_from_file")
.long("snapshot-from-file")
.takes_value(false)
.help("Construct initial bank state from files instead of archives");
let no_bpf_jit_arg = Arg::with_name("no_bpf_jit")
.long("no-bpf-jit")
.takes_value(false)
Expand Down Expand Up @@ -1765,6 +1783,7 @@ fn main() {
SubCommand::with_name("verify")
.about("Verify the ledger")
.arg(&no_snapshot_arg)
.arg(&snapshot_from_file)
.arg(&account_paths_arg)
.arg(&accounts_index_path_arg)
.arg(&halt_at_slot_arg)
Expand Down Expand Up @@ -1805,6 +1824,7 @@ fn main() {
SubCommand::with_name("graph")
.about("Create a Graphviz rendering of the ledger")
.arg(&no_snapshot_arg)
.arg(&snapshot_from_file)
.arg(&account_paths_arg)
.arg(&halt_at_slot_arg)
.arg(&hard_forks_arg)
Expand Down Expand Up @@ -1834,6 +1854,7 @@ fn main() {
SubCommand::with_name("create-snapshot")
.about("Create a new ledger snapshot")
.arg(&no_snapshot_arg)
.arg(&snapshot_from_file)
.arg(&account_paths_arg)
.arg(&skip_rewrites_arg)
.arg(&accounts_db_skip_initial_hash_calc_arg)
Expand Down
Loading