Skip to content
This repository has been archived by the owner on Jan 13, 2025. It is now read-only.

Account files remove #26910

Merged
merged 9 commits into from
Aug 20, 2022
84 changes: 69 additions & 15 deletions core/src/validator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,7 @@ use {
solana_vote_program::vote_state,
std::{
collections::{HashMap, HashSet},
ffi::OsString,
net::SocketAddr,
path::{Path, PathBuf},
sync::{
Expand Down Expand Up @@ -363,6 +364,7 @@ pub struct Validator {
ledger_metric_report_service: LedgerMetricReportService,
accounts_background_service: AccountsBackgroundService,
accounts_hash_verifier: AccountsHashVerifier,
pub async_runtime: Arc<tokio::runtime::Runtime>,
}

impl Validator {
Expand Down Expand Up @@ -453,20 +455,19 @@ impl Validator {
}
}

let async_runtime = tokio::runtime::Runtime::new().unwrap();

info!("Cleaning accounts paths..");
*start_progress.write().unwrap() = ValidatorStartProgress::CleaningAccounts;
let mut start = Measure::start("clean_accounts_paths");
for accounts_path in &config.account_paths {
cleanup_accounts_path(accounts_path);
}
if let Some(ref shrink_paths) = config.account_shrink_paths {
for accounts_path in shrink_paths {
cleanup_accounts_path(accounts_path);
}
}
cleanup_accounts_paths(&async_runtime, &config);
start.stop();
info!("done. {}", start);

// The runtime is wrapped in an `Arc` and stored in the `Validator` itself,
// so it stays alive for the lifetime of the validator.
let async_runtime = Arc::new(async_runtime);

let exit = Arc::new(AtomicBool::new(false));
{
let exit = exit.clone();
Expand Down Expand Up @@ -1066,6 +1067,7 @@ impl Validator {
ledger_metric_report_service,
accounts_background_service,
accounts_hash_verifier,
async_runtime,
})
}

Expand Down Expand Up @@ -2061,13 +2063,65 @@ fn get_stake_percent_in_gossip(bank: &Bank, cluster_info: &ClusterInfo, log: boo
online_stake_percentage as u64
}

// Cleanup anything that looks like an accounts append-vec
fn cleanup_accounts_path(account_path: &std::path::Path) {
if let Err(e) = std::fs::remove_dir_all(account_path) {
warn!(
"encountered error removing accounts path: {:?}: {}",
account_path, e
);
/// Renames `path` out of the way so it can be deleted asynchronously, and records
/// every directory that now needs deleting in `del_paths`.
///
/// The rename target is `<path>_deleted`. If that name is already taken (which can
/// happen if a previous process was killed before its async delete finished), the
/// stale directory is queued for deletion too and the next candidate name
/// (`<path>_deleted1`, `<path>_deleted2`, ...) is tried. Queuing the leftovers
/// together with the fresh rename avoids leaking and accumulating directories even
/// when earlier async delete tasks did not complete.
///
/// A `path` that does not exist is skipped: there is nothing to move, but any
/// leftover `_deleted*` directories found along the way are still queued.
///
/// NOTE(review): a simpler scheme was suggested in review -- compute the
/// `_deleted` path, remove it synchronously if present, then rename `path` to it.
/// That bounds the number of leftover directories if startup fails repeatedly.
///
/// # Panics
///
/// Panics if the rename fails for any reason other than `path` not existing.
fn move_path_for_async_removal(del_paths: &mut Vec<PathBuf>, path: &std::path::Path) {
    // Rebuild the path from its components so that a trailing separator does not
    // turn the target into `<path>/_deleted`, i.e. a location inside the very
    // directory being renamed (which the OS rejects).
    let normalized: PathBuf = path.components().collect();
    let mut del_path_base = OsString::from(normalized);
    del_path_base.push("_deleted");

    for attempt in 0u32.. {
        let mut candidate = del_path_base.clone();
        if attempt > 0 {
            candidate.push(attempt.to_string());
        }
        let path_delete = PathBuf::from(candidate);

        if path_delete.exists() {
            // Leftover from an earlier, unfinished cleanup: delete it as well and
            // look for the next free name.
            del_paths.push(path_delete);
            continue;
        }

        match std::fs::rename(path, &path_delete) {
            Ok(()) => del_paths.push(path_delete),
            // Nothing to clean up (e.g. the accounts directory was never created).
            Err(err) if err.kind() == std::io::ErrorKind::NotFound => {}
            Err(err) => panic!("failed to rename {path:?} to {path_delete:?}: {err}"),
        }
        break;
    }
}

/* Delete directories/files asynchronously to avoid blocking on it.
xiangzhu70 marked this conversation as resolved.
Show resolved Hide resolved
Fist, in sync context, rename the original path into
the top to_be_deleted/ directory, then in the async context
call the tokio async remove_dir_all.
The async rmdir process may not finish if the process is
unexpectly aborted, so the files may be left over undeleted.
causing the disk space resource leak. This function finds all
the leftover files and also delete them.
*/
fn cleanup_accounts_paths(async_runtime: &tokio::runtime::Runtime, config: &ValidatorConfig) {
let mut del_paths: Vec<PathBuf> = Vec::new();
for accounts_path in &config.account_paths {
move_path_for_async_removal(&mut del_paths, accounts_path);
}
if let Some(ref shrink_paths) = config.account_shrink_paths {
for accounts_path in shrink_paths {
move_path_for_async_removal(&mut del_paths, accounts_path);
}
}

// Done moving the direcetories/files in sync context. Now start
// the async task to remove them. Note that there is no waiting to ensure this
// async task is done. So if the process crashes or is aborted,
// some files may be left over. The next process will resume the deleting work
// if it finds any leftover file.

for del_path in del_paths {
async_runtime.spawn(async move {
jeffwashington marked this conversation as resolved.
Show resolved Hide resolved
if let Err(e) = tokio::fs::remove_dir_all(&del_path).await {
warn!("encountered error deleting path: {:?}: {}", del_path, e);
}
});
}
}

Expand Down