From d69f60229d55f48fc8d37f9a5798252fa8c402d2 Mon Sep 17 00:00:00 2001 From: Xiang Zhu Date: Wed, 22 Mar 2023 13:49:23 -0700 Subject: [PATCH] Construct bank from snapshot dir (#30171) * Construct a bank from a snapshot directory * Clean up. Remove archiving releated data structures * Fix auto checks * remove ArchiveFormat::None * fix merge error * remove incremental snapshot dir * Minor cleanup, remove unused functiond defs * remove pub fn bank_from_latest_snapshot_dir * rename bank_from_snapshot_dir to bank_from_snapshot * Clean up invalid comments * A few minor review changes * Removed insert_slot_storage_file * Add comment explain hardlink symlink * Skip the whole verify_snapshot_bank call for the from_dir case * Add bank.set_initial_accounts_hash_verification_completed() * address review issues: appendvec to append_vec, replace unwrap with expect, etc * AtomicAppendVecId, remove arc on bank etc * slice, CI error on &snapshot_version_path * measure_build_storage * move snapshot_from * remove measure_name from build_storage_from_snapshot_dir * remove from_dir specific next_append_vec_id logic * revert insert_slot_storage_file change * init next_append_vec_id to fix the substraction underflow * remove measure from build_storage_from_snapshot_dir * make measure name more specific * refactor status_cache deserialization into a function * remove reference to pass the ci check * track next appendvec id * verify that the next_append_vec_id tracking is correct * clean up usize * in build_storage_from_snapshot_dir remove expect * test max appendvecc id tracking with multiple banks in the test * cleared expect and unwrap in streaming_snapshot_dir_files * rebase cleanup * change to measure! * dereference arc in the right way --- runtime/src/serde_snapshot.rs | 2 +- runtime/src/snapshot_utils.rs | 359 +++++++++++++++++- .../snapshot_storage_rebuilder.rs | 71 ++-- 3 files changed, 385 insertions(+), 47 deletions(-) diff --git a/runtime/src/serde_snapshot.rs b/runtime/src/serde_snapshot.rs index 82be27209f2b8e..95b8a526d8d3cb 100644 --- a/runtime/src/serde_snapshot.rs +++ b/runtime/src/serde_snapshot.rs @@ -598,7 +598,7 @@ where Ok(bank) } -fn reconstruct_single_storage( +pub(crate) fn reconstruct_single_storage( slot: &Slot, append_vec_path: &Path, current_len: usize, diff --git a/runtime/src/snapshot_utils.rs b/runtime/src/snapshot_utils.rs index c09ff9b08f29fb..c02f75ecda869a 100644 --- a/runtime/src/snapshot_utils.rs +++ b/runtime/src/snapshot_utils.rs @@ -249,6 +249,17 @@ pub enum BankSnapshotType { Post, } +/// When constructing a bank a snapshot, traditionally the snapshot was from a snapshot archive. Now, +/// the snapshot can be from a snapshot directory, or from a snapshot archive. This is the flag to +/// indicate which. +#[derive(Clone, Copy, Debug, Eq, PartialEq)] +pub enum SnapshotFrom { + /// Build from the snapshot archive + Archive, + /// Build directly from the bank snapshot directory + Dir, +} + /// Helper type when rebuilding from snapshots. Designed to handle when rebuilding from just a /// full snapshot, or from both a full snapshot and an incremental snapshot. #[derive(Debug)] @@ -290,6 +301,9 @@ pub enum SnapshotError { #[error("serialization error: {0}")] Serialize(#[from] bincode::Error), + #[error("crossbeam send error: {0}")] + CrossbeamSend(#[from] crossbeam_channel::SendError), + #[error("archive generation failure {0}")] ArchiveGenerationFailure(ExitStatus), @@ -337,7 +351,11 @@ pub enum SnapshotError { #[error("no valid snapshot dir found under {}", .0.display())] NoSnapshotSlotDir(PathBuf), + + #[error("snapshot dir account paths mismatching")] + AccountPathsMismatch, } + #[derive(Error, Debug)] pub enum SnapshotNewFromDirError { #[error("I/O error: {0}")] @@ -1324,7 +1342,7 @@ fn verify_and_unarchive_snapshots( let parallel_divisions = (num_cpus::get() / 4).clamp(1, PARALLEL_UNTAR_READERS_DEFAULT); - let next_append_vec_id = Arc::new(AtomicU32::new(0)); + let next_append_vec_id = Arc::new(AtomicAppendVecId::new(0)); let unarchived_full_snapshot = unarchive_snapshot( &bank_snapshots_dir, TMP_SNAPSHOT_ARCHIVE_PREFIX, @@ -1443,7 +1461,7 @@ pub fn bank_from_snapshot_archives( }; let mut measure_rebuild = Measure::start("rebuild bank from snapshots"); - let bank = rebuild_bank_from_snapshots( + let bank = rebuild_bank_from_unarchived_snapshots( &unarchived_full_snapshot.unpacked_snapshots_dir_and_version, unarchived_incremental_snapshot .as_ref() @@ -1617,6 +1635,70 @@ pub fn bank_from_latest_snapshot_archives( )) } +/// Build bank from a snapshot (a snapshot directory, not a snapshot archive) +#[allow(clippy::too_many_arguments)] +pub fn bank_from_snapshot_dir( + account_paths: &[PathBuf], + bank_snapshot: &BankSnapshotInfo, + genesis_config: &GenesisConfig, + runtime_config: &RuntimeConfig, + debug_keys: Option>>, + additional_builtins: Option<&Builtins>, + account_secondary_indexes: AccountSecondaryIndexes, + limit_load_slot_count_from_snapshot: Option, + shrink_ratio: AccountShrinkThreshold, + verify_index: bool, + accounts_db_config: Option, + accounts_update_notifier: Option, + exit: &Arc, +) -> Result<(Bank, BankFromArchiveTimings)> { + let next_append_vec_id = Arc::new(AtomicAppendVecId::new(0)); + + let (storage, measure_build_storage) = measure!( + build_storage_from_snapshot_dir(bank_snapshot, account_paths, next_append_vec_id.clone())?, + "build storage from snapshot dir" + ); + info!("{}", measure_build_storage); + + let next_append_vec_id = + Arc::try_unwrap(next_append_vec_id).expect("this is the only strong reference"); + let storage_and_next_append_vec_id = StorageAndNextAppendVecId { + storage, + next_append_vec_id, + }; + let mut measure_rebuild = Measure::start("rebuild bank from snapshots"); + let bank = rebuild_bank_from_snapshot( + bank_snapshot, + account_paths, + storage_and_next_append_vec_id, + genesis_config, + runtime_config, + debug_keys, + additional_builtins, + account_secondary_indexes, + limit_load_slot_count_from_snapshot, + shrink_ratio, + verify_index, + accounts_db_config, + accounts_update_notifier, + exit, + )?; + measure_rebuild.stop(); + info!("{}", measure_rebuild); + + // Skip bank.verify_snapshot_bank. Subsequent snapshot requests/accounts hash verification requests + // will calculate and check the accounts hash, so we will still have safety/correctness there. + bank.set_initial_accounts_hash_verification_completed(); + + let timings = BankFromArchiveTimings { + rebuild_bank_from_snapshots_us: measure_rebuild.as_us(), + full_snapshot_untar_us: measure_build_storage.as_us(), + incremental_snapshot_untar_us: 0, + verify_snapshot_bank_us: 0, + }; + Ok((bank, timings)) +} + /// Check to make sure the deserialized bank's slot and hash matches the snapshot archive's slot /// and hash fn verify_bank_against_expected_slot_hash( @@ -1748,7 +1830,7 @@ fn unarchive_snapshot( account_paths: &[PathBuf], archive_format: ArchiveFormat, parallel_divisions: usize, - next_append_vec_id: Arc, + next_append_vec_id: Arc, ) -> Result where P: AsRef, @@ -1776,7 +1858,8 @@ where SnapshotStorageRebuilder::rebuild_storage( file_receiver, num_rebuilder_threads, - next_append_vec_id + next_append_vec_id, + SnapshotFrom::Archive, )?, measure_name ); @@ -1799,6 +1882,99 @@ where }) } +/// Streams snapshot dir files across channel +/// Follow the flow of streaming_unarchive_snapshot(), but handle the from_dir case. +fn streaming_snapshot_dir_files( + file_sender: Sender, + snapshot_file_path: impl Into, + snapshot_version_path: impl Into, + account_paths: &[PathBuf], +) -> Result<()> { + file_sender.send(snapshot_file_path.into())?; + file_sender.send(snapshot_version_path.into())?; + + for account_path in account_paths { + for file in fs::read_dir(account_path)? { + file_sender.send(file?.path())?; + } + } + + Ok(()) +} + +/// Perform the common tasks when deserialize a snapshot. Handles reading snapshot file, reading the version file, +/// and then returning those fields plus the rebuilt storage +fn build_storage_from_snapshot_dir( + snapshot_info: &BankSnapshotInfo, + account_paths: &[PathBuf], + next_append_vec_id: Arc, +) -> Result { + let bank_snapshot_dir = &snapshot_info.snapshot_dir; + let snapshot_file_path = &snapshot_info.snapshot_path(); + let snapshot_version_path = bank_snapshot_dir.join("version"); + let (file_sender, file_receiver) = crossbeam_channel::unbounded(); + + let accounts_hardlinks = bank_snapshot_dir.join("accounts_hardlinks"); + + let account_paths_set: HashSet<_> = HashSet::from_iter(account_paths.iter()); + + for dir_symlink in fs::read_dir(accounts_hardlinks)? { + // The symlink point to /snapshot/ which contain the account files hardlinks + // The corresponding run path should be /run/ + let snapshot_account_path = fs::read_link(dir_symlink?.path())?; + let account_run_path = snapshot_account_path + .parent() + .ok_or_else(|| SnapshotError::InvalidAccountPath(snapshot_account_path.clone()))? + .parent() + .ok_or_else(|| SnapshotError::InvalidAccountPath(snapshot_account_path.clone()))? + .join("run"); + if !account_paths_set.contains(&account_run_path) { + // The appendvec from the bank snapshot stoarge does not match any of the provided account_paths set. + // The accout paths have changed so the snapshot is no longer usable. + return Err(SnapshotError::AccountPathsMismatch); + } + // Generate hard-links to make the account files available in the main accounts/, and let the new appendvec + // paths be in accounts/ + for file in fs::read_dir(snapshot_account_path)? { + let file_path = file?.path().to_path_buf(); + let file_name = file_path + .file_name() + .ok_or_else(|| SnapshotError::InvalidAppendVecPath(file_path.clone()))?; + let dest_path = account_run_path.clone().join(file_name); + fs::hard_link(&file_path, &dest_path).map_err(|e| { + let err_msg = format!( + "Error: {}. Failed to hard-link {} to {}", + e, + file_path.display(), + dest_path.display() + ); + SnapshotError::Io(IoError::new(ErrorKind::Other, err_msg)) + })?; + } + } + + streaming_snapshot_dir_files( + file_sender, + snapshot_file_path, + snapshot_version_path, + account_paths, + )?; + + let num_rebuilder_threads = num_cpus::get_physical().saturating_sub(1).max(1); + let version_and_storages = SnapshotStorageRebuilder::rebuild_storage( + file_receiver, + num_rebuilder_threads, + next_append_vec_id, + SnapshotFrom::Dir, + )?; + + let RebuiltSnapshotStorage { + snapshot_version: _, + storage, + } = version_and_storages; + Ok(storage) +} + /// Reads the `snapshot_version` from a file. Before opening the file, its size /// is compared to `MAX_SNAPSHOT_VERSION_FILE_SIZE`. If the size exceeds this /// threshold, it is not opened and an error is returned. @@ -2279,8 +2455,23 @@ fn bank_fields_from_snapshots( }) } +fn deserialize_status_cache(status_cache_path: &Path) -> Result> { + deserialize_snapshot_data_file(status_cache_path, |stream| { + info!( + "Rebuilding status cache from {}", + status_cache_path.display() + ); + let slot_delta: Vec = bincode::options() + .with_limit(MAX_SNAPSHOT_DATA_FILE_SIZE) + .with_fixint_encoding() + .allow_trailing_bytes() + .deserialize_from(stream)?; + Ok(slot_delta) + }) +} + #[allow(clippy::too_many_arguments)] -fn rebuild_bank_from_snapshots( +fn rebuild_bank_from_unarchived_snapshots( full_snapshot_unpacked_snapshots_dir_and_version: &UnpackedSnapshotsDirAndVersion, incremental_snapshot_unpacked_snapshots_dir_and_version: Option< &UnpackedSnapshotsDirAndVersion, @@ -2315,7 +2506,7 @@ fn rebuild_bank_from_snapshots( (None, None) }; info!( - "Loading bank from full snapshot {} and incremental snapshot {:?}", + "Rebuiding bank from full snapshot {} and incremental snapshot {:?}", full_snapshot_root_paths.snapshot_path().display(), incremental_snapshot_root_paths .as_ref() @@ -2368,24 +2559,73 @@ fn rebuild_bank_from_snapshots( }, ) .join(SNAPSHOT_STATUS_CACHE_FILENAME); - let slot_deltas = deserialize_snapshot_data_file(&status_cache_path, |stream| { - info!( - "Rebuilding status cache from {}", - status_cache_path.display() - ); - let slot_deltas: Vec = bincode::options() - .with_limit(MAX_SNAPSHOT_DATA_FILE_SIZE) - .with_fixint_encoding() - .allow_trailing_bytes() - .deserialize_from(stream)?; - Ok(slot_deltas) + let slot_deltas = deserialize_status_cache(&status_cache_path)?; + + verify_slot_deltas(slot_deltas.as_slice(), &bank)?; + + bank.status_cache.write().unwrap().append(&slot_deltas); + + info!("Rebuilt bank for slot: {}", bank.slot()); + Ok(bank) +} + +#[allow(clippy::too_many_arguments)] +fn rebuild_bank_from_snapshot( + bank_snapshot: &BankSnapshotInfo, + account_paths: &[PathBuf], + storage_and_next_append_vec_id: StorageAndNextAppendVecId, + genesis_config: &GenesisConfig, + runtime_config: &RuntimeConfig, + debug_keys: Option>>, + additional_builtins: Option<&Builtins>, + account_secondary_indexes: AccountSecondaryIndexes, + limit_load_slot_count_from_snapshot: Option, + shrink_ratio: AccountShrinkThreshold, + verify_index: bool, + accounts_db_config: Option, + accounts_update_notifier: Option, + exit: &Arc, +) -> Result { + info!( + "Rebuilding bank from snapshot {}", + bank_snapshot.snapshot_dir.display(), + ); + + let snapshot_root_paths = SnapshotRootPaths { + full_snapshot_root_file_path: bank_snapshot.snapshot_path(), + incremental_snapshot_root_file_path: None, + }; + + let bank = deserialize_snapshot_data_files(&snapshot_root_paths, |snapshot_streams| { + Ok(bank_from_streams( + SerdeStyle::Newer, + snapshot_streams, + account_paths, + storage_and_next_append_vec_id, + genesis_config, + runtime_config, + debug_keys, + additional_builtins, + account_secondary_indexes, + limit_load_slot_count_from_snapshot, + shrink_ratio, + verify_index, + accounts_db_config, + accounts_update_notifier, + exit, + )?) })?; + let status_cache_path = bank_snapshot + .snapshot_dir + .join(SNAPSHOT_STATUS_CACHE_FILENAME); + let slot_deltas = deserialize_status_cache(&status_cache_path)?; + verify_slot_deltas(slot_deltas.as_slice(), &bank)?; bank.status_cache.write().unwrap().append(&slot_deltas); - info!("Loaded bank for slot: {}", bank.slot()); + info!("Rebuilt bank for slot: {}", bank.slot()); Ok(bank) } @@ -2920,6 +3160,7 @@ mod tests { accounts_db::ACCOUNTS_DB_CONFIG_FOR_TESTING, accounts_hash::{CalcAccountsHashConfig, HashStats}, genesis_utils, + snapshot_utils::snapshot_storage_rebuilder::get_slot_and_append_vec_id, sorted_storages::SortedStorages, status_cache::Status, }, @@ -2933,10 +3174,14 @@ mod tests { system_transaction, transaction::SanitizedTransaction, }, - std::{convert::TryFrom, mem::size_of, os::unix::fs::PermissionsExt, sync::Arc}, + std::{ + convert::TryFrom, + mem::size_of, + os::unix::fs::PermissionsExt, + sync::{atomic::Ordering, Arc}, + }, tempfile::NamedTempFile, }; - #[test] fn test_serialize_snapshot_data_file_under_limit() { let temp_dir = tempfile::TempDir::new().unwrap(); @@ -5103,4 +5348,78 @@ mod tests { .unwrap(); assert_eq!(other_incremental_accounts_hash, incremental_accounts_hash); } + + #[test] + fn test_bank_from_snapshot_dir() { + solana_logger::setup(); + let genesis_config = GenesisConfig::default(); + let mut bank = Arc::new(Bank::new_for_tests(&genesis_config)); + + let tmp_dir = tempfile::TempDir::new().unwrap(); + let bank_snapshots_dir = tmp_dir.path(); + let collecter_id = Pubkey::new_unique(); + let snapshot_version = SnapshotVersion::default(); + + for _ in 0..3 { + // prepare the bank + bank = Arc::new(Bank::new_from_parent(&bank, &collecter_id, bank.slot() + 1)); + bank.fill_bank_with_ticks_for_tests(); + bank.squash(); + bank.force_flush_accounts_cache(); + + // generate the bank snapshot directory for slot+1 + let snapshot_storages = bank.get_snapshot_storages(None); + let slot_deltas = bank.status_cache.read().unwrap().root_slot_deltas(); + add_bank_snapshot( + bank_snapshots_dir, + &bank, + &snapshot_storages, + snapshot_version, + slot_deltas, + ) + .unwrap(); + } + + let bank_snapshot = get_highest_bank_snapshot(bank_snapshots_dir).unwrap(); + let account_paths = &bank.rc.accounts.accounts_db.paths; + + // Clear the contents of the account paths run directories. When constructing the bank, the appendvec + // files will be extracted from the snapshot hardlink directories into these run/ directories. + for path in account_paths { + delete_contents_of_path(path); + } + + let (bank_constructed, ..) = bank_from_snapshot_dir( + account_paths, + &bank_snapshot, + &genesis_config, + &RuntimeConfig::default(), + None, + None, + AccountSecondaryIndexes::default(), + None, + AccountShrinkThreshold::default(), + false, + Some(ACCOUNTS_DB_CONFIG_FOR_TESTING), + None, + &Arc::default(), + ) + .unwrap(); + + bank_constructed.wait_for_initial_accounts_hash_verification_completed_for_tests(); + assert_eq!(bank_constructed, *bank); + + // Verify that the next_append_vec_id tracking is correct + let mut max_id = 0; + for path in account_paths { + fs::read_dir(path).unwrap().for_each(|entry| { + let path = entry.unwrap().path(); + let filename = path.file_name().unwrap(); + let (_slot, append_vec_id) = get_slot_and_append_vec_id(filename.to_str().unwrap()); + max_id = std::cmp::max(max_id, append_vec_id); + }); + } + let next_id = bank.accounts().accounts_db.next_id.load(Ordering::Relaxed) as usize; + assert_eq!(max_id, next_id - 1); + } } diff --git a/runtime/src/snapshot_utils/snapshot_storage_rebuilder.rs b/runtime/src/snapshot_utils/snapshot_storage_rebuilder.rs index 0997a2238e97ec..cf8e1ed8b38a31 100644 --- a/runtime/src/snapshot_utils/snapshot_storage_rebuilder.rs +++ b/runtime/src/snapshot_utils/snapshot_storage_rebuilder.rs @@ -1,14 +1,16 @@ //! Provides interfaces for rebuilding snapshot storages use { - super::{get_io_error, snapshot_version_from_file, SnapshotError, SnapshotVersion}, + super::{ + get_io_error, snapshot_version_from_file, SnapshotError, SnapshotFrom, SnapshotVersion, + }, crate::{ account_storage::{AccountStorageMap, AccountStorageReference}, accounts_db::{AccountStorageEntry, AccountsDb, AppendVecId, AtomicAppendVecId}, append_vec::AppendVec, serde_snapshot::{ - self, remap_and_reconstruct_single_storage, snapshot_storage_lengths_from_fields, - SerdeStyle, SerializedAppendVecId, + self, reconstruct_single_storage, remap_and_reconstruct_single_storage, + snapshot_storage_lengths_from_fields, SerdeStyle, SerializedAppendVecId, }, }, crossbeam_channel::{select, unbounded, Receiver, Sender}, @@ -67,6 +69,8 @@ pub(crate) struct SnapshotStorageRebuilder { processed_slot_count: AtomicUsize, /// Tracks the number of collisions in AppendVecId num_collisions: AtomicUsize, + /// Rebuild from the snapshot files or archives + snapshot_from: SnapshotFrom, } impl SnapshotStorageRebuilder { @@ -75,6 +79,7 @@ impl SnapshotStorageRebuilder { file_receiver: Receiver, num_threads: usize, next_append_vec_id: Arc, + snapshot_from: SnapshotFrom, ) -> Result { let (snapshot_version_path, snapshot_file_path, append_vec_files) = Self::get_version_and_snapshot_files(&file_receiver); @@ -93,6 +98,7 @@ impl SnapshotStorageRebuilder { next_append_vec_id, snapshot_storage_lengths, append_vec_files, + snapshot_from, ) .map_err(|err| SnapshotError::IoWithSource(err, "rebuild snapshot storages"))?; @@ -109,6 +115,7 @@ impl SnapshotStorageRebuilder { num_threads: usize, next_append_vec_id: Arc, snapshot_storage_lengths: HashMap>, + snapshot_from: SnapshotFrom, ) -> Self { let storage = DashMap::with_capacity(snapshot_storage_lengths.len()); let storage_paths: DashMap<_, _> = snapshot_storage_lengths @@ -126,6 +133,7 @@ impl SnapshotStorageRebuilder { next_append_vec_id, processed_slot_count: AtomicUsize::new(0), num_collisions: AtomicUsize::new(0), + snapshot_from, } } @@ -198,18 +206,22 @@ impl SnapshotStorageRebuilder { next_append_vec_id: Arc, snapshot_storage_lengths: HashMap>, append_vec_files: Vec, + snapshot_from: SnapshotFrom, ) -> Result { let rebuilder = Arc::new(SnapshotStorageRebuilder::new( file_receiver, num_threads, next_append_vec_id, snapshot_storage_lengths, + snapshot_from, )); let thread_pool = rebuilder.build_thread_pool(); - // Synchronously process buffered append_vec_files - thread_pool.install(|| rebuilder.process_buffered_files(append_vec_files))?; + if snapshot_from == SnapshotFrom::Archive { + // Synchronously process buffered append_vec_files + thread_pool.install(|| rebuilder.process_buffered_files(append_vec_files))?; + } // Asynchronously spawn threads to process received append_vec_files let (exit_sender, exit_receiver) = unbounded(); @@ -260,8 +272,18 @@ impl SnapshotStorageRebuilder { fn process_append_vec_file(&self, path: PathBuf) -> Result<(), std::io::Error> { let filename = path.file_name().unwrap().to_str().unwrap().to_owned(); if let Some(SnapshotFileKind::Storage) = get_snapshot_file_kind(&filename) { - let (slot, slot_complete) = self.insert_slot_storage_file(path, filename); - if slot_complete { + let (slot, append_vec_id) = get_slot_and_append_vec_id(&filename); + if self.snapshot_from == SnapshotFrom::Dir { + // Keep track of the highest append_vec_id in the system, so the future append_vecs + // can be assigned to unique IDs. This is only needed when loading from a snapshot + // dir. When loading from a snapshot archive, the max of the appendvec IDs is + // updated in remap_append_vec_file(), which is not in the from_dir route. + self.next_append_vec_id + .fetch_max((append_vec_id + 1) as AppendVecId, Ordering::Relaxed); + } + let slot_storage_count = self.insert_storage_file(&slot, path); + if slot_storage_count == self.snapshot_storage_lengths.get(&slot).unwrap().len() { + // slot_complete self.process_complete_slot(slot)?; self.processed_slot_count.fetch_add(1, Ordering::AcqRel); } @@ -269,17 +291,6 @@ impl SnapshotStorageRebuilder { Ok(()) } - /// Inserts single storage file, returns the slot and if the slot has all of its storage entries - fn insert_slot_storage_file(&self, path: PathBuf, filename: String) -> (Slot, bool) { - let (slot, _) = get_slot_and_append_vec_id(&filename); - let slot_storage_count = self.insert_storage_file(&slot, path); - - ( - slot, - slot_storage_count == self.snapshot_storage_lengths.get(&slot).unwrap().len(), - ) - } - /// Insert storage path into slot and return the number of storage files for the slot fn insert_storage_file(&self, slot: &Slot, path: PathBuf) -> usize { let slot_paths = self.storage_paths.get(slot).unwrap(); @@ -305,14 +316,22 @@ impl SnapshotStorageRebuilder { .get(&old_append_vec_id) .unwrap(); - let storage_entry = remap_and_reconstruct_single_storage( - slot, - old_append_vec_id, - current_len, - path.as_path(), - &self.next_append_vec_id, - &self.num_collisions, - )?; + let storage_entry = match &self.snapshot_from { + SnapshotFrom::Archive => remap_and_reconstruct_single_storage( + slot, + old_append_vec_id, + current_len, + path.as_path(), + &self.next_append_vec_id, + &self.num_collisions, + )?, + SnapshotFrom::Dir => reconstruct_single_storage( + &slot, + path.as_path(), + current_len, + old_append_vec_id as AppendVecId, + )?, + }; Ok((storage_entry.append_vec_id(), storage_entry)) })