Skip to content

Commit

Permalink
misc review issues
Browse files Browse the repository at this point in the history
  • Loading branch information
xiangzhu70 committed Feb 15, 2023
1 parent d078e8b commit 5477edf
Showing 1 changed file with 22 additions and 154 deletions.
176 changes: 22 additions & 154 deletions runtime/src/snapshot_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,8 @@ mod snapshot_storage_rebuilder;
pub use archive_format::*;

pub const SNAPSHOT_STATUS_CACHE_FILENAME: &str = "status_cache";
pub const SNAPSHOT_VERSION_FILENAME: &str = "version";
pub const SNAPSHOT_COMPLETE_STATE_FILENAME: &str = "state_complete";
pub const SNAPSHOT_ARCHIVE_DOWNLOAD_DIR: &str = "remote";
pub const DEFAULT_FULL_SNAPSHOT_ARCHIVE_INTERVAL_SLOTS: Slot = 25_000;
pub const DEFAULT_INCREMENTAL_SNAPSHOT_ARCHIVE_INTERVAL_SLOTS: Slot = 100;
Expand Down Expand Up @@ -439,7 +441,11 @@ pub fn remove_tmp_snapshot_archives(snapshot_archives_dir: impl AsRef<Path>) {
}
}

pub fn snapshot_write_version_file(version_file: PathBuf, version: SnapshotVersion) -> Result<()> {
/// Write the snapshot version as a file into the bank snapshot directory
pub fn write_snapshot_version_file(
version_file: impl AsRef<Path>,
version: SnapshotVersion,
) -> Result<()> {
let mut f = fs::File::create(version_file)
.map_err(|e| SnapshotError::IoWithSource(e, "create version file"))?;
f.write_all(version.as_str().as_bytes())
Expand Down Expand Up @@ -483,7 +489,7 @@ pub fn archive_snapshot_package(

let staging_accounts_dir = staging_dir.path().join("accounts");
let staging_snapshots_dir = staging_dir.path().join("snapshots");
let staging_version_file = staging_dir.path().join("version");
let staging_version_file = staging_dir.path().join(SNAPSHOT_VERSION_FILENAME);
fs::create_dir_all(&staging_accounts_dir).map_err(|e| {
SnapshotError::IoWithSourceAndFile(e, "create staging path", staging_accounts_dir.clone())
})?;
Expand Down Expand Up @@ -515,8 +521,7 @@ pub fn archive_snapshot_package(
}
}

// Write version file
snapshot_write_version_file(staging_version_file, snapshot_package.snapshot_version)?;
write_snapshot_version_file(staging_version_file, snapshot_package.snapshot_version)?;

// Tar the staging directory into the archive at `archive_path`
let archive_path = tar_dir.join(format!(
Expand All @@ -533,7 +538,10 @@ pub fn archive_snapshot_package(
let mut archive = tar::Builder::new(encoder);
// Serialize the version and snapshots files before accounts so we can quickly determine the version
// and other bank fields. This is necessary if we want to interleave unpacking with reconstruction
archive.append_path_with_name(staging_dir.as_ref().join("version"), "version")?;
archive.append_path_with_name(
staging_dir.as_ref().join(SNAPSHOT_VERSION_FILENAME),
SNAPSHOT_VERSION_FILENAME,
)?;
for dir in ["snapshots", "accounts"] {
archive.append_dir_all(dir, staging_dir.as_ref().join(dir))?;
}
Expand Down Expand Up @@ -1056,11 +1064,11 @@ pub fn add_bank_snapshot(
let status_cache_path = bank_snapshot_dir.join(SNAPSHOT_STATUS_CACHE_FILENAME);
serialize_status_cache(slot, &slot_deltas, &status_cache_path)?;

let version_path = bank_snapshot_dir.join("version");
snapshot_write_version_file(version_path, SnapshotVersion::default()).unwrap();
let version_path = bank_snapshot_dir.join(SNAPSHOT_VERSION_FILENAME);
write_snapshot_version_file(version_path, SnapshotVersion::default()).unwrap();

// Mark this directory complete so it can be used. Check this flag first before selecting for deserialization.
let state_complete_path = bank_snapshot_dir.join("state_complete");
let state_complete_path = bank_snapshot_dir.join(SNAPSHOT_COMPLETE_STATE_FILENAME);
fs::File::create(state_complete_path)?;

// Monitor sizes because they're capped to MAX_SNAPSHOT_DATA_FILE_SIZE
Expand Down Expand Up @@ -1825,108 +1833,19 @@ pub fn get_highest_incremental_snapshot_archive_slot(
/// There is a time window from the slot directory being created, and the content being completely
/// filled. Check the completion to avoid using a highest found slot directory with missing content.
pub fn snapshot_slot_dir_check_complete(path: &Path) -> bool {
let completion_flag_file = path.to_path_buf().join("state_complete");
let completion_flag_file = path.to_path_buf().join(SNAPSHOT_COMPLETE_STATE_FILENAME);
fs::metadata(completion_flag_file).is_ok()
}

pub fn snapshot_slot_dir_check_version(path: &Path) -> bool {
let version_path = path.to_path_buf().join("version");
let version_path = path.to_path_buf().join(SNAPSHOT_VERSION_FILENAME);
if let Ok(content) = fs::read_to_string(version_path) {
content.eq(SnapshotVersion::default().as_str())
} else {
false
}
}

pub(crate) fn parse_snapshot_filename(filename: &str) -> Option<(Slot, BankSnapshotType)> {
lazy_static! {
static ref SNAPSHOT_FILE_REGEX: Regex =
Regex::new(r"^(?P<slot>[0-9]+)\.(?P<type>(pre|post))$").unwrap();
};

SNAPSHOT_FILE_REGEX.captures(filename).map(|cap| {
let slot_str = cap.name("slot").map(|m| m.as_str());
let type_str = cap.name("type").map(|m| m.as_str());
let slot: Slot = slot_str.unwrap().parse::<u64>().unwrap();
let snapshot_type = if type_str.unwrap() == "pre" {
BankSnapshotType::Pre
} else {
BankSnapshotType::Post
};
(slot, snapshot_type)
})
}

/// Get the path (and metadata) for the full snapshot archive with the highest slot in a directory
pub fn get_highest_full_snapshot_slot_and_path(
snapshot_dir: impl AsRef<Path>,
) -> Result<(Slot, PathBuf)> {
lazy_static! {
static ref RE_SLOT_DIR: Regex = Regex::new(r"^\d+$").unwrap();
}
// Under snapshot/, find the slot directories (for example, 100/, 200/, 300, ...)
let mut snapshot_slot_dirs: Vec<_> = std::fs::read_dir(&snapshot_dir)
.unwrap()
.filter(|r| r.is_ok())
.map(|r| r.unwrap().path())
.filter(|r| {
r.is_dir()
&& RE_SLOT_DIR.is_match(r.file_name().unwrap().to_str().unwrap())
&& snapshot_slot_dir_check_complete(r)
&& snapshot_slot_dir_check_version(r)
})
.map(|r| r.file_name().unwrap().to_os_string())
.collect();
if snapshot_slot_dirs.is_empty() {
info!(
"shapshot_dir {} is empty, expecting slots sub directories.",
snapshot_dir.as_ref().display()
);
return Err(SnapshotError::NoSnapshotSlotDir);
}
// Find the highest slot directory
snapshot_slot_dirs.sort();
let highest_dir_name = snapshot_slot_dirs.into_iter().rev().next().unwrap();

let slot: Slot = match highest_dir_name.to_string_lossy().parse::<u64>() {
Ok(slot) => slot,
Err(e) => {
info!(
"Error: {e}. Expect snapshot file directory name as <slot>, found {:?}",
highest_dir_name
);
return Err(SnapshotError::UnknownSnapshotFile);
}
};

let highest_dir_path = snapshot_dir.as_ref().join(highest_dir_name);

// Get the snapshot file from the directory. Could be 600/600 or 600/600.pre
let snapshot_files_in_slot_dir: Vec<_> = std::fs::read_dir(&highest_dir_path)?
.into_iter()
.filter(|r| r.is_ok())
.map(|f| f.unwrap().path())
.filter(|f| {
parse_snapshot_filename(&f.file_name().unwrap().to_os_string().into_string().unwrap())
.is_some()
})
.collect();
if snapshot_files_in_slot_dir.len() != 1 {
// We expect there is only one file here. If the files structure is changed to something unexpected,
// bail out here.
info!(
"highest_dir_path {}, snapshot_files_in_slot_dir: {:?}",
highest_dir_path.display(),
snapshot_files_in_slot_dir
);
error!("The number of snapshot files should be 1 in the slot directory");
return Err(SnapshotError::UnknownSnapshotFile);
}

let snapshot_file = &snapshot_files_in_slot_dir[0];
Ok((slot, snapshot_file.to_path_buf()))
}

/// Get the path (and metadata) for the full snapshot archive with the highest slot in a directory
pub fn get_highest_full_snapshot_archive_info(
full_snapshot_archives_dir: impl AsRef<Path>,
Expand Down Expand Up @@ -2468,13 +2387,14 @@ pub fn verify_snapshot_archive<P, Q, R>(
fs::remove_dir_all(dst_path).unwrap();
}
std::fs::remove_dir_all(accounts_hardlinks_dir).unwrap();
}

let version_path = snapshot_slot_dir.join("version");
let version_path = snapshot_slot_dir.join(SNAPSHOT_VERSION_FILENAME);
if version_path.is_file() {
std::fs::remove_file(version_path).unwrap();
}

let state_complete_path = snapshot_slot_dir.join("state_complete");
let state_complete_path = snapshot_slot_dir.join(SNAPSHOT_COMPLETE_STATE_FILENAME);
if state_complete_path.is_file() {
std::fs::remove_file(state_complete_path).unwrap();
}
Expand Down Expand Up @@ -2771,10 +2691,7 @@ pub fn create_tmp_accounts_dir_for_tests() -> (TempDir, PathBuf) {
mod tests {
use {
super::*,
crate::{
accounts_db::ACCOUNTS_DB_CONFIG_FOR_TESTING, bank_forks::BankForks,
status_cache::Status,
},
crate::{accounts_db::ACCOUNTS_DB_CONFIG_FOR_TESTING, status_cache::Status},
assert_matches::assert_matches,
bincode::{deserialize_from, serialize_into},
solana_sdk::{
Expand Down Expand Up @@ -4611,53 +4528,4 @@ mod tests {

assert!(matches!(ret, Err(SnapshotError::InvalidAppendVecPath(_))));
}

fn test_get_highest_full_snapshot_slot_and_path() {
solana_logger::setup();

let genesis_config = GenesisConfig::default();
let bank0 = Bank::new_for_tests(&genesis_config);
let mut bank_forks = BankForks::new(bank0);

let tmp_dir = tempfile::TempDir::new().unwrap();
let bank_snapshots_dir = tmp_dir.path();
let collecter_id = Pubkey::new_unique();
let snapshot_version = SnapshotVersion::default();

for slot in 0..4 {
// prepare the bank
let parent_bank = bank_forks.get(slot).unwrap();
let bank = Bank::new_from_parent(&parent_bank, &collecter_id, slot + 1);
bank.fill_bank_with_ticks_for_tests();
bank.squash();
bank.force_flush_accounts_cache();

// generate the bank snapshot directory for slot+1
let snapshot_storages = bank.get_snapshot_storages(None);
let slot_deltas = bank.status_cache.read().unwrap().root_slot_deltas();
add_bank_snapshot(
bank_snapshots_dir,
&bank,
&snapshot_storages,
snapshot_version,
slot_deltas,
)
.unwrap();

bank_forks.insert(bank);
}

let (slot, _path) = get_highest_full_snapshot_slot_and_path(bank_snapshots_dir).unwrap();

assert_eq!(slot, 4);

let complete_flag_file = bank_snapshots_dir
.join(slot.to_string())
.join("state_complete");
fs::remove_file(complete_flag_file).unwrap();

let (slot, _path) = get_highest_full_snapshot_slot_and_path(bank_snapshots_dir).unwrap();

assert_eq!(slot, 3);
}
}

0 comments on commit 5477edf

Please sign in to comment.