Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Split the bank snapshot construction from file PR. These are the non-essential helper changes #29311

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
93 changes: 7 additions & 86 deletions core/src/validator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ use {
snapshot_config::SnapshotConfig,
snapshot_hash::StartingSnapshotHashes,
snapshot_package::PendingSnapshotPackage,
snapshot_utils,
snapshot_utils::{self, move_and_async_delete_path},
},
solana_sdk::{
clock::Slot,
Expand Down Expand Up @@ -2002,94 +2002,15 @@ fn get_stake_percent_in_gossip(bank: &Bank, cluster_info: &ClusterInfo, log: boo
online_stake_percentage as u64
}

/// Delete directories/files asynchronously to avoid blocking on it.
/// Fist, in sync context, rename the original path to *_deleted,
/// then spawn a thread to delete the renamed path.
/// If the process is killed and the deleting process is not done,
/// the leftover path will be deleted in the next process life, so
/// there is no file space leaking.
pub fn move_and_async_delete_path(path: impl AsRef<Path> + Copy) {
let mut path_delete = PathBuf::new();
path_delete.push(path);
path_delete.set_file_name(format!(
"{}{}",
path_delete.file_name().unwrap().to_str().unwrap(),
"_to_be_deleted"
));

if path_delete.exists() {
std::fs::remove_dir_all(&path_delete).unwrap();
}

if !path.as_ref().exists() {
return;
}

if let Err(err) = std::fs::rename(path, &path_delete) {
warn!(
"Path renaming failed: {}. Falling back to rm_dir in sync mode",
err.to_string()
);
delete_contents_of_path(path);
return;
}

Builder::new()
.name("solDeletePath".to_string())
.spawn(move || {
std::fs::remove_dir_all(path_delete).unwrap();
})
.unwrap();
}

/// Delete the files and subdirectories in a directory.
/// This is useful if the process does not have permission
/// to delete the top level directory it might be able to
/// delete the contents of that directory.
fn delete_contents_of_path(path: impl AsRef<Path> + Copy) {
if let Ok(dir_entries) = std::fs::read_dir(path) {
for entry in dir_entries.flatten() {
let sub_path = entry.path();
let metadata = match entry.metadata() {
Ok(metadata) => metadata,
Err(err) => {
warn!(
"Failed to get metadata for {}. Error: {}",
sub_path.display(),
err.to_string()
);
break;
}
};
if metadata.is_dir() {
if let Err(err) = std::fs::remove_dir_all(&sub_path) {
warn!(
"Failed to remove sub directory {}. Error: {}",
sub_path.display(),
err.to_string()
);
}
} else if metadata.is_file() {
if let Err(err) = std::fs::remove_file(&sub_path) {
warn!(
"Failed to remove file {}. Error: {}",
sub_path.display(),
err.to_string()
);
}
}
}
} else {
warn!(
"Failed to read the sub paths of {}",
path.as_ref().display()
);
}
}

fn cleanup_accounts_paths(config: &ValidatorConfig) {
for accounts_path in &config.account_paths {
move_and_async_delete_path(accounts_path);
// Let the empty top accounts/ dir exist. It was created at very early
// stage. It should not disappear after clearing its content.
if std::fs::metadata(accounts_path).is_err() {
// not the /mnt/account case
apfitzge marked this conversation as resolved.
Show resolved Hide resolved
std::fs::create_dir_all(accounts_path).unwrap();
}
}
if let Some(ref shrink_paths) = config.account_shrink_paths {
for accounts_path in shrink_paths {
Expand Down
9 changes: 3 additions & 6 deletions ledger-tool/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,7 @@ use {
},
},
solana_cli_output::{CliAccount, CliAccountNewConfig, OutputFormat},
solana_core::{
system_monitor_service::{SystemMonitorService, SystemMonitorStatsReportConfig},
validator::move_and_async_delete_path,
},
solana_core::system_monitor_service::{SystemMonitorService, SystemMonitorStatsReportConfig},
solana_entry::entry::Entry,
solana_geyser_plugin_manager::geyser_plugin_service::GeyserPluginService,
solana_ledger::{
Expand Down Expand Up @@ -63,8 +60,8 @@ use {
snapshot_hash::StartingSnapshotHashes,
snapshot_minimizer::SnapshotMinimizer,
snapshot_utils::{
self, ArchiveFormat, SnapshotVersion, DEFAULT_ARCHIVE_COMPRESSION,
SUPPORTED_ARCHIVE_COMPRESSION,
self, move_and_async_delete_path, ArchiveFormat, SnapshotVersion,
DEFAULT_ARCHIVE_COMPRESSION, SUPPORTED_ARCHIVE_COMPRESSION,
},
},
solana_sdk::{
Expand Down
6 changes: 4 additions & 2 deletions runtime/src/accounts_db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5410,10 +5410,12 @@ impl AccountsDb {
let old_id = ret.append_vec_id();
ret.recycle(slot, self.next_id());
debug!(
"recycling store: {} {:?} old_id: {}",
"recycling store: old slot {}, old_id: {}, new slot {}, new id{}, path {:?} ",
slot,
old_id,
ret.slot(),
ret.append_vec_id(),
ret.get_path(),
old_id
);
self.stats
.recycle_store_count
Expand Down
19 changes: 10 additions & 9 deletions runtime/src/append_vec.rs
Original file line number Diff line number Diff line change
Expand Up @@ -435,14 +435,15 @@ impl AppendVec {
}

pub fn new_from_file<P: AsRef<Path>>(path: P, current_len: usize) -> io::Result<(Self, usize)> {
let new = Self::new_from_file_unchecked(path, current_len)?;
let new = Self::new_from_file_unchecked(&path, current_len)?;

let (sanitized, num_accounts) = new.sanitize_layout_and_length();
if !sanitized {
return Err(std::io::Error::new(
std::io::ErrorKind::Other,
"incorrect layout/length/data",
));
let err_msg = format!(
"incorrect layout/length/data in the appendvec at path {}",
path.as_ref().display()
);
return Err(std::io::Error::new(std::io::ErrorKind::Other, err_msg));
}

Ok((new, num_accounts))
Expand Down Expand Up @@ -1149,7 +1150,7 @@ pub mod tests {
}

let result = AppendVec::new_from_file(path, accounts_len);
assert_matches!(result, Err(ref message) if message.to_string() == *"incorrect layout/length/data");
assert_matches!(result, Err(ref message) if message.to_string().starts_with("incorrect layout/length/data"));
}

#[test]
Expand Down Expand Up @@ -1177,7 +1178,7 @@ pub mod tests {
let accounts_len = av.len();
drop(av);
let result = AppendVec::new_from_file(path, accounts_len);
assert_matches!(result, Err(ref message) if message.to_string() == *"incorrect layout/length/data");
assert_matches!(result, Err(ref message) if message.to_string().starts_with("incorrect layout/length/data"));
}

#[test]
Expand All @@ -1203,7 +1204,7 @@ pub mod tests {
let accounts_len = av.len();
drop(av);
let result = AppendVec::new_from_file(path, accounts_len);
assert_matches!(result, Err(ref message) if message.to_string() == *"incorrect layout/length/data");
assert_matches!(result, Err(ref message) if message.to_string().starts_with("incorrect layout/length/data"));
}

#[test]
Expand Down Expand Up @@ -1265,6 +1266,6 @@ pub mod tests {
let accounts_len = av.len();
drop(av);
let result = AppendVec::new_from_file(path, accounts_len);
assert_matches!(result, Err(ref message) if message.to_string() == *"incorrect layout/length/data");
assert_matches!(result, Err(ref message) if message.to_string().starts_with("incorrect layout/length/data"));
}
}
5 changes: 4 additions & 1 deletion runtime/src/bank.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3361,7 +3361,10 @@ impl Bank {
if native_loader::check_id(account.owner()) {
Some(account)
} else {
// malicious account is pre-occupying at program_id
warn!(
"malicious account {:?} is pre-occupying at program_id {}",
account, program_id
);
self.burn_and_purge_account(program_id, account);
None
}
Expand Down
4 changes: 3 additions & 1 deletion runtime/src/serde_snapshot.rs
Original file line number Diff line number Diff line change
Expand Up @@ -721,7 +721,9 @@ where
);

let next_append_vec_id = next_append_vec_id.load(Ordering::Acquire);
let max_append_vec_id = next_append_vec_id - 1;
let max_append_vec_id = next_append_vec_id
.checked_sub(1)
.expect("next_append_vec_id should be > 0");
assert!(
max_append_vec_id <= AppendVecId::MAX / 2,
"Storage id {max_append_vec_id} larger than allowed max"
Expand Down
2 changes: 1 addition & 1 deletion runtime/src/snapshot_config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ pub struct SnapshotConfig {
/// This is the `debug_verify` parameter to use when calling `update_accounts_hash()`
pub accounts_hash_debug_verify: bool,

// Thread niceness adjustment for snapshot packager service
/// Thread niceness adjustment for snapshot packager service
pub packager_thread_niceness_adj: i8,
}

Expand Down
104 changes: 96 additions & 8 deletions runtime/src/snapshot_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -277,6 +277,91 @@ pub enum VerifySlotDeltasError {
BadSlotHistory,
}

/// Delete the files and subdirectories in a directory.
/// This is useful if the process does not have permission
/// to delete the top level directory it might be able to
/// delete the contents of that directory.
fn delete_contents_of_path(path: impl AsRef<Path> + Copy) {
if let Ok(dir_entries) = std::fs::read_dir(path) {
for entry in dir_entries.flatten() {
let sub_path = entry.path();
let metadata = match entry.metadata() {
Ok(metadata) => metadata,
Err(err) => {
warn!(
"Failed to get metadata for {}. Error: {}",
sub_path.display(),
err.to_string()
);
break;
}
};
if metadata.is_dir() {
if let Err(err) = std::fs::remove_dir_all(&sub_path) {
warn!(
"Failed to remove sub directory {}. Error: {}",
sub_path.display(),
err.to_string()
);
}
} else if metadata.is_file() {
if let Err(err) = std::fs::remove_file(&sub_path) {
warn!(
"Failed to remove file {}. Error: {}",
sub_path.display(),
err.to_string()
);
}
}
}
} else {
warn!(
"Failed to read the sub paths of {}",
path.as_ref().display()
);
}
}

/// Delete directories/files asynchronously to avoid blocking on it.
/// Fist, in sync context, rename the original path to *_deleted,
/// then spawn a thread to delete the renamed path.
/// If the process is killed and the deleting process is not done,
/// the leftover path will be deleted in the next process life, so
/// there is no file space leaking.
pub fn move_and_async_delete_path(path: impl AsRef<Path> + Copy) {
let mut path_delete = PathBuf::new();
path_delete.push(path);
path_delete.set_file_name(format!(
"{}{}",
path_delete.file_name().unwrap().to_str().unwrap(),
"_to_be_deleted"
));

if path_delete.exists() {
std::fs::remove_dir_all(&path_delete).unwrap();
}

if !path.as_ref().exists() {
return;
}

if let Err(err) = std::fs::rename(path, &path_delete) {
warn!(
"Path renaming failed: {}. Falling back to rm_dir in sync mode",
err.to_string()
);
delete_contents_of_path(path);
return;
}

Builder::new()
.name("solDeletePath".to_string())
.spawn(move || {
std::fs::remove_dir_all(path_delete).unwrap();
})
.unwrap();
}

/// If the validator halts in the middle of `archive_snapshot_package()`, the temporary staging
/// directory won't be cleaned up. Call this function to clean them up.
pub fn remove_tmp_snapshot_archives(snapshot_archives_dir: impl AsRef<Path>) {
Expand All @@ -300,6 +385,14 @@ pub fn remove_tmp_snapshot_archives(snapshot_archives_dir: impl AsRef<Path>) {
}
}

pub fn snapshot_write_version_file(version_file: PathBuf, version: SnapshotVersion) -> Result<()> {
let mut f = fs::File::create(version_file)
.map_err(|e| SnapshotError::IoWithSource(e, "create version file"))?;
f.write_all(version.as_str().as_bytes())
.map_err(|e| SnapshotError::IoWithSource(e, "write version file"))?;
Ok(())
}

/// Make a snapshot archive out of the snapshot package
pub fn archive_snapshot_package(
snapshot_package: &SnapshotPackage,
Expand Down Expand Up @@ -376,12 +469,7 @@ pub fn archive_snapshot_package(
}

// Write version file
{
let mut f = fs::File::create(staging_version_file)
.map_err(|e| SnapshotError::IoWithSource(e, "create version file"))?;
f.write_all(snapshot_package.snapshot_version.as_str().as_bytes())
.map_err(|e| SnapshotError::IoWithSource(e, "write version file"))?;
}
snapshot_write_version_file(staging_version_file, snapshot_package.snapshot_version)?;

// Tar the staging directory into the archive at `archive_path`
let archive_path = tar_dir.join(format!(
Expand Down Expand Up @@ -1752,7 +1840,7 @@ fn bank_fields_from_snapshots(
(None, None)
};
info!(
"Loading bank from full snapshot {} and incremental snapshot {:?}",
"Loading bank fields from full snapshot {} and incremental snapshot {:?}",
full_snapshot_root_paths.snapshot_path.display(),
incremental_snapshot_root_paths
.as_ref()
Expand Down Expand Up @@ -1811,7 +1899,7 @@ fn rebuild_bank_from_snapshots(
(None, None)
};
info!(
"Loading bank from full snapshot {} and incremental snapshot {:?}",
"Rebuild bank from full snapshot {} and incremental snapshot {:?}",
full_snapshot_root_paths.snapshot_path.display(),
incremental_snapshot_root_paths
.as_ref()
Expand Down
Loading