Skip to content

Commit

Permalink
Interleaved snapshot unpack versioning (#27484)
Browse files Browse the repository at this point in the history
* Issue #27346 - deserialize and check snapshot version before account fields

* Update comment on SnapshotFileKind

Co-authored-by: Brooks Prumo <[email protected]>

* SnapshotStorageRebuilderResult to RebuiltSnapshotStorage

* better error propagation from rebuild_storage

Co-authored-by: apfitzge <[email protected]>
Co-authored-by: Brooks Prumo <[email protected]>
  • Loading branch information
3 people authored Sep 22, 2022
1 parent 91d556d commit a846d50
Show file tree
Hide file tree
Showing 2 changed files with 91 additions and 43 deletions.
27 changes: 9 additions & 18 deletions runtime/src/snapshot_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ use {
crate::{
accounts_db::{AccountStorageMap, AtomicAppendVecId},
hardened_unpack::streaming_unpack_snapshot,
snapshot_utils::snapshot_storage_rebuilder::RebuiltSnapshotStorage,
},
crossbeam_channel::Sender,
std::thread::{Builder, JoinHandle},
Expand Down Expand Up @@ -134,10 +135,6 @@ impl SnapshotVersion {
pub fn as_str(self) -> &'static str {
<&str as From<Self>>::from(self)
}

fn maybe_from_string(version_string: &str) -> Option<SnapshotVersion> {
version_string.parse::<Self>().ok()
}
}

/// Information about a bank snapshot. Namely the slot of the bank, the path to the snapshot, and
Expand Down Expand Up @@ -203,7 +200,7 @@ struct UnarchivedSnapshot {
#[derive(Debug)]
struct UnpackedSnapshotsDirAndVersion {
unpacked_snapshots_dir: PathBuf,
snapshot_version: String,
snapshot_version: SnapshotVersion,
}

/// Helper type for passing around account storage map and next append vec id
Expand Down Expand Up @@ -1262,7 +1259,6 @@ where
.prefix(unpacked_snapshots_dir_prefix)
.tempdir_in(bank_snapshots_dir)?;
let unpacked_snapshots_dir = unpack_dir.path().join("snapshots");
let unpacked_version_file = unpack_dir.path().join("version");

let (file_sender, file_receiver) = crossbeam_channel::unbounded();
streaming_unarchive_snapshot(
Expand All @@ -1277,18 +1273,20 @@ where
let num_rebuilder_threads = num_cpus::get_physical()
.saturating_sub(parallel_divisions)
.max(1);
let (storage, measure_untar) = measure!(
let (version_and_storages, measure_untar) = measure!(
SnapshotStorageRebuilder::rebuild_storage(
file_receiver,
num_rebuilder_threads,
next_append_vec_id
),
)?,
measure_name
);
info!("{}", measure_untar);

let snapshot_version = snapshot_version_from_file(&unpacked_version_file)?;

let RebuiltSnapshotStorage {
snapshot_version,
storage,
} = version_and_storages;
Ok(UnarchivedSnapshot {
unpack_dir,
storage,
Expand Down Expand Up @@ -1726,14 +1724,7 @@ fn verify_unpacked_snapshots_dir_and_version(
&unpacked_snapshots_dir_and_version.snapshot_version
);

let snapshot_version =
SnapshotVersion::maybe_from_string(&unpacked_snapshots_dir_and_version.snapshot_version)
.ok_or_else(|| {
get_io_error(&format!(
"unsupported snapshot version: {}",
&unpacked_snapshots_dir_and_version.snapshot_version,
))
})?;
let snapshot_version = unpacked_snapshots_dir_and_version.snapshot_version;
let mut bank_snapshots =
get_bank_snapshots_post(&unpacked_snapshots_dir_and_version.unpacked_snapshots_dir);
if bank_snapshots.len() > 1 {
Expand Down
107 changes: 82 additions & 25 deletions runtime/src/snapshot_utils/snapshot_storage_rebuilder.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
//! Provides interfaces for rebuilding snapshot storages
use {
super::{get_io_error, snapshot_version_from_file, SnapshotError, SnapshotVersion},
crate::{
accounts_db::{AccountStorageEntry, AccountStorageMap, AppendVecId, AtomicAppendVecId},
serde_snapshot::{
Expand Down Expand Up @@ -28,10 +29,17 @@ use {
time::Instant,
},
};
/// Convenient wrapper for snapshot version and rebuilt storages
pub(crate) struct RebuiltSnapshotStorage {
/// Snapshot version
pub snapshot_version: SnapshotVersion,
/// Rebuilt storages
pub storage: AccountStorageMap,
}

/// Stores state for rebuilding snapshot storages
#[derive(Debug)]
pub struct SnapshotStorageRebuilder {
pub(crate) struct SnapshotStorageRebuilder {
/// Receiver for unpacked snapshot storage files
file_receiver: Receiver<PathBuf>,
/// Number of threads to rebuild with
Expand All @@ -52,20 +60,35 @@ pub struct SnapshotStorageRebuilder {

impl SnapshotStorageRebuilder {
/// Synchronously spawns threads to rebuild snapshot storages
pub fn rebuild_storage(
pub(crate) fn rebuild_storage(
file_receiver: Receiver<PathBuf>,
num_threads: usize,
next_append_vec_id: Arc<AtomicAppendVecId>,
) -> AccountStorageMap {
let (snapshot_file_path, append_vec_files) = Self::get_snapshot_file(&file_receiver);
let snapshot_storage_lengths = Self::process_snapshot_file(snapshot_file_path).unwrap();
Self::spawn_rebuilder_threads(
) -> Result<RebuiltSnapshotStorage, SnapshotError> {
let (snapshot_version_path, snapshot_file_path, append_vec_files) =
Self::get_version_and_snapshot_files(&file_receiver);
let snapshot_version_str = snapshot_version_from_file(&snapshot_version_path)?;
let snapshot_version = snapshot_version_str.parse().map_err(|_| {
get_io_error(&format!(
"unsupported snapshot version: {}",
snapshot_version_str,
))
})?;
let snapshot_storage_lengths =
Self::process_snapshot_file(snapshot_version, snapshot_file_path)?;

let account_storage_map = Self::spawn_rebuilder_threads(
file_receiver,
num_threads,
next_append_vec_id,
snapshot_storage_lengths,
append_vec_files,
)
);

Ok(RebuiltSnapshotStorage {
snapshot_version,
storage: account_storage_map,
})
}

/// Create the SnapshotStorageRebuilder for storing state during rebuilding
Expand Down Expand Up @@ -98,38 +121,63 @@ impl SnapshotStorageRebuilder {
/// Waits for snapshot file
/// Due to parallel unpacking, we may receive some append_vec files before the snapshot file
/// This function will push append_vec files into a buffer until we receive the snapshot file
fn get_snapshot_file(file_receiver: &Receiver<PathBuf>) -> (PathBuf, Vec<PathBuf>) {
fn get_version_and_snapshot_files(
file_receiver: &Receiver<PathBuf>,
) -> (PathBuf, PathBuf, Vec<PathBuf>) {
let mut append_vec_files = Vec::with_capacity(1024);
let snapshot_file_path = loop {
let mut snapshot_version_path = None;
let mut snapshot_file_path = None;

loop {
if let Ok(path) = file_receiver.recv() {
let filename = path.file_name().unwrap().to_str().unwrap();
match get_snapshot_file_kind(filename) {
Some(SnapshotFileKind::SnapshotFile) => {
break path;
Some(SnapshotFileKind::Version) => {
snapshot_version_path = Some(path);

// break if we have both the snapshot file and the version file
if snapshot_file_path.is_some() {
break;
}
}
Some(SnapshotFileKind::StorageFile) => {
Some(SnapshotFileKind::BankFields) => {
snapshot_file_path = Some(path);

// break if we have both the snapshot file and the version file
if snapshot_version_path.is_some() {
break;
}
}
Some(SnapshotFileKind::Storage) => {
append_vec_files.push(path);
}
None => {} // do nothing for other kinds of files
}
} else {
panic!("did not receive snapshot file from unpacking threads");
}
};
}
let snapshot_version_path = snapshot_version_path.unwrap();
let snapshot_file_path = snapshot_file_path.unwrap();

(snapshot_file_path, append_vec_files)
(snapshot_version_path, snapshot_file_path, append_vec_files)
}

/// Process the snapshot file to get the size of each snapshot storage file
fn process_snapshot_file(
snapshot_version: SnapshotVersion,
snapshot_file_path: PathBuf,
) -> Result<HashMap<Slot, HashMap<usize, usize>>, bincode::Error> {
let snapshot_file = File::open(snapshot_file_path).unwrap();
let mut snapshot_stream = BufReader::new(snapshot_file);
let (_bank_fields, accounts_fields) =
serde_snapshot::fields_from_stream(SerdeStyle::Newer, &mut snapshot_stream)?;
match snapshot_version {
SnapshotVersion::V1_2_0 => {
let (_bank_fields, accounts_fields) =
serde_snapshot::fields_from_stream(SerdeStyle::Newer, &mut snapshot_stream)?;

Ok(snapshot_storage_lengths_from_fields(&accounts_fields))
Ok(snapshot_storage_lengths_from_fields(&accounts_fields))
}
}
}

/// Spawn threads for processing buffered append_vec_files, and then received files
Expand Down Expand Up @@ -191,7 +239,7 @@ impl SnapshotStorageRebuilder {
/// Process an append_vec_file
fn process_append_vec_file(&self, path: PathBuf) -> Result<(), std::io::Error> {
let filename = path.file_name().unwrap().to_str().unwrap().to_owned();
if let Some(SnapshotFileKind::StorageFile) = get_snapshot_file_kind(&filename) {
if let Some(SnapshotFileKind::Storage) = get_snapshot_file_kind(&filename) {
let (slot, slot_complete) = self.insert_slot_storage_file(path, filename);
if slot_complete {
self.process_complete_slot(slot)?;
Expand Down Expand Up @@ -290,15 +338,20 @@ impl SnapshotStorageRebuilder {
}
}

/// Used to determine if a filename is structured like a snapshot file, storage file, or neither
/// Used to determine if a filename is structured like a version file, bank file, or storage file
#[derive(PartialEq, Debug)]
enum SnapshotFileKind {
SnapshotFile,
StorageFile,
Version,
BankFields,
Storage,
}

/// Determines `SnapshotFileKind` for `filename` if any
fn get_snapshot_file_kind(filename: &str) -> Option<SnapshotFileKind> {
if filename == "version" {
return Some(SnapshotFileKind::Version);
}

let mut periods = 0;
let mut saw_numbers = false;
for x in filename.chars() {
Expand All @@ -318,8 +371,8 @@ fn get_snapshot_file_kind(filename: &str) -> Option<SnapshotFileKind> {
}

match (periods, saw_numbers) {
(0, true) => Some(SnapshotFileKind::SnapshotFile),
(1, true) => Some(SnapshotFileKind::StorageFile),
(0, true) => Some(SnapshotFileKind::BankFields),
(1, true) => Some(SnapshotFileKind::Storage),
(_, _) => None,
}
}
Expand All @@ -342,11 +395,15 @@ mod tests {
fn test_get_snapshot_file_kind() {
assert_eq!(None, get_snapshot_file_kind("file.txt"));
assert_eq!(
Some(SnapshotFileKind::SnapshotFile),
Some(SnapshotFileKind::Version),
get_snapshot_file_kind("version")
);
assert_eq!(
Some(SnapshotFileKind::BankFields),
get_snapshot_file_kind("1234")
);
assert_eq!(
Some(SnapshotFileKind::StorageFile),
Some(SnapshotFileKind::Storage),
get_snapshot_file_kind("1000.999")
);
}
Expand Down

0 comments on commit a846d50

Please sign in to comment.