diff --git a/runtime/src/accounts_db.rs b/runtime/src/accounts_db.rs index b7150f50182087..a0b8b004aeee84 100644 --- a/runtime/src/accounts_db.rs +++ b/runtime/src/accounts_db.rs @@ -8669,11 +8669,6 @@ pub mod tests { accounts.write_version.load(Ordering::Relaxed) ); - assert_eq!( - daccounts.next_id.load(Ordering::Relaxed), - accounts.next_id.load(Ordering::Relaxed) - ); - // Get the hash for the latest slot, which should be the only hash in the // bank_hashes map on the deserialized AccountsDb assert_eq!(daccounts.bank_hashes.read().unwrap().len(), 2); diff --git a/runtime/src/serde_snapshot.rs b/runtime/src/serde_snapshot.rs index a5a10f672f1472..40743bf4c6e5c8 100644 --- a/runtime/src/serde_snapshot.rs +++ b/runtime/src/serde_snapshot.rs @@ -22,6 +22,7 @@ use { log::*, rayon::prelude::*, serde::{de::DeserializeOwned, Deserialize, Serialize}, + solana_measure::measure::Measure, solana_program_runtime::InstructionProcessor, solana_sdk::{ clock::{Epoch, Slot, UnixTimestamp}, @@ -39,7 +40,10 @@ use { io::{self, BufReader, BufWriter, Read, Write}, path::{Path, PathBuf}, result::Result, - sync::{atomic::Ordering, Arc, RwLock}, + sync::{ + atomic::{AtomicUsize, Ordering}, + Arc, RwLock, + }, }, }; @@ -372,17 +376,19 @@ fn reconstruct_single_storage( slot: &Slot, append_vec_path: &Path, storage_entry: &E, + remapped_append_vec_id: Option, new_slot_storage: &mut HashMap>, ) -> Result<(), Error> where E: SerializableStorage, { + let append_vec_id = remapped_append_vec_id.unwrap_or_else(|| storage_entry.id()); let (accounts, num_accounts) = AppendVec::new_from_file(append_vec_path, storage_entry.current_len())?; let u_storage_entry = - AccountStorageEntry::new_existing(*slot, storage_entry.id(), accounts, num_accounts); + AccountStorageEntry::new_existing(*slot, append_vec_id, accounts, num_accounts); - new_slot_storage.insert(storage_entry.id(), Arc::new(u_storage_entry)); + new_slot_storage.insert(append_vec_id, Arc::new(u_storage_entry)); Ok(()) } @@ -427,6 +433,9 @@ where } // Remap the deserialized AppendVec paths to point to correct local paths + let num_collisions = AtomicUsize::new(0); + let next_append_vec_id = AtomicUsize::new(0); + let mut measure_remap = Measure::start("remap"); let mut storage = (0..snapshot_storages.len()) .into_par_iter() .map(|i| { @@ -442,51 +451,94 @@ where ) })?; + // Remap the AppendVec ID to handle any duplicate IDs that may previously existed + // due to full snapshots and incremental snapshots generated from different nodes + let (remapped_append_vec_id, remapped_append_vec_path) = loop { + let remapped_append_vec_id = next_append_vec_id.fetch_add(1, Ordering::Relaxed); + let remapped_file_name = AppendVec::file_name(*slot, remapped_append_vec_id); + let remapped_append_vec_path = + append_vec_path.parent().unwrap().join(&remapped_file_name); + + // Break out of the loop in the following situations: + // 1. The new ID is the same as the original ID. This means we do not need to + // rename the file, since the ID is the "correct" one already. + // 2. There is not a file already at the new path. This means it is safe to + // rename the file to this new path. + // **DEVELOPER NOTE:** Keep this check last so that it can short-circuit if + // possible. + if storage_entry.id() == remapped_append_vec_id + || std::fs::metadata(&remapped_append_vec_path).is_err() + { + break (remapped_append_vec_id, remapped_append_vec_path); + } + + // If we made it this far, a file exists at the new path. Record the collision + // and try again. + num_collisions.fetch_add(1, Ordering::Relaxed); + }; + // Only rename the file if the new ID is actually different from the original. + if storage_entry.id() != remapped_append_vec_id { + std::fs::rename(append_vec_path, &remapped_append_vec_path)?; + } + reconstruct_single_storage( slot, - append_vec_path, + &remapped_append_vec_path, storage_entry, + Some(remapped_append_vec_id), &mut new_slot_storage, )?; } Ok((*slot, new_slot_storage)) }) .collect::, Error>>()?; + measure_remap.stop(); // discard any slots with no storage entries // this can happen if a non-root slot was serialized // but non-root stores should not be included in the snapshot storage.retain(|_slot, stores| !stores.is_empty()); + assert!( + !storage.is_empty(), + "At least one storage entry must exist from deserializing stream" + ); + + let next_append_vec_id = next_append_vec_id.load(Ordering::Relaxed); + let max_append_vec_id = next_append_vec_id - 1; + assert!( + max_append_vec_id <= AppendVecId::MAX / 2, + "Storage id {} larger than allowed max", + max_append_vec_id + ); + // Process deserialized data, set necessary fields in self accounts_db .bank_hashes .write() .unwrap() .insert(snapshot_slot, snapshot_bank_hash_info); - - // Process deserialized data, set necessary fields in self - let max_id: usize = *storage - .values() - .flat_map(HashMap::keys) - .max() - .expect("At least one storage entry must exist from deserializing stream"); - - { - accounts_db.storage.0.extend( - storage.into_iter().map(|(slot, slot_storage_entry)| { - (slot, Arc::new(RwLock::new(slot_storage_entry))) - }), - ); - } - - if max_id > AppendVecId::MAX / 2 { - panic!("Storage id {} larger than allowed max", max_id); - } - - accounts_db.next_id.store(max_id + 1, Ordering::Relaxed); + accounts_db.storage.0.extend( + storage + .into_iter() + .map(|(slot, slot_storage_entry)| (slot, Arc::new(RwLock::new(slot_storage_entry)))), + ); + accounts_db + .next_id + .store(next_append_vec_id, Ordering::Relaxed); accounts_db .write_version .fetch_add(snapshot_version, Ordering::Relaxed); accounts_db.generate_index(limit_load_slot_count_from_snapshot, verify_index); + + datapoint_info!( + "reconstruct_accountsdb_from_fields()", + ("remap-time-us", measure_remap.as_us(), i64), + ( + "remap-collisions", + num_collisions.load(Ordering::Relaxed), + i64 + ), + ); + Ok(accounts_db) }