Handle duplicate AppendVec IDs (#20096)
When reconstructing the AccountsDb, if the storages came from full and
incremental snapshots generated on different nodes, the AppendVec IDs
may overlap/contain duplicates, which would cause the reconstruction
to fail.

This commit handles this issue by unconditionally remapping the
AppendVec ID for every AppendVec.

Fixes #17088
brooksprumo authored Sep 23, 2021
1 parent 2ae1e80 commit 1347b50
Showing 2 changed files with 77 additions and 30 deletions.
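
The heart of the change is the remap loop added to runtime/src/serde_snapshot.rs. As a minimal standalone sketch, stripped of the surrounding rayon machinery and assuming the "<slot>.<id>" file-name convention of AppendVec::file_name (the remap_append_vec_id helper itself is hypothetical, not part of the codebase):

use std::path::{Path, PathBuf};
use std::sync::atomic::{AtomicUsize, Ordering};

// Claim fresh IDs from a shared counter until one is either the storage's
// original ID (no rename needed) or maps to a file name that is unused.
fn remap_append_vec_id(
    slot: u64,
    old_id: usize,
    old_path: &Path,
    next_id: &AtomicUsize,
    num_collisions: &AtomicUsize,
) -> std::io::Result<(usize, PathBuf)> {
    let dir = old_path.parent().expect("append vec path has a parent");
    loop {
        // fetch_add returns the previous value, so IDs are handed out 0, 1, 2, ...
        let new_id = next_id.fetch_add(1, Ordering::Relaxed);
        let new_path = dir.join(format!("{}.{}", slot, new_id));
        if new_id == old_id {
            // The fresh ID matches the original; the file is already named correctly.
            return Ok((new_id, new_path));
        }
        if std::fs::metadata(&new_path).is_err() {
            // Nothing lives at the target path, so the rename is safe.
            std::fs::rename(old_path, &new_path)?;
            return Ok((new_id, new_path));
        }
        // A file already exists at the candidate path; count the collision and retry.
        num_collisions.fetch_add(1, Ordering::Relaxed);
    }
}

Because every storage draws from the same atomic counter, the resulting IDs are unique across both snapshots, and the counter's final value is exactly the next_id the AccountsDb should resume from.
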
5 changes: 0 additions & 5 deletions runtime/src/accounts_db.rs
@@ -8669,11 +8669,6 @@ pub mod tests {
accounts.write_version.load(Ordering::Relaxed)
);

assert_eq!(
daccounts.next_id.load(Ordering::Relaxed),
accounts.next_id.load(Ordering::Relaxed)
);

// Get the hash for the latest slot, which should be the only hash in the
// bank_hashes map on the deserialized AccountsDb
assert_eq!(daccounts.bank_hashes.read().unwrap().len(), 2);
102 changes: 77 additions & 25 deletions runtime/src/serde_snapshot.rs
@@ -22,6 +22,7 @@ use {
log::*,
rayon::prelude::*,
serde::{de::DeserializeOwned, Deserialize, Serialize},
solana_measure::measure::Measure,
solana_program_runtime::InstructionProcessor,
solana_sdk::{
clock::{Epoch, Slot, UnixTimestamp},
@@ -39,7 +40,10 @@ use {
io::{self, BufReader, BufWriter, Read, Write},
path::{Path, PathBuf},
result::Result,
sync::{atomic::Ordering, Arc, RwLock},
sync::{
atomic::{AtomicUsize, Ordering},
Arc, RwLock,
},
},
};

@@ -372,17 +376,19 @@ fn reconstruct_single_storage<E>(
slot: &Slot,
append_vec_path: &Path,
storage_entry: &E,
remapped_append_vec_id: Option<AppendVecId>,
new_slot_storage: &mut HashMap<AppendVecId, Arc<AccountStorageEntry>>,
) -> Result<(), Error>
where
E: SerializableStorage,
{
let append_vec_id = remapped_append_vec_id.unwrap_or_else(|| storage_entry.id());
let (accounts, num_accounts) =
AppendVec::new_from_file(append_vec_path, storage_entry.current_len())?;
let u_storage_entry =
AccountStorageEntry::new_existing(*slot, storage_entry.id(), accounts, num_accounts);
AccountStorageEntry::new_existing(*slot, append_vec_id, accounts, num_accounts);

new_slot_storage.insert(storage_entry.id(), Arc::new(u_storage_entry));
new_slot_storage.insert(append_vec_id, Arc::new(u_storage_entry));
Ok(())
}

@@ -427,6 +433,9 @@ where
}

// Remap the deserialized AppendVec paths to point to correct local paths
let num_collisions = AtomicUsize::new(0);
let next_append_vec_id = AtomicUsize::new(0);
let mut measure_remap = Measure::start("remap");
let mut storage = (0..snapshot_storages.len())
.into_par_iter()
.map(|i| {
@@ -442,51 +451,94 @@
)
})?;

// Remap the AppendVec ID to handle any duplicate IDs that may have previously
// existed due to full snapshots and incremental snapshots generated from different nodes
let (remapped_append_vec_id, remapped_append_vec_path) = loop {
let remapped_append_vec_id = next_append_vec_id.fetch_add(1, Ordering::Relaxed);
let remapped_file_name = AppendVec::file_name(*slot, remapped_append_vec_id);
let remapped_append_vec_path =
append_vec_path.parent().unwrap().join(&remapped_file_name);

// Break out of the loop in the following situations:
// 1. The new ID is the same as the original ID. This means we do not need to
// rename the file, since the ID is the "correct" one already.
// 2. There is not a file already at the new path. This means it is safe to
// rename the file to this new path.
// **DEVELOPER NOTE:** Keep this check last so that it can short-circuit if
// possible.
if storage_entry.id() == remapped_append_vec_id
|| std::fs::metadata(&remapped_append_vec_path).is_err()
{
break (remapped_append_vec_id, remapped_append_vec_path);
}

// If we made it this far, a file exists at the new path. Record the collision
// and try again.
num_collisions.fetch_add(1, Ordering::Relaxed);
};
// Only rename the file if the new ID is actually different from the original.
if storage_entry.id() != remapped_append_vec_id {
std::fs::rename(append_vec_path, &remapped_append_vec_path)?;
}

reconstruct_single_storage(
slot,
append_vec_path,
&remapped_append_vec_path,
storage_entry,
Some(remapped_append_vec_id),
&mut new_slot_storage,
)?;
}
Ok((*slot, new_slot_storage))
})
.collect::<Result<HashMap<Slot, _>, Error>>()?;
measure_remap.stop();

// discard any slots with no storage entries
// this can happen if a non-root slot was serialized
// but non-root stores should not be included in the snapshot
storage.retain(|_slot, stores| !stores.is_empty());
assert!(
!storage.is_empty(),
"At least one storage entry must exist from deserializing stream"
);

let next_append_vec_id = next_append_vec_id.load(Ordering::Relaxed);
let max_append_vec_id = next_append_vec_id - 1;
assert!(
max_append_vec_id <= AppendVecId::MAX / 2,
"Storage id {} larger than allowed max",
max_append_vec_id
);

// Process deserialized data, set necessary fields in self
accounts_db
.bank_hashes
.write()
.unwrap()
.insert(snapshot_slot, snapshot_bank_hash_info);

// Process deserialized data, set necessary fields in self
let max_id: usize = *storage
.values()
.flat_map(HashMap::keys)
.max()
.expect("At least one storage entry must exist from deserializing stream");

{
accounts_db.storage.0.extend(
storage.into_iter().map(|(slot, slot_storage_entry)| {
(slot, Arc::new(RwLock::new(slot_storage_entry)))
}),
);
}

if max_id > AppendVecId::MAX / 2 {
panic!("Storage id {} larger than allowed max", max_id);
}

accounts_db.next_id.store(max_id + 1, Ordering::Relaxed);
accounts_db.storage.0.extend(
storage
.into_iter()
.map(|(slot, slot_storage_entry)| (slot, Arc::new(RwLock::new(slot_storage_entry)))),
);
accounts_db
.next_id
.store(next_append_vec_id, Ordering::Relaxed);
accounts_db
.write_version
.fetch_add(snapshot_version, Ordering::Relaxed);
accounts_db.generate_index(limit_load_slot_count_from_snapshot, verify_index);

datapoint_info!(
"reconstruct_accountsdb_from_fields()",
("remap-time-us", measure_remap.as_us(), i64),
(
"remap-collisions",
num_collisions.load(Ordering::Relaxed),
i64
),
);

Ok(accounts_db)
}
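
Why the unconditional remap fixes #17088: a full snapshot and an incremental snapshot produced on different nodes each number their AppendVecs independently, so the combined set can contain duplicates. A toy illustration, with invented ID sets:

fn main() {
    // Invented IDs as they might arrive from two different nodes.
    let full_snapshot_ids = [0, 1, 2]; // node A's full snapshot
    let incremental_snapshot_ids = [1, 2, 3]; // node B's incremental snapshot

    // Before this commit the IDs were kept as-is, so 1 and 2 collided.
    // With the remap, every storage simply receives the next free ID:
    let mut next_id = 0usize;
    let remapped: Vec<usize> = full_snapshot_ids
        .iter()
        .chain(incremental_snapshot_ids.iter())
        .map(|_original_id| {
            let id = next_id;
            next_id += 1;
            id
        })
        .collect();

    assert_eq!(remapped, vec![0, 1, 2, 3, 4, 5]); // no duplicates remain
    assert_eq!(next_id, 6); // what accounts_db.next_id resumes from
}

This is presumably also why the assertion comparing daccounts.next_id against accounts.next_id was deleted from the test in accounts_db.rs: after remapping, the deserialized db's next_id is derived from the remap counter rather than from the IDs that happened to be serialized, so the two values need not match.
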
