From 2fcf2cd052df9a8a31e5ef04b7678c76b7f4f449 Mon Sep 17 00:00:00 2001
From: Xiang Zhu
Date: Fri, 3 Mar 2023 21:51:09 -0800
Subject: [PATCH 1/2] snapshot dir to and from archive

---
 runtime/src/hardened_unpack.rs |  25 +++-
 runtime/src/snapshot_utils.rs  | 206 +++++++++++++++++++++++++++++++++
 2 files changed, 228 insertions(+), 3 deletions(-)

diff --git a/runtime/src/hardened_unpack.rs b/runtime/src/hardened_unpack.rs
index e016fa1cae957f..a3f3be0ea2b9a1 100644
--- a/runtime/src/hardened_unpack.rs
+++ b/runtime/src/hardened_unpack.rs
@@ -322,6 +322,7 @@ pub fn unpack_snapshot(
             unpacked_append_vec_map.insert(file.to_string(), path.join("accounts").join(file));
         },
         |_| {},
+        false,
     )
     .map(|_| unpacked_append_vec_map)
 }
@@ -333,6 +334,7 @@ pub fn streaming_unpack_snapshot(
     account_paths: &[PathBuf],
     parallel_selector: Option<ParallelSelector>,
     sender: &crossbeam_channel::Sender<PathBuf>,
+    unarchive_only: bool,
 ) -> Result<()> {
     unpack_snapshot_with_processors(
         archive,
@@ -342,9 +344,12 @@ pub fn streaming_unpack_snapshot(
         |_, _| {},
         |entry_path_buf| {
             if entry_path_buf.is_file() {
-                sender.send(entry_path_buf).unwrap();
+                if !unarchive_only {
+                    sender.send(entry_path_buf).unwrap();
+                }
             }
         },
+        unarchive_only,
     )
 }
 
@@ -355,6 +360,7 @@ fn unpack_snapshot_with_processors(
     parallel_selector: Option<ParallelSelector>,
     mut accounts_path_processor: F,
     entry_processor: G,
+    from_dir: bool,
 ) -> Result<()>
 where
     A: Read,
@@ -370,7 +376,12 @@ where
         MAX_SNAPSHOT_ARCHIVE_UNPACKED_ACTUAL_SIZE,
         MAX_SNAPSHOT_ARCHIVE_UNPACKED_COUNT,
         |parts, kind| {
-            if is_valid_snapshot_archive_entry(parts, kind) {
+            let is_valid = if !from_dir {
+                is_valid_snapshot_archive_entry(parts, kind)
+            } else {
+                is_valid_snapshot_dir_archive_entry(parts, kind)
+            };
+            if is_valid {
                 i += 1;
                 match &parallel_selector {
                     Some(parallel_selector) => {
@@ -453,6 +464,14 @@ fn is_valid_snapshot_archive_entry(parts: &[&str], kind: tar::EntryType) -> bool
     }
 }
 
+fn is_valid_snapshot_dir_archive_entry(parts: &[&str], kind: tar::EntryType) -> bool {
+    match (parts, kind) {
+        ([dir], Directory) if all_digits(dir) => true,
+        ([dir, "status_cache"], Regular) if all_digits(dir) => true,
+        _ => true, // To be made more constraining
+    }
+}
+
 pub fn open_genesis_config(
     ledger_path: &Path,
     max_genesis_archive_unpacked_size: u64,
@@ -787,7 +806,7 @@ mod tests {
 
     fn finalize_and_unpack_snapshot(archive: tar::Builder<Vec<u8>>) -> Result<()> {
         with_finalize_and_unpack(archive, |a, b| {
-            unpack_snapshot_with_processors(a, b, &[PathBuf::new()], None, |_, _| {}, |_| {})
+            unpack_snapshot_with_processors(a, b, &[PathBuf::new()], None, |_, _| {}, |_| {}, false)
         })
     }
 
diff --git a/runtime/src/snapshot_utils.rs b/runtime/src/snapshot_utils.rs
index ad2aff2380e540..5386c8bcb0b288 100644
--- a/runtime/src/snapshot_utils.rs
+++ b/runtime/src/snapshot_utils.rs
@@ -68,6 +68,7 @@ use {
 mod archive_format;
 mod snapshot_storage_rebuilder;
 pub use archive_format::*;
+use thiserror::private::PathAsDisplay;
 
 pub const SNAPSHOT_STATUS_CACHE_FILENAME: &str = "status_cache";
 pub const SNAPSHOT_VERSION_FILENAME: &str = "version";
@@ -290,6 +291,9 @@ pub enum SnapshotError {
     #[error("archive generation failure {0}")]
     ArchiveGenerationFailure(ExitStatus),
 
+    #[error("unsupported archive format")]
+    ArchiveUnsupportedFormat,
+
     #[error("storage path symlink is invalid")]
     StoragePathSymlinkInvalid,
 
@@ -506,6 +510,29 @@ pub fn write_snapshot_version_file(
     Ok(())
 }
 
+/// Archive a directory
+pub fn archive_dir(
+    dir: impl AsRef<Path>,
+    archive_path: impl AsRef<Path>,
+    archive_format: ArchiveFormat,
+) -> Result<()> {
+    // The destination archive file
+    let archive_file = fs::File::create(&archive_path)?;
+
+    if archive_format != ArchiveFormat::TarZstd {
+        return Err(SnapshotError::ArchiveUnsupportedFormat);
+    }
+    // The compression encoder.  It takes in the data written by tar and writes to archive_file
+    let mut encoder = zstd::stream::Encoder::new(archive_file, 0)?;
+    let mut archive = tar::Builder::new(&mut encoder as &mut dyn Write);
+    let archive_inner_dir = dir.as_ref().file_name().unwrap();
+    archive.append_dir_all(archive_inner_dir, &dir)?;
+    archive.into_inner()?;
+    encoder.finish()?;
+
+    Ok(())
+}
+
 /// Make a snapshot archive out of the snapshot package
 pub fn archive_snapshot_package(
     snapshot_package: &SnapshotPackage,
@@ -1559,6 +1586,7 @@ fn spawn_unpack_snapshot_thread(
     mut archive: Archive<SharedBufferReader>,
     parallel_selector: Option<ParallelSelector>,
     thread_index: usize,
+    unarchive_only: bool,
 ) -> JoinHandle<()> {
     Builder::new()
         .name(format!("solUnpkSnpsht{thread_index:02}"))
@@ -1569,12 +1597,82 @@ fn spawn_unpack_snapshot_thread(
                 &account_paths,
                 parallel_selector,
                 &file_sender,
+                unarchive_only,
             )
             .unwrap();
         })
         .unwrap()
 }
 
+/// Follows unarchive_snapshot, but only unarchives the archive into the directory
+fn unarchive_to_snapshot_dir(
+    unpack_dir: impl AsRef<Path>,
+    snapshot_archive_path: impl AsRef<Path>,
+    account_paths: &[PathBuf],
+    archive_format: ArchiveFormat,
+    parallel_divisions: usize,
+) {
+    let mut measure_streaming_unarchive = Measure::start("streaming-unarchive");
+    let (file_sender, _file_receiver) = crossbeam_channel::unbounded();
+    unarchive_snapshot_no_streaming(
+        file_sender,
+        unpack_dir.as_ref().to_path_buf(),
+        account_paths.to_vec(),
+        snapshot_archive_path.as_ref().to_path_buf(),
+        archive_format,
+        parallel_divisions,
+    );
+    measure_streaming_unarchive.stop();
+    info!(
+        "xxx measure_streaming_unarchive {}",
+        measure_streaming_unarchive.as_ms()
+    );
+}
+
+/// Follows streaming_unarchive_snapshot
+fn unarchive_snapshot_no_streaming(
+    file_sender: Sender<PathBuf>,
+    unpack_dir: PathBuf,
+    account_paths: Vec<PathBuf>,
+    snapshot_archive_path: PathBuf,
+    archive_format: ArchiveFormat,
+    num_threads: usize,
+) -> Vec<JoinHandle<()>> {
+    let account_paths = Arc::new(account_paths);
+    let unpack_dir = Arc::new(unpack_dir); // TBD why Arc here?
+    let shared_buffer = untar_snapshot_create_shared_buffer(&snapshot_archive_path, archive_format);
+
+    // All shared buffer readers need to be created before the threads are spawned
+    #[allow(clippy::needless_collect)]
+    let archives: Vec<_> = (0..num_threads)
+        .map(|_| {
+            let reader = SharedBufferReader::new(&shared_buffer);
+            Archive::new(reader)
+        })
+        .collect();
+
+    archives
+        .into_iter()
+        .enumerate()
+        .map(|(thread_index, archive)| {
+            let parallel_selector = Some(ParallelSelector {
+                index: thread_index,
+                divisions: num_threads,
+            });
+
+            spawn_unpack_snapshot_thread(
+                file_sender.clone(),
+                account_paths.clone(),
+                unpack_dir.clone(),
+                archive,
+                parallel_selector,
+                thread_index,
+                true,
+            )
+        })
+        .collect()
+}
+
 /// Streams unpacked files across channel
 fn streaming_unarchive_snapshot(
     file_sender: Sender<PathBuf>,
@@ -1613,6 +1711,7 @@ fn streaming_unarchive_snapshot(
                 archive,
                 parallel_selector,
                 thread_index,
+                false,
             )
         })
         .collect()
@@ -2604,6 +2703,39 @@ pub fn bank_to_full_snapshot_archive(
     )
 }
 
+/// Convenience function to create a bank snapshot directory out of any Bank, regardless of state.
+/// The Bank will be frozen during the process.
+/// This is only called from ledger-tool or tests. Warping is a special case as well.
+///
+/// Requires:
+///     - `bank` is complete
+pub fn bank_to_snapshot_dir_for_test(
+    bank_snapshots_dir: impl AsRef<Path>,
+    bank: &Bank,
+    snapshot_version: Option<SnapshotVersion>,
+) -> Result<BankSnapshotInfo> {
+    let snapshot_version = snapshot_version.unwrap_or_default();
+
+    assert!(bank.is_complete());
+    bank.squash(); // Bank may not be a root
+    bank.force_flush_accounts_cache();
+    bank.clean_accounts(Some(bank.slot()));
+    bank.update_accounts_hash(CalcAccountsHashDataSource::Storages, false, false);
+    bank.rehash(); // Bank accounts may have been manually modified by the caller
+
+    let snapshot_storages = bank.get_snapshot_storages(None);
+    let slot_deltas = bank.status_cache.read().unwrap().root_slot_deltas();
+    let bank_snapshot_info = add_bank_snapshot(
+        &bank_snapshots_dir,
+        bank,
+        &snapshot_storages,
+        snapshot_version,
+        slot_deltas,
+    )?;
+
+    Ok(bank_snapshot_info)
+}
+
 /// Convenience function to create an incremental snapshot archive out of any Bank, regardless of
 /// state. The Bank will be frozen during the process.
 /// This is only called from ledger-tool or tests. Warping is a special case as well.
@@ -2658,6 +2790,32 @@ pub fn bank_to_incremental_snapshot_archive(
     )
 }
 
+pub fn bank_snapshot_dir_to_archive(
+    slot: Slot,
+    bank_snapshots_dir: impl AsRef<Path>,
+    archive_format: ArchiveFormat,
+) -> PathBuf {
+    let bank_snapshot_slot_dir = bank_snapshots_dir
+        .as_ref()
+        .to_path_buf()
+        .join(slot.to_string());
+    println!(
+        "xxx bank_snapshot_slot_dir {}",
+        bank_snapshot_slot_dir.as_display()
+    );
+
+    let archive_filename = format!("new_{:?}.{}", slot, archive_format.extension());
+    let archive_path = bank_snapshots_dir
+        .as_ref()
+        .to_path_buf()
+        .join(archive_filename);
+    archive_dir(bank_snapshot_slot_dir, &archive_path, archive_format).unwrap();
+
+    info!("Created archive {}", archive_path.display());
+
+    archive_path
+}
+
 /// Helper function to hold shared code to package, process, and archive full snapshots
 #[allow(clippy::too_many_arguments)]
 pub fn package_and_archive_full_snapshot(
@@ -4688,4 +4846,52 @@ mod tests {
         let snapshot = get_highest_bank_snapshot(bank_snapshots_dir).unwrap();
         assert_eq!(snapshot.slot, 1);
     }
+
+    /// Test roundtrip of bank snapshot directory to archive, then back again.
+    /// This follows test_roundtrip_bank_to_and_from_full_snapshot_simple
+    #[test]
+    fn test_roundtrip_snapshot_dir_to_and_from_archive() {
+        solana_logger::setup();
+        let genesis_config = GenesisConfig::default();
+        let original_bank = Bank::new_for_tests(&genesis_config);
+        original_bank.fill_bank_with_ticks_for_tests();
+
+        // bank in memory -> bank snapshot dir
+        let bank_snapshots_dir_tmp = tempfile::TempDir::new().unwrap();
+        let bank_snapshots_dir = bank_snapshots_dir_tmp.path();
+        bank_to_snapshot_dir_for_test(&bank_snapshots_dir, &original_bank, None).unwrap();
+
+        // dir -> archive
+        let snapshot_archive_format = ArchiveFormat::TarZstd;
+        let archive_path = bank_snapshot_dir_to_archive(
+            original_bank.slot(),
+            &bank_snapshots_dir,
+            snapshot_archive_format,
+        );
+        info!("Created archive {}", archive_path.display());
+
+        // archive -> unpack dir
+        //let unpack_dir = bank_snapshots_dir.join("unpack");
+        //let unpack_dir_tmp = tempfile::TempDir::new().unwrap();
+        //let unpack_dir = unpack_dir_tmp.path().to_path_buf();
+        let unpack_dir = PathBuf::from("/tmp/unpack");
+        std::fs::create_dir(&unpack_dir).unwrap();
+        let parallel_divisions = 1; // TBD
+
+        // get account_paths from bank
+        let account_paths = &original_bank.accounts().accounts_db.paths;
+
+        unarchive_to_snapshot_dir(
+            &unpack_dir,
+            archive_path,
+            account_paths,
+            snapshot_archive_format,
+            parallel_divisions,
+        );
+        info!("Unpacked to {}", unpack_dir.display());
+
+        // unpack dir -> bank in memory
+        // After the constructing_bank PR is committed, call it here to construct a bank and compare.
+        //assert_eq!(original_bank, roundtrip_bank);
+    }
 }

From ecdfe3fa7fb817ae4c86dcae9e12d2ee5621c952 Mon Sep 17 00:00:00 2001
From: Xiang Zhu
Date: Fri, 3 Mar 2023 21:59:01 -0800
Subject: [PATCH 2/2] minor comments cleanup

---
 runtime/src/snapshot_utils.rs | 8 +++-----
 1 file changed, 3 insertions(+), 5 deletions(-)

diff --git a/runtime/src/snapshot_utils.rs b/runtime/src/snapshot_utils.rs
index 5386c8bcb0b288..a1548ea5a349c3 100644
--- a/runtime/src/snapshot_utils.rs
+++ b/runtime/src/snapshot_utils.rs
@@ -1624,7 +1624,7 @@ fn unarchive_to_snapshot_dir(
     );
     measure_streaming_unarchive.stop();
     info!(
-        "xxx measure_streaming_unarchive {}",
+        "measure_streaming_unarchive {}",
         measure_streaming_unarchive.as_ms()
     );
 }
@@ -2799,10 +2799,6 @@ pub fn bank_snapshot_dir_to_archive(
         .as_ref()
         .to_path_buf()
         .join(slot.to_string());
-    println!(
-        "xxx bank_snapshot_slot_dir {}",
-        bank_snapshot_slot_dir.as_display()
-    );
 
     let archive_filename = format!("new_{:?}.{}", slot, archive_format.extension());
     let archive_path = bank_snapshots_dir
@@ -4871,6 +4867,8 @@ mod tests {
         info!("Created archive {}", archive_path.display());
 
         // archive -> unpack dir
+        // To be debugged: /tmp/unpack works, but any directory generated from TempDir
+        // is unstable, causing flaky crashes.
        //let unpack_dir = bank_snapshots_dir.join("unpack");
        //let unpack_dir_tmp = tempfile::TempDir::new().unwrap();
        //let unpack_dir = unpack_dir_tmp.path().to_path_buf();
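
Note on the permissive entry filter added in hardened_unpack.rs: the new is_valid_snapshot_dir_archive_entry currently accepts every entry through its `_ => true` arm, which its own comment flags as a placeholder. Below is a rough sketch (not part of the patches above) of what a stricter filter could look like. It assumes the archive produced by archive_dir contains a single slot-named directory holding the slot-named snapshot file and a status_cache file; that layout, and the _strict name, are assumptions for illustration rather than anything the patch guarantees.

    // Sketch only: a more constraining variant of is_valid_snapshot_dir_archive_entry.
    // The entry layout (slot dir -> slot file + status_cache) is assumed, not taken from the patch.
    fn is_valid_snapshot_dir_archive_entry_strict(parts: &[&str], kind: tar::EntryType) -> bool {
        use tar::EntryType::{Directory, Regular};
        // Helper mirroring the all_digits() check used by the existing validators.
        fn all_digits(v: &str) -> bool {
            !v.is_empty() && v.chars().all(|c| c.is_ascii_digit())
        }
        match (parts, kind) {
            // The slot directory itself, e.g. "123/"
            ([dir], Directory) if all_digits(dir) => true,
            // The serialized bank file inside it, e.g. "123/123"
            ([dir, file], Regular) if all_digits(dir) && all_digits(file) => true,
            // The status cache file, e.g. "123/status_cache"
            ([dir, "status_cache"], Regular) if all_digits(dir) => true,
            // Reject anything else instead of falling through to `true`
            _ => false,
        }
    }

Rejecting unrecognized entries this way would keep the hardening intent of unpack_snapshot_with_processors intact when the archive was built from a snapshot directory rather than from a snapshot package.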