Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Interleaved snapshot unpack versioning #27484

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 9 additions & 18 deletions runtime/src/snapshot_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ use {
crate::{
accounts_db::{AccountStorageMap, AtomicAppendVecId},
hardened_unpack::streaming_unpack_snapshot,
snapshot_utils::snapshot_storage_rebuilder::RebuiltSnapshotStorage,
},
crossbeam_channel::Sender,
std::thread::{Builder, JoinHandle},
Expand Down Expand Up @@ -130,10 +131,6 @@ impl SnapshotVersion {
pub fn as_str(self) -> &'static str {
<&str as From<Self>>::from(self)
}

fn maybe_from_string(version_string: &str) -> Option<SnapshotVersion> {
version_string.parse::<Self>().ok()
}
}

/// Information about a bank snapshot. Namely the slot of the bank, the path to the snapshot, and
Expand Down Expand Up @@ -199,7 +196,7 @@ struct UnarchivedSnapshot {
#[derive(Debug)]
struct UnpackedSnapshotsDirAndVersion {
unpacked_snapshots_dir: PathBuf,
snapshot_version: String,
snapshot_version: SnapshotVersion,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🙌

}

/// Helper type for passing around account storage map and next append vec id
Expand Down Expand Up @@ -1245,7 +1242,6 @@ where
.prefix(unpacked_snapshots_dir_prefix)
.tempdir_in(bank_snapshots_dir)?;
let unpacked_snapshots_dir = unpack_dir.path().join("snapshots");
let unpacked_version_file = unpack_dir.path().join("version");

let (file_sender, file_receiver) = crossbeam_channel::unbounded();
streaming_unarchive_snapshot(
Expand All @@ -1260,18 +1256,20 @@ where
let num_rebuilder_threads = num_cpus::get_physical()
.saturating_sub(parallel_divisions)
.max(1);
let (storage, measure_untar) = measure!(
let (version_and_storages, measure_untar) = measure!(
SnapshotStorageRebuilder::rebuild_storage(
file_receiver,
num_rebuilder_threads,
next_append_vec_id
),
)?,
measure_name
);
info!("{}", measure_untar);

let snapshot_version = snapshot_version_from_file(&unpacked_version_file)?;

let RebuiltSnapshotStorage {
snapshot_version,
storage,
} = version_and_storages;
Ok(UnarchivedSnapshot {
unpack_dir,
storage,
Expand Down Expand Up @@ -1709,14 +1707,7 @@ fn verify_unpacked_snapshots_dir_and_version(
&unpacked_snapshots_dir_and_version.snapshot_version
);

let snapshot_version =
SnapshotVersion::maybe_from_string(&unpacked_snapshots_dir_and_version.snapshot_version)
.ok_or_else(|| {
get_io_error(&format!(
"unsupported snapshot version: {}",
&unpacked_snapshots_dir_and_version.snapshot_version,
))
})?;
let snapshot_version = unpacked_snapshots_dir_and_version.snapshot_version;
let mut bank_snapshots =
get_bank_snapshots_post(&unpacked_snapshots_dir_and_version.unpacked_snapshots_dir);
if bank_snapshots.len() > 1 {
Expand Down
107 changes: 82 additions & 25 deletions runtime/src/snapshot_utils/snapshot_storage_rebuilder.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
//! Provides interfaces for rebuilding snapshot storages

use {
super::{get_io_error, snapshot_version_from_file, SnapshotError, SnapshotVersion},
crate::{
accounts_db::{AccountStorageEntry, AccountStorageMap, AppendVecId, AtomicAppendVecId},
serde_snapshot::{
Expand Down Expand Up @@ -28,10 +29,17 @@ use {
time::Instant,
},
};
/// Convenient wrapper for snapshot version and rebuilt storages
///
/// Returned by `SnapshotStorageRebuilder::rebuild_storage` so that the caller receives the
/// snapshot version and the rebuilt account storage map from a single call.
pub(crate) struct RebuiltSnapshotStorage {
/// Snapshot version, parsed from the contents of the unpacked `version` file
pub snapshot_version: SnapshotVersion,
/// Rebuilt storages, i.e. the account storage map produced by the rebuilder threads
pub storage: AccountStorageMap,
}

/// Stores state for rebuilding snapshot storages
#[derive(Debug)]
pub struct SnapshotStorageRebuilder {
pub(crate) struct SnapshotStorageRebuilder {
/// Receiver for unpacked snapshot storage files
file_receiver: Receiver<PathBuf>,
/// Number of threads to rebuild with
Expand All @@ -52,20 +60,35 @@ pub struct SnapshotStorageRebuilder {

impl SnapshotStorageRebuilder {
/// Synchronously spawns threads to rebuild snapshot storages
pub fn rebuild_storage(
pub(crate) fn rebuild_storage(
file_receiver: Receiver<PathBuf>,
num_threads: usize,
next_append_vec_id: Arc<AtomicAppendVecId>,
) -> AccountStorageMap {
let (snapshot_file_path, append_vec_files) = Self::get_snapshot_file(&file_receiver);
let snapshot_storage_lengths = Self::process_snapshot_file(snapshot_file_path).unwrap();
Self::spawn_rebuilder_threads(
) -> Result<RebuiltSnapshotStorage, SnapshotError> {
let (snapshot_version_path, snapshot_file_path, append_vec_files) =
Self::get_version_and_snapshot_files(&file_receiver);
let snapshot_version_str = snapshot_version_from_file(&snapshot_version_path)?;
let snapshot_version = snapshot_version_str.parse().map_err(|_| {
get_io_error(&format!(
"unsupported snapshot version: {}",
snapshot_version_str,
))
})?;
let snapshot_storage_lengths =
Self::process_snapshot_file(snapshot_version, snapshot_file_path)?;

let account_storage_map = Self::spawn_rebuilder_threads(
file_receiver,
num_threads,
next_append_vec_id,
snapshot_storage_lengths,
append_vec_files,
)
);

Ok(RebuiltSnapshotStorage {
snapshot_version,
storage: account_storage_map,
})
}

/// Create the SnapshotStorageRebuilder for storing state during rebuilding
Expand Down Expand Up @@ -98,38 +121,63 @@ impl SnapshotStorageRebuilder {
/// Waits for both the version file and the snapshot file
/// Due to parallel unpacking, we may receive some append_vec files before the version and snapshot files
/// This function will push append_vec files into a buffer until we have received both the version file and the snapshot file
fn get_snapshot_file(file_receiver: &Receiver<PathBuf>) -> (PathBuf, Vec<PathBuf>) {
fn get_version_and_snapshot_files(
file_receiver: &Receiver<PathBuf>,
) -> (PathBuf, PathBuf, Vec<PathBuf>) {
let mut append_vec_files = Vec::with_capacity(1024);
let snapshot_file_path = loop {
let mut snapshot_version_path = None;
let mut snapshot_file_path = None;

loop {
if let Ok(path) = file_receiver.recv() {
let filename = path.file_name().unwrap().to_str().unwrap();
match get_snapshot_file_kind(filename) {
Some(SnapshotFileKind::SnapshotFile) => {
break path;
Some(SnapshotFileKind::Version) => {
snapshot_version_path = Some(path);

// break if we have both the snapshot file and the version file
if snapshot_file_path.is_some() {
break;
}
}
Some(SnapshotFileKind::StorageFile) => {
Some(SnapshotFileKind::BankFields) => {
snapshot_file_path = Some(path);

// break if we have both the snapshot file and the version file
if snapshot_version_path.is_some() {
break;
}
}
Some(SnapshotFileKind::Storage) => {
append_vec_files.push(path);
}
None => {} // do nothing for other kinds of files
}
} else {
panic!("did not receive snapshot file from unpacking threads");
}
};
}
let snapshot_version_path = snapshot_version_path.unwrap();
let snapshot_file_path = snapshot_file_path.unwrap();

(snapshot_file_path, append_vec_files)
(snapshot_version_path, snapshot_file_path, append_vec_files)
}

/// Process the snapshot file to get the size of each snapshot storage file
fn process_snapshot_file(
snapshot_version: SnapshotVersion,
snapshot_file_path: PathBuf,
) -> Result<HashMap<Slot, HashMap<usize, usize>>, bincode::Error> {
let snapshot_file = File::open(snapshot_file_path).unwrap();
let mut snapshot_stream = BufReader::new(snapshot_file);
let (_bank_fields, accounts_fields) =
serde_snapshot::fields_from_stream(SerdeStyle::Newer, &mut snapshot_stream)?;
match snapshot_version {
SnapshotVersion::V1_2_0 => {
Comment on lines +173 to +174
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nice.

let (_bank_fields, accounts_fields) =
serde_snapshot::fields_from_stream(SerdeStyle::Newer, &mut snapshot_stream)?;

Ok(snapshot_storage_lengths_from_fields(&accounts_fields))
Ok(snapshot_storage_lengths_from_fields(&accounts_fields))
}
}
}

/// Spawn threads for processing buffered append_vec_files, and then received files
Expand Down Expand Up @@ -191,7 +239,7 @@ impl SnapshotStorageRebuilder {
/// Process an append_vec_file
fn process_append_vec_file(&self, path: PathBuf) -> Result<(), std::io::Error> {
let filename = path.file_name().unwrap().to_str().unwrap().to_owned();
if let Some(SnapshotFileKind::StorageFile) = get_snapshot_file_kind(&filename) {
if let Some(SnapshotFileKind::Storage) = get_snapshot_file_kind(&filename) {
let (slot, slot_complete) = self.insert_slot_storage_file(path, filename);
if slot_complete {
self.process_complete_slot(slot)?;
Expand Down Expand Up @@ -290,15 +338,20 @@ impl SnapshotStorageRebuilder {
}
}

/// Used to determine if a filename is structured like a snapshot file, storage file, or neither
/// Used to determine if a filename is structured like a version file, bank file, or storage file
#[derive(PartialEq, Debug)]
enum SnapshotFileKind {
SnapshotFile,
StorageFile,
Version,
BankFields,
Storage,
}

/// Determines `SnapshotFileKind` for `filename` if any
fn get_snapshot_file_kind(filename: &str) -> Option<SnapshotFileKind> {
brooksprumo marked this conversation as resolved.
Show resolved Hide resolved
if filename == "version" {
return Some(SnapshotFileKind::Version);
}

let mut periods = 0;
let mut saw_numbers = false;
for x in filename.chars() {
Expand All @@ -318,8 +371,8 @@ fn get_snapshot_file_kind(filename: &str) -> Option<SnapshotFileKind> {
}

match (periods, saw_numbers) {
(0, true) => Some(SnapshotFileKind::SnapshotFile),
(1, true) => Some(SnapshotFileKind::StorageFile),
(0, true) => Some(SnapshotFileKind::BankFields),
(1, true) => Some(SnapshotFileKind::Storage),
(_, _) => None,
}
}
Expand All @@ -342,11 +395,15 @@ mod tests {
fn test_get_snapshot_file_kind() {
assert_eq!(None, get_snapshot_file_kind("file.txt"));
assert_eq!(
Some(SnapshotFileKind::SnapshotFile),
Some(SnapshotFileKind::Version),
get_snapshot_file_kind("version")
);
assert_eq!(
Some(SnapshotFileKind::BankFields),
get_snapshot_file_kind("1234")
);
assert_eq!(
Some(SnapshotFileKind::StorageFile),
Some(SnapshotFileKind::Storage),
get_snapshot_file_kind("1000.999")
);
}
Expand Down