snapshot dir to and from archive #30590

Closed
25 changes: 22 additions & 3 deletions runtime/src/hardened_unpack.rs
@@ -322,6 +322,7 @@ pub fn unpack_snapshot<A: Read>(
unpacked_append_vec_map.insert(file.to_string(), path.join("accounts").join(file));
},
|_| {},
false,
)
.map(|_| unpacked_append_vec_map)
}
@@ -333,6 +334,7 @@ pub fn streaming_unpack_snapshot<A: Read>(
account_paths: &[PathBuf],
parallel_selector: Option<ParallelSelector>,
sender: &crossbeam_channel::Sender<PathBuf>,
unarchive_only: bool,
) -> Result<()> {
unpack_snapshot_with_processors(
archive,
@@ -342,9 +344,12 @@ pub fn streaming_unpack_snapshot<A: Read>(
|_, _| {},
|entry_path_buf| {
if entry_path_buf.is_file() {
sender.send(entry_path_buf).unwrap();
if !unarchive_only {
sender.send(entry_path_buf).unwrap();
}
}
},
unarchive_only,
)
}
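
// Call-site sketch (hypothetical names, mirroring how this PR uses the flag in
// snapshot_utils.rs): with `unarchive_only = true` the archive is still unpacked
// to disk, but no paths are streamed, so the receiving end of the channel can
// simply be dropped.
//
// let (file_sender, _file_receiver) = crossbeam_channel::unbounded();
// streaming_unpack_snapshot(&mut archive, ledger_dir, &account_paths, None, &file_sender, true)?;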

@@ -355,6 +360,7 @@ fn unpack_snapshot_with_processors<A, F, G>(
parallel_selector: Option<ParallelSelector>,
mut accounts_path_processor: F,
entry_processor: G,
from_dir: bool,
) -> Result<()>
where
A: Read,
@@ -370,7 +376,12 @@ where
MAX_SNAPSHOT_ARCHIVE_UNPACKED_ACTUAL_SIZE,
MAX_SNAPSHOT_ARCHIVE_UNPACKED_COUNT,
|parts, kind| {
if is_valid_snapshot_archive_entry(parts, kind) {
let is_valid = if !from_dir {
is_valid_snapshot_archive_entry(parts, kind)
} else {
is_valid_snapshot_dir_archive_entry(parts, kind)
};
if is_valid {
i += 1;
match &parallel_selector {
Some(parallel_selector) => {
@@ -453,6 +464,14 @@ fn is_valid_snapshot_archive_entry(parts: &[&str], kind: tar::EntryType) -> bool
}
}

fn is_valid_snapshot_dir_archive_entry(parts: &[&str], kind: tar::EntryType) -> bool {
match (parts, kind) {
([dir], Directory) if all_digits(dir) => true,
([dir, "status_cache"], Regular) if all_digits(dir) => true,
        _ => true, // TODO: make this more constraining
}
}
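
// For illustration only (hypothetical, not part of this PR): once the
// snapshot-dir layout is settled, a fully constrained matcher could reject
// anything outside the expected `<slot>/` shape instead of defaulting to `true`:
//
// fn is_valid_snapshot_dir_archive_entry_strict(parts: &[&str], kind: tar::EntryType) -> bool {
//     match (parts, kind) {
//         ([dir], Directory) if all_digits(dir) => true,
//         ([dir, "status_cache"], Regular) if all_digits(dir) => true,
//         ([dir, file], Regular) if all_digits(dir) && all_digits(file) => true, // the bank snapshot file
//         _ => false,
//     }
// }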

pub fn open_genesis_config(
ledger_path: &Path,
max_genesis_archive_unpacked_size: u64,
Expand Down Expand Up @@ -787,7 +806,7 @@ mod tests {

fn finalize_and_unpack_snapshot(archive: tar::Builder<Vec<u8>>) -> Result<()> {
with_finalize_and_unpack(archive, |a, b| {
unpack_snapshot_with_processors(a, b, &[PathBuf::new()], None, |_, _| {}, |_| {})
unpack_snapshot_with_processors(a, b, &[PathBuf::new()], None, |_, _| {}, |_| {}, false)
})
}

204 changes: 204 additions & 0 deletions runtime/src/snapshot_utils.rs
@@ -68,6 +68,7 @@ use {
mod archive_format;
mod snapshot_storage_rebuilder;
pub use archive_format::*;

pub const SNAPSHOT_STATUS_CACHE_FILENAME: &str = "status_cache";
pub const SNAPSHOT_VERSION_FILENAME: &str = "version";
@@ -290,6 +291,9 @@ pub enum SnapshotError {
#[error("archive generation failure {0}")]
ArchiveGenerationFailure(ExitStatus),

#[error("unsupported archive format")]
ArchiveUnsupportedFormat,

#[error("storage path symlink is invalid")]
StoragePathSymlinkInvalid,

@@ -506,6 +510,29 @@ pub fn write_snapshot_version_file(
Ok(())
}

/// Archive a directory
pub fn archive_dir(
dir: impl AsRef<Path>,
archive_path: impl AsRef<Path>,
archive_format: ArchiveFormat,
) -> Result<()> {
    // Only zstd-compressed tar is supported for now. Check before creating the
    // file so an unsupported format doesn't leave an empty archive behind.
    if archive_format != ArchiveFormat::TarZstd {
        return Err(SnapshotError::ArchiveUnsupportedFormat);
    }

    // The destination archive file
    let archive_file = fs::File::create(&archive_path)?;

    // The compression encoder; it consumes the data written by tar and writes it to archive_file
    let mut encoder = zstd::stream::Encoder::new(archive_file, 0)?;
let mut archive = tar::Builder::new(&mut encoder as &mut dyn Write);
let archive_inner_dir = dir.as_ref().file_name().unwrap();
archive.append_dir_all(archive_inner_dir, &dir)?;
archive.into_inner()?;
encoder.finish()?;

Ok(())
}
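
// Usage sketch (paths illustrative): archive `<bank_snapshots_dir>/100` into a
// zstd-compressed tar whose single top-level entry is the `100/` directory.
//
// let archive_path = bank_snapshots_dir.join("100.tar.zst");
// archive_dir(bank_snapshots_dir.join("100"), &archive_path, ArchiveFormat::TarZstd)?;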

/// Make a snapshot archive out of the snapshot package
pub fn archive_snapshot_package(
snapshot_package: &SnapshotPackage,
@@ -1559,6 +1586,7 @@ fn spawn_unpack_snapshot_thread(
mut archive: Archive<SharedBufferReader>,
parallel_selector: Option<ParallelSelector>,
thread_index: usize,
unarchive_only: bool,
) -> JoinHandle<()> {
Builder::new()
.name(format!("solUnpkSnpsht{thread_index:02}"))
@@ -1569,12 +1597,82 @@
&account_paths,
parallel_selector,
&file_sender,
unarchive_only,
)
.unwrap();
})
.unwrap()
}

/// Follows unarchive_snapshot, but only unarchives the archive into the directory.
fn unarchive_to_snapshot_dir(
unpack_dir: impl AsRef<Path>,
snapshot_archive_path: impl AsRef<Path>,
account_paths: &[PathBuf],
archive_format: ArchiveFormat,
parallel_divisions: usize,
) {
    let mut measure_streaming_unarchive = Measure::start("streaming-unarchive");
    let (file_sender, _file_receiver) = crossbeam_channel::unbounded();
    let handles = unarchive_snapshot_no_streaming(
        file_sender,
        unpack_dir.as_ref().to_path_buf(),
        account_paths.to_vec(),
        snapshot_archive_path.as_ref().to_path_buf(),
        archive_format,
        parallel_divisions,
    );
    // Join the unpack threads so the directory is fully populated (and the
    // elapsed time meaningful) before returning.
    handles
        .into_iter()
        .for_each(|handle| handle.join().unwrap());
    measure_streaming_unarchive.stop();
info!(
"measure_streaming_unarchive {}",
measure_streaming_unarchive.as_ms()
);
}

/// Follows streaming_unarchive_snapshot, but unpacks in place instead of streaming file paths to a consumer.
fn unarchive_snapshot_no_streaming(
file_sender: Sender<PathBuf>,
unpack_dir: PathBuf,
account_paths: Vec<PathBuf>,
snapshot_archive_path: PathBuf,
archive_format: ArchiveFormat,
num_threads: usize,
) -> Vec<JoinHandle<()>> {
let account_paths = Arc::new(account_paths);
    let unpack_dir = Arc::new(unpack_dir); // Arc so each spawned thread can share ownership of the path
let shared_buffer = untar_snapshot_create_shared_buffer(&snapshot_archive_path, archive_format);

// All shared buffer readers need to be created before the threads are spawned
#[allow(clippy::needless_collect)]
let archives: Vec<_> = (0..num_threads)
.map(|_| {
let reader = SharedBufferReader::new(&shared_buffer);
Archive::new(reader)
})
.collect();

archives
.into_iter()
.enumerate()
.map(|(thread_index, archive)| {
let parallel_selector = Some(ParallelSelector {
index: thread_index,
divisions: num_threads,
});

spawn_unpack_snapshot_thread(
file_sender.clone(),
account_paths.clone(),
unpack_dir.clone(),
archive,
parallel_selector,
thread_index,
true,
)
})
.collect()
}
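
// The selection rule assumed above (ParallelSelector is defined in
// hardened_unpack.rs; this mirrors its intent): every thread reads the whole
// decompressed buffer, and entry `i` is unpacked only by the thread where
// `i % divisions == index`, so each valid entry is extracted exactly once.
//
// fn select_index(&self, index: usize) -> bool {
//     index % self.divisions == self.index
// }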

/// Streams unpacked files across channel
fn streaming_unarchive_snapshot(
file_sender: Sender<PathBuf>,
@@ -1613,6 +1711,7 @@ fn streaming_unarchive_snapshot(
archive,
parallel_selector,
thread_index,
false,
)
})
.collect()
@@ -2604,6 +2703,39 @@ pub fn bank_to_full_snapshot_archive(
)
}

/// Convenience function to create a bank snapshot directory out of any Bank, regardless of state.
/// The Bank will be frozen during the process.
/// This is only called from tests.
///
/// Requires:
/// - `bank` is complete
pub fn bank_to_snapshot_dir_for_test(
bank_snapshots_dir: impl AsRef<Path>,
bank: &Bank,
snapshot_version: Option<SnapshotVersion>,
) -> Result<BankSnapshotInfo> {
let snapshot_version = snapshot_version.unwrap_or_default();

assert!(bank.is_complete());
bank.squash(); // Bank may not be a root
bank.force_flush_accounts_cache();
bank.clean_accounts(Some(bank.slot()));
bank.update_accounts_hash(CalcAccountsHashDataSource::Storages, false, false);
bank.rehash(); // Bank accounts may have been manually modified by the caller

let snapshot_storages = bank.get_snapshot_storages(None);
let slot_deltas = bank.status_cache.read().unwrap().root_slot_deltas();
let bank_snapshot_info = add_bank_snapshot(
&bank_snapshots_dir,
bank,
&snapshot_storages,
snapshot_version,
slot_deltas,
)?;

Ok(bank_snapshot_info)
}

/// Convenience function to create an incremental snapshot archive out of any Bank, regardless of
/// state. The Bank will be frozen during the process.
/// This is only called from ledger-tool or tests. Warping is a special case as well.
@@ -2658,6 +2790,28 @@ pub fn bank_to_incremental_snapshot_archive(
)
}

pub fn bank_snapshot_dir_to_archive(
slot: Slot,
bank_snapshots_dir: impl AsRef<Path>,
archive_format: ArchiveFormat,
) -> PathBuf {
let bank_snapshot_slot_dir = bank_snapshots_dir
.as_ref()
.to_path_buf()
.join(slot.to_string());

    let archive_filename = format!("new_{}.{}", slot, archive_format.extension());
let archive_path = bank_snapshots_dir
.as_ref()
.to_path_buf()
.join(archive_filename);
archive_dir(bank_snapshot_slot_dir, &archive_path, archive_format).unwrap();

info!("Created archive {}", archive_path.display());

archive_path
}
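
// Usage sketch: for slot 100 this produces `<bank_snapshots_dir>/new_100.tar.zst`
// and returns its path; the `new_` prefix keeps it distinct from existing
// snapshot archive names.
//
// let archive_path = bank_snapshot_dir_to_archive(100, &bank_snapshots_dir, ArchiveFormat::TarZstd);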

/// Helper function to hold shared code to package, process, and archive full snapshots
#[allow(clippy::too_many_arguments)]
pub fn package_and_archive_full_snapshot(
@@ -4688,4 +4842,54 @@ mod tests {
let snapshot = get_highest_bank_snapshot(bank_snapshots_dir).unwrap();
assert_eq!(snapshot.slot, 1);
}

    /// Test roundtrip of a bank snapshot directory to an archive, then back again.
/// This follows test_roundtrip_bank_to_and_from_full_snapshot_simple
#[test]
fn test_roundtrip_snapshot_dir_to_and_from_archive() {
solana_logger::setup();
let genesis_config = GenesisConfig::default();
let original_bank = Bank::new_for_tests(&genesis_config);
original_bank.fill_bank_with_ticks_for_tests();

// bank in memory -> bank snapshot dir
let bank_snapshots_dir_tmp = tempfile::TempDir::new().unwrap();
let bank_snapshots_dir = bank_snapshots_dir_tmp.path();
bank_to_snapshot_dir_for_test(&bank_snapshots_dir, &original_bank, None).unwrap();

// dir -> archive
let snapshot_archive_format = ArchiveFormat::TarZstd;
let archive_path = bank_snapshot_dir_to_archive(
original_bank.slot(),
&bank_snapshots_dir,
snapshot_archive_format,
);
info!("Created archive {}", archive_path.display());

// archive -> unpack dir
        // To be debugged: /tmp/unpack works, but any directory generated from
        // TempDir is unstable, causing flaky crashes.
//let unpack_dir = bank_snapshots_dir.join("unpack");
//let unpack_dir_tmp = tempfile::TempDir::new().unwrap();
//let unpack_dir = unpack_dir_tmp.path().to_path_buf();
        let unpack_dir = PathBuf::from("/tmp/unpack");
        let _ = std::fs::remove_dir_all(&unpack_dir); // clean up any previous run
        std::fs::create_dir(&unpack_dir).unwrap();
let parallel_divisions = 1; // TBD

// get account_paths from bank
let account_paths = &original_bank.accounts().accounts_db.paths;

unarchive_to_snapshot_dir(
&unpack_dir,
archive_path,
account_paths,
snapshot_archive_format,
parallel_divisions,
);
info!("Unpacked to {}", unpack_dir.display());

// unpack dir -> bank in memory
        // After the constructing_bank PR is committed, call it here to construct a bank and compare.
//assert_eq!(original_bank, roundtrip_bank);
}
}