diff --git a/runtime/src/snapshot_utils.rs b/runtime/src/snapshot_utils.rs index a2ad674b9952ec..6aa5fb8d802f29 100644 --- a/runtime/src/snapshot_utils.rs +++ b/runtime/src/snapshot_utils.rs @@ -60,6 +60,7 @@ use { crate::{ accounts_db::{AccountStorageMap, AtomicAppendVecId}, hardened_unpack::streaming_unpack_snapshot, + snapshot_utils::snapshot_storage_rebuilder::RebuiltSnapshotStorage, }, crossbeam_channel::Sender, std::thread::{Builder, JoinHandle}, @@ -130,10 +131,6 @@ impl SnapshotVersion { pub fn as_str(self) -> &'static str { <&str as From>::from(self) } - - fn maybe_from_string(version_string: &str) -> Option { - version_string.parse::().ok() - } } /// Information about a bank snapshot. Namely the slot of the bank, the path to the snapshot, and @@ -199,7 +196,7 @@ struct UnarchivedSnapshot { #[derive(Debug)] struct UnpackedSnapshotsDirAndVersion { unpacked_snapshots_dir: PathBuf, - snapshot_version: String, + snapshot_version: SnapshotVersion, } /// Helper type for passing around account storage map and next append vec id @@ -1245,7 +1242,6 @@ where .prefix(unpacked_snapshots_dir_prefix) .tempdir_in(bank_snapshots_dir)?; let unpacked_snapshots_dir = unpack_dir.path().join("snapshots"); - let unpacked_version_file = unpack_dir.path().join("version"); let (file_sender, file_receiver) = crossbeam_channel::unbounded(); streaming_unarchive_snapshot( @@ -1260,18 +1256,20 @@ where let num_rebuilder_threads = num_cpus::get_physical() .saturating_sub(parallel_divisions) .max(1); - let (storage, measure_untar) = measure!( + let (version_and_storages, measure_untar) = measure!( SnapshotStorageRebuilder::rebuild_storage( file_receiver, num_rebuilder_threads, next_append_vec_id - ), + )?, measure_name ); info!("{}", measure_untar); - let snapshot_version = snapshot_version_from_file(&unpacked_version_file)?; - + let RebuiltSnapshotStorage { + snapshot_version, + storage, + } = version_and_storages; Ok(UnarchivedSnapshot { unpack_dir, storage, @@ -1709,14 +1707,7 @@ fn verify_unpacked_snapshots_dir_and_version( &unpacked_snapshots_dir_and_version.snapshot_version ); - let snapshot_version = - SnapshotVersion::maybe_from_string(&unpacked_snapshots_dir_and_version.snapshot_version) - .ok_or_else(|| { - get_io_error(&format!( - "unsupported snapshot version: {}", - &unpacked_snapshots_dir_and_version.snapshot_version, - )) - })?; + let snapshot_version = unpacked_snapshots_dir_and_version.snapshot_version; let mut bank_snapshots = get_bank_snapshots_post(&unpacked_snapshots_dir_and_version.unpacked_snapshots_dir); if bank_snapshots.len() > 1 { diff --git a/runtime/src/snapshot_utils/snapshot_storage_rebuilder.rs b/runtime/src/snapshot_utils/snapshot_storage_rebuilder.rs index 8b8b9269002a3b..545b4b2d803177 100644 --- a/runtime/src/snapshot_utils/snapshot_storage_rebuilder.rs +++ b/runtime/src/snapshot_utils/snapshot_storage_rebuilder.rs @@ -1,6 +1,7 @@ //! Provides interfaces for rebuilding snapshot storages use { + super::{get_io_error, snapshot_version_from_file, SnapshotError, SnapshotVersion}, crate::{ accounts_db::{AccountStorageEntry, AccountStorageMap, AppendVecId, AtomicAppendVecId}, serde_snapshot::{ @@ -28,10 +29,17 @@ use { time::Instant, }, }; +/// Convenient wrapper for snapshot version and rebuilt storages +pub(crate) struct RebuiltSnapshotStorage { + /// Snapshot version + pub snapshot_version: SnapshotVersion, + /// Rebuilt storages + pub storage: AccountStorageMap, +} /// Stores state for rebuilding snapshot storages #[derive(Debug)] -pub struct SnapshotStorageRebuilder { +pub(crate) struct SnapshotStorageRebuilder { /// Receiver for unpacked snapshot storage files file_receiver: Receiver, /// Number of threads to rebuild with @@ -52,20 +60,35 @@ pub struct SnapshotStorageRebuilder { impl SnapshotStorageRebuilder { /// Synchronously spawns threads to rebuild snapshot storages - pub fn rebuild_storage( + pub(crate) fn rebuild_storage( file_receiver: Receiver, num_threads: usize, next_append_vec_id: Arc, - ) -> AccountStorageMap { - let (snapshot_file_path, append_vec_files) = Self::get_snapshot_file(&file_receiver); - let snapshot_storage_lengths = Self::process_snapshot_file(snapshot_file_path).unwrap(); - Self::spawn_rebuilder_threads( + ) -> Result { + let (snapshot_version_path, snapshot_file_path, append_vec_files) = + Self::get_version_and_snapshot_files(&file_receiver); + let snapshot_version_str = snapshot_version_from_file(&snapshot_version_path)?; + let snapshot_version = snapshot_version_str.parse().map_err(|_| { + get_io_error(&format!( + "unsupported snapshot version: {}", + snapshot_version_str, + )) + })?; + let snapshot_storage_lengths = + Self::process_snapshot_file(snapshot_version, snapshot_file_path)?; + + let account_storage_map = Self::spawn_rebuilder_threads( file_receiver, num_threads, next_append_vec_id, snapshot_storage_lengths, append_vec_files, - ) + ); + + Ok(RebuiltSnapshotStorage { + snapshot_version, + storage: account_storage_map, + }) } /// Create the SnapshotStorageRebuilder for storing state during rebuilding @@ -98,16 +121,34 @@ impl SnapshotStorageRebuilder { /// Waits for snapshot file /// Due to parallel unpacking, we may receive some append_vec files before the snapshot file /// This function will push append_vec files into a buffer until we receive the snapshot file - fn get_snapshot_file(file_receiver: &Receiver) -> (PathBuf, Vec) { + fn get_version_and_snapshot_files( + file_receiver: &Receiver, + ) -> (PathBuf, PathBuf, Vec) { let mut append_vec_files = Vec::with_capacity(1024); - let snapshot_file_path = loop { + let mut snapshot_version_path = None; + let mut snapshot_file_path = None; + + loop { if let Ok(path) = file_receiver.recv() { let filename = path.file_name().unwrap().to_str().unwrap(); match get_snapshot_file_kind(filename) { - Some(SnapshotFileKind::SnapshotFile) => { - break path; + Some(SnapshotFileKind::Version) => { + snapshot_version_path = Some(path); + + // break if we have both the snapshot file and the version file + if snapshot_file_path.is_some() { + break; + } } - Some(SnapshotFileKind::StorageFile) => { + Some(SnapshotFileKind::BankFields) => { + snapshot_file_path = Some(path); + + // break if we have both the snapshot file and the version file + if snapshot_version_path.is_some() { + break; + } + } + Some(SnapshotFileKind::Storage) => { append_vec_files.push(path); } None => {} // do nothing for other kinds of files @@ -115,21 +156,28 @@ impl SnapshotStorageRebuilder { } else { panic!("did not receive snapshot file from unpacking threads"); } - }; + } + let snapshot_version_path = snapshot_version_path.unwrap(); + let snapshot_file_path = snapshot_file_path.unwrap(); - (snapshot_file_path, append_vec_files) + (snapshot_version_path, snapshot_file_path, append_vec_files) } /// Process the snapshot file to get the size of each snapshot storage file fn process_snapshot_file( + snapshot_version: SnapshotVersion, snapshot_file_path: PathBuf, ) -> Result>, bincode::Error> { let snapshot_file = File::open(snapshot_file_path).unwrap(); let mut snapshot_stream = BufReader::new(snapshot_file); - let (_bank_fields, accounts_fields) = - serde_snapshot::fields_from_stream(SerdeStyle::Newer, &mut snapshot_stream)?; + match snapshot_version { + SnapshotVersion::V1_2_0 => { + let (_bank_fields, accounts_fields) = + serde_snapshot::fields_from_stream(SerdeStyle::Newer, &mut snapshot_stream)?; - Ok(snapshot_storage_lengths_from_fields(&accounts_fields)) + Ok(snapshot_storage_lengths_from_fields(&accounts_fields)) + } + } } /// Spawn threads for processing buffered append_vec_files, and then received files @@ -191,7 +239,7 @@ impl SnapshotStorageRebuilder { /// Process an append_vec_file fn process_append_vec_file(&self, path: PathBuf) -> Result<(), std::io::Error> { let filename = path.file_name().unwrap().to_str().unwrap().to_owned(); - if let Some(SnapshotFileKind::StorageFile) = get_snapshot_file_kind(&filename) { + if let Some(SnapshotFileKind::Storage) = get_snapshot_file_kind(&filename) { let (slot, slot_complete) = self.insert_slot_storage_file(path, filename); if slot_complete { self.process_complete_slot(slot)?; @@ -290,15 +338,20 @@ impl SnapshotStorageRebuilder { } } -/// Used to determine if a filename is structured like a snapshot file, storage file, or neither +/// Used to determine if a filename is structured like a version file, bank file, or storage file #[derive(PartialEq, Debug)] enum SnapshotFileKind { - SnapshotFile, - StorageFile, + Version, + BankFields, + Storage, } /// Determines `SnapshotFileKind` for `filename` if any fn get_snapshot_file_kind(filename: &str) -> Option { + if filename == "version" { + return Some(SnapshotFileKind::Version); + } + let mut periods = 0; let mut saw_numbers = false; for x in filename.chars() { @@ -318,8 +371,8 @@ fn get_snapshot_file_kind(filename: &str) -> Option { } match (periods, saw_numbers) { - (0, true) => Some(SnapshotFileKind::SnapshotFile), - (1, true) => Some(SnapshotFileKind::StorageFile), + (0, true) => Some(SnapshotFileKind::BankFields), + (1, true) => Some(SnapshotFileKind::Storage), (_, _) => None, } } @@ -342,11 +395,15 @@ mod tests { fn test_get_snapshot_file_kind() { assert_eq!(None, get_snapshot_file_kind("file.txt")); assert_eq!( - Some(SnapshotFileKind::SnapshotFile), + Some(SnapshotFileKind::Version), + get_snapshot_file_kind("version") + ); + assert_eq!( + Some(SnapshotFileKind::BankFields), get_snapshot_file_kind("1234") ); assert_eq!( - Some(SnapshotFileKind::StorageFile), + Some(SnapshotFileKind::Storage), get_snapshot_file_kind("1000.999") ); }