From 22314697a1880e0c3ca1a6802e79b1b4949057e1 Mon Sep 17 00:00:00 2001 From: Andrew Fitzgerald Date: Tue, 30 Aug 2022 17:02:39 -0500 Subject: [PATCH 1/4] Issue #27346 - deserialize and check snapshot version before account fields --- runtime/src/snapshot_utils.rs | 25 ++--- .../snapshot_storage_rebuilder.rs | 104 +++++++++++++----- 2 files changed, 87 insertions(+), 42 deletions(-) diff --git a/runtime/src/snapshot_utils.rs b/runtime/src/snapshot_utils.rs index a2ad674b9952ec..d6e8fa204e0a05 100644 --- a/runtime/src/snapshot_utils.rs +++ b/runtime/src/snapshot_utils.rs @@ -60,6 +60,7 @@ use { crate::{ accounts_db::{AccountStorageMap, AtomicAppendVecId}, hardened_unpack::streaming_unpack_snapshot, + snapshot_utils::snapshot_storage_rebuilder::SnapshotStorageRebuilderResult, }, crossbeam_channel::Sender, std::thread::{Builder, JoinHandle}, @@ -130,10 +131,6 @@ impl SnapshotVersion { pub fn as_str(self) -> &'static str { <&str as From>::from(self) } - - fn maybe_from_string(version_string: &str) -> Option { - version_string.parse::().ok() - } } /// Information about a bank snapshot. Namely the slot of the bank, the path to the snapshot, and @@ -199,7 +196,7 @@ struct UnarchivedSnapshot { #[derive(Debug)] struct UnpackedSnapshotsDirAndVersion { unpacked_snapshots_dir: PathBuf, - snapshot_version: String, + snapshot_version: SnapshotVersion, } /// Helper type for passing around account storage map and next append vec id @@ -1245,7 +1242,6 @@ where .prefix(unpacked_snapshots_dir_prefix) .tempdir_in(bank_snapshots_dir)?; let unpacked_snapshots_dir = unpack_dir.path().join("snapshots"); - let unpacked_version_file = unpack_dir.path().join("version"); let (file_sender, file_receiver) = crossbeam_channel::unbounded(); streaming_unarchive_snapshot( @@ -1260,7 +1256,7 @@ where let num_rebuilder_threads = num_cpus::get_physical() .saturating_sub(parallel_divisions) .max(1); - let (storage, measure_untar) = measure!( + let (version_and_storages, measure_untar) = measure!( SnapshotStorageRebuilder::rebuild_storage( file_receiver, num_rebuilder_threads, @@ -1270,8 +1266,10 @@ where ); info!("{}", measure_untar); - let snapshot_version = snapshot_version_from_file(&unpacked_version_file)?; - + let SnapshotStorageRebuilderResult { + snapshot_version, + storage, + } = version_and_storages; Ok(UnarchivedSnapshot { unpack_dir, storage, @@ -1709,14 +1707,7 @@ fn verify_unpacked_snapshots_dir_and_version( &unpacked_snapshots_dir_and_version.snapshot_version ); - let snapshot_version = - SnapshotVersion::maybe_from_string(&unpacked_snapshots_dir_and_version.snapshot_version) - .ok_or_else(|| { - get_io_error(&format!( - "unsupported snapshot version: {}", - &unpacked_snapshots_dir_and_version.snapshot_version, - )) - })?; + let snapshot_version = unpacked_snapshots_dir_and_version.snapshot_version; let mut bank_snapshots = get_bank_snapshots_post(&unpacked_snapshots_dir_and_version.unpacked_snapshots_dir); if bank_snapshots.len() > 1 { diff --git a/runtime/src/snapshot_utils/snapshot_storage_rebuilder.rs b/runtime/src/snapshot_utils/snapshot_storage_rebuilder.rs index 8b8b9269002a3b..4b1e5b30248031 100644 --- a/runtime/src/snapshot_utils/snapshot_storage_rebuilder.rs +++ b/runtime/src/snapshot_utils/snapshot_storage_rebuilder.rs @@ -1,6 +1,7 @@ //! Provides interfaces for rebuilding snapshot storages use { + super::{snapshot_version_from_file, SnapshotVersion}, crate::{ accounts_db::{AccountStorageEntry, AccountStorageMap, AppendVecId, AtomicAppendVecId}, serde_snapshot::{ @@ -28,10 +29,17 @@ use { time::Instant, }, }; +/// Convenient wrapper for snapshot version and rebuilt storages +pub(crate) struct SnapshotStorageRebuilderResult { + /// Snapshot version + pub snapshot_version: SnapshotVersion, + /// Rebuilt storages + pub storage: AccountStorageMap, +} /// Stores state for rebuilding snapshot storages #[derive(Debug)] -pub struct SnapshotStorageRebuilder { +pub(crate) struct SnapshotStorageRebuilder { /// Receiver for unpacked snapshot storage files file_receiver: Receiver, /// Number of threads to rebuild with @@ -52,20 +60,32 @@ pub struct SnapshotStorageRebuilder { impl SnapshotStorageRebuilder { /// Synchronously spawns threads to rebuild snapshot storages - pub fn rebuild_storage( + pub(crate) fn rebuild_storage( file_receiver: Receiver, num_threads: usize, next_append_vec_id: Arc, - ) -> AccountStorageMap { - let (snapshot_file_path, append_vec_files) = Self::get_snapshot_file(&file_receiver); - let snapshot_storage_lengths = Self::process_snapshot_file(snapshot_file_path).unwrap(); - Self::spawn_rebuilder_threads( + ) -> SnapshotStorageRebuilderResult { + let (snapshot_version_path, snapshot_file_path, append_vec_files) = + Self::get_version_and_snapshot_files(&file_receiver); + let snapshot_version: SnapshotVersion = snapshot_version_from_file(&snapshot_version_path) + .unwrap() + .parse() + .unwrap(); + let snapshot_storage_lengths = + Self::process_snapshot_file(snapshot_version, snapshot_file_path).unwrap(); + + let account_storage_map = Self::spawn_rebuilder_threads( file_receiver, num_threads, next_append_vec_id, snapshot_storage_lengths, append_vec_files, - ) + ); + + SnapshotStorageRebuilderResult { + snapshot_version, + storage: account_storage_map, + } } /// Create the SnapshotStorageRebuilder for storing state during rebuilding @@ -98,16 +118,34 @@ impl SnapshotStorageRebuilder { /// Waits for snapshot file /// Due to parallel unpacking, we may receive some append_vec files before the snapshot file /// This function will push append_vec files into a buffer until we receive the snapshot file - fn get_snapshot_file(file_receiver: &Receiver) -> (PathBuf, Vec) { + fn get_version_and_snapshot_files( + file_receiver: &Receiver, + ) -> (PathBuf, PathBuf, Vec) { let mut append_vec_files = Vec::with_capacity(1024); - let snapshot_file_path = loop { + let mut snapshot_version_path = None; + let mut snapshot_file_path = None; + + loop { if let Ok(path) = file_receiver.recv() { let filename = path.file_name().unwrap().to_str().unwrap(); match get_snapshot_file_kind(filename) { - Some(SnapshotFileKind::SnapshotFile) => { - break path; + Some(SnapshotFileKind::Version) => { + snapshot_version_path = Some(path); + + // break if we have both the snapshot file and the version file + if snapshot_file_path.is_some() { + break; + } + } + Some(SnapshotFileKind::BankFields) => { + snapshot_file_path = Some(path); + + // break if we have both the snapshot file and the version file + if snapshot_version_path.is_some() { + break; + } } - Some(SnapshotFileKind::StorageFile) => { + Some(SnapshotFileKind::Storage) => { append_vec_files.push(path); } None => {} // do nothing for other kinds of files @@ -115,21 +153,28 @@ impl SnapshotStorageRebuilder { } else { panic!("did not receive snapshot file from unpacking threads"); } - }; + } + let snapshot_version_path = snapshot_version_path.unwrap(); + let snapshot_file_path = snapshot_file_path.unwrap(); - (snapshot_file_path, append_vec_files) + (snapshot_version_path, snapshot_file_path, append_vec_files) } /// Process the snapshot file to get the size of each snapshot storage file fn process_snapshot_file( + snapshot_version: SnapshotVersion, snapshot_file_path: PathBuf, ) -> Result>, bincode::Error> { let snapshot_file = File::open(snapshot_file_path).unwrap(); let mut snapshot_stream = BufReader::new(snapshot_file); - let (_bank_fields, accounts_fields) = - serde_snapshot::fields_from_stream(SerdeStyle::Newer, &mut snapshot_stream)?; + match snapshot_version { + SnapshotVersion::V1_2_0 => { + let (_bank_fields, accounts_fields) = + serde_snapshot::fields_from_stream(SerdeStyle::Newer, &mut snapshot_stream)?; - Ok(snapshot_storage_lengths_from_fields(&accounts_fields)) + Ok(snapshot_storage_lengths_from_fields(&accounts_fields)) + } + } } /// Spawn threads for processing buffered append_vec_files, and then received files @@ -191,7 +236,7 @@ impl SnapshotStorageRebuilder { /// Process an append_vec_file fn process_append_vec_file(&self, path: PathBuf) -> Result<(), std::io::Error> { let filename = path.file_name().unwrap().to_str().unwrap().to_owned(); - if let Some(SnapshotFileKind::StorageFile) = get_snapshot_file_kind(&filename) { + if let Some(SnapshotFileKind::Storage) = get_snapshot_file_kind(&filename) { let (slot, slot_complete) = self.insert_slot_storage_file(path, filename); if slot_complete { self.process_complete_slot(slot)?; @@ -290,15 +335,20 @@ impl SnapshotStorageRebuilder { } } -/// Used to determine if a filename is structured like a snapshot file, storage file, or neither +/// Used to determine if a filename is structured like a version file, snapshot file, storage file, or neither #[derive(PartialEq, Debug)] enum SnapshotFileKind { - SnapshotFile, - StorageFile, + Version, + BankFields, + Storage, } /// Determines `SnapshotFileKind` for `filename` if any fn get_snapshot_file_kind(filename: &str) -> Option { + if filename == "version" { + return Some(SnapshotFileKind::Version); + } + let mut periods = 0; let mut saw_numbers = false; for x in filename.chars() { @@ -318,8 +368,8 @@ fn get_snapshot_file_kind(filename: &str) -> Option { } match (periods, saw_numbers) { - (0, true) => Some(SnapshotFileKind::SnapshotFile), - (1, true) => Some(SnapshotFileKind::StorageFile), + (0, true) => Some(SnapshotFileKind::BankFields), + (1, true) => Some(SnapshotFileKind::Storage), (_, _) => None, } } @@ -342,11 +392,15 @@ mod tests { fn test_get_snapshot_file_kind() { assert_eq!(None, get_snapshot_file_kind("file.txt")); assert_eq!( - Some(SnapshotFileKind::SnapshotFile), + Some(SnapshotFileKind::Version), + get_snapshot_file_kind("version") + ); + assert_eq!( + Some(SnapshotFileKind::BankFields), get_snapshot_file_kind("1234") ); assert_eq!( - Some(SnapshotFileKind::StorageFile), + Some(SnapshotFileKind::Storage), get_snapshot_file_kind("1000.999") ); } From f5f9c1d26a8ed40ebe7a9daaf5f814e76efeb2eb Mon Sep 17 00:00:00 2001 From: apfitzge Date: Fri, 9 Sep 2022 10:08:37 -0500 Subject: [PATCH 2/4] Update comment on SnapshotFileKind Co-authored-by: Brooks Prumo --- runtime/src/snapshot_utils/snapshot_storage_rebuilder.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/runtime/src/snapshot_utils/snapshot_storage_rebuilder.rs b/runtime/src/snapshot_utils/snapshot_storage_rebuilder.rs index 4b1e5b30248031..6273bbadb5bb3f 100644 --- a/runtime/src/snapshot_utils/snapshot_storage_rebuilder.rs +++ b/runtime/src/snapshot_utils/snapshot_storage_rebuilder.rs @@ -335,7 +335,7 @@ impl SnapshotStorageRebuilder { } } -/// Used to determine if a filename is structured like a version file, snapshot file, storage file, or neither +/// Used to determine if a filename is structured like a version file, bank file, or storage file #[derive(PartialEq, Debug)] enum SnapshotFileKind { Version, From 358a5c1dd819b878fc364942246c421f7d311fb4 Mon Sep 17 00:00:00 2001 From: Andrew Fitzgerald Date: Fri, 9 Sep 2022 10:17:59 -0500 Subject: [PATCH 3/4] SnapshotStorageRebuilderResult to RebuiltSnapshotStorage --- runtime/src/snapshot_utils.rs | 4 ++-- runtime/src/snapshot_utils/snapshot_storage_rebuilder.rs | 6 +++--- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/runtime/src/snapshot_utils.rs b/runtime/src/snapshot_utils.rs index d6e8fa204e0a05..5a420268506927 100644 --- a/runtime/src/snapshot_utils.rs +++ b/runtime/src/snapshot_utils.rs @@ -60,7 +60,7 @@ use { crate::{ accounts_db::{AccountStorageMap, AtomicAppendVecId}, hardened_unpack::streaming_unpack_snapshot, - snapshot_utils::snapshot_storage_rebuilder::SnapshotStorageRebuilderResult, + snapshot_utils::snapshot_storage_rebuilder::RebuiltSnapshotStorage, }, crossbeam_channel::Sender, std::thread::{Builder, JoinHandle}, @@ -1266,7 +1266,7 @@ where ); info!("{}", measure_untar); - let SnapshotStorageRebuilderResult { + let RebuiltSnapshotStorage { snapshot_version, storage, } = version_and_storages; diff --git a/runtime/src/snapshot_utils/snapshot_storage_rebuilder.rs b/runtime/src/snapshot_utils/snapshot_storage_rebuilder.rs index 6273bbadb5bb3f..ef3f6c62427610 100644 --- a/runtime/src/snapshot_utils/snapshot_storage_rebuilder.rs +++ b/runtime/src/snapshot_utils/snapshot_storage_rebuilder.rs @@ -30,7 +30,7 @@ use { }, }; /// Convenient wrapper for snapshot version and rebuilt storages -pub(crate) struct SnapshotStorageRebuilderResult { +pub(crate) struct RebuiltSnapshotStorage { /// Snapshot version pub snapshot_version: SnapshotVersion, /// Rebuilt storages @@ -64,7 +64,7 @@ impl SnapshotStorageRebuilder { file_receiver: Receiver, num_threads: usize, next_append_vec_id: Arc, - ) -> SnapshotStorageRebuilderResult { + ) -> RebuiltSnapshotStorage { let (snapshot_version_path, snapshot_file_path, append_vec_files) = Self::get_version_and_snapshot_files(&file_receiver); let snapshot_version: SnapshotVersion = snapshot_version_from_file(&snapshot_version_path) @@ -82,7 +82,7 @@ impl SnapshotStorageRebuilder { append_vec_files, ); - SnapshotStorageRebuilderResult { + RebuiltSnapshotStorage { snapshot_version, storage: account_storage_map, } From d749791aa674f4e1b4aef0ddb9dcdf729b9039e3 Mon Sep 17 00:00:00 2001 From: Andrew Fitzgerald Date: Fri, 9 Sep 2022 10:21:49 -0500 Subject: [PATCH 4/4] better error propagation from rebuild_storage --- runtime/src/snapshot_utils.rs | 2 +- .../snapshot_storage_rebuilder.rs | 21 +++++++++++-------- 2 files changed, 13 insertions(+), 10 deletions(-) diff --git a/runtime/src/snapshot_utils.rs b/runtime/src/snapshot_utils.rs index 5a420268506927..6aa5fb8d802f29 100644 --- a/runtime/src/snapshot_utils.rs +++ b/runtime/src/snapshot_utils.rs @@ -1261,7 +1261,7 @@ where file_receiver, num_rebuilder_threads, next_append_vec_id - ), + )?, measure_name ); info!("{}", measure_untar); diff --git a/runtime/src/snapshot_utils/snapshot_storage_rebuilder.rs b/runtime/src/snapshot_utils/snapshot_storage_rebuilder.rs index ef3f6c62427610..545b4b2d803177 100644 --- a/runtime/src/snapshot_utils/snapshot_storage_rebuilder.rs +++ b/runtime/src/snapshot_utils/snapshot_storage_rebuilder.rs @@ -1,7 +1,7 @@ //! Provides interfaces for rebuilding snapshot storages use { - super::{snapshot_version_from_file, SnapshotVersion}, + super::{get_io_error, snapshot_version_from_file, SnapshotError, SnapshotVersion}, crate::{ accounts_db::{AccountStorageEntry, AccountStorageMap, AppendVecId, AtomicAppendVecId}, serde_snapshot::{ @@ -64,15 +64,18 @@ impl SnapshotStorageRebuilder { file_receiver: Receiver, num_threads: usize, next_append_vec_id: Arc, - ) -> RebuiltSnapshotStorage { + ) -> Result { let (snapshot_version_path, snapshot_file_path, append_vec_files) = Self::get_version_and_snapshot_files(&file_receiver); - let snapshot_version: SnapshotVersion = snapshot_version_from_file(&snapshot_version_path) - .unwrap() - .parse() - .unwrap(); + let snapshot_version_str = snapshot_version_from_file(&snapshot_version_path)?; + let snapshot_version = snapshot_version_str.parse().map_err(|_| { + get_io_error(&format!( + "unsupported snapshot version: {}", + snapshot_version_str, + )) + })?; let snapshot_storage_lengths = - Self::process_snapshot_file(snapshot_version, snapshot_file_path).unwrap(); + Self::process_snapshot_file(snapshot_version, snapshot_file_path)?; let account_storage_map = Self::spawn_rebuilder_threads( file_receiver, @@ -82,10 +85,10 @@ impl SnapshotStorageRebuilder { append_vec_files, ); - RebuiltSnapshotStorage { + Ok(RebuiltSnapshotStorage { snapshot_version, storage: account_storage_map, - } + }) } /// Create the SnapshotStorageRebuilder for storing state during rebuilding