Skip to content
This repository has been archived by the owner on Jan 22, 2025. It is now read-only.

Commit

Permalink
Snapshot pipefitting through the validator cli (#5617)
Browse files Browse the repository at this point in the history
* Handle 404 errors better

* Snapshot pipefitting through the validator cli

* Add download progress bar

* Log the current entrypoint slot
  • Loading branch information
mvines authored Aug 23, 2019
1 parent bde4ba0 commit 3fc5009
Show file tree
Hide file tree
Showing 10 changed files with 224 additions and 131 deletions.
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

51 changes: 14 additions & 37 deletions core/src/bank_forks.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,35 +16,14 @@ use std::time::Instant;

#[derive(Clone, Debug, Eq, PartialEq)]
pub struct SnapshotConfig {
snapshot_path: PathBuf,
snapshot_package_output_path: PathBuf,
snapshot_interval_slots: usize,
}

impl SnapshotConfig {
pub fn new(
snapshot_path: PathBuf,
snapshot_package_output_path: PathBuf,
snapshot_interval_slots: usize,
) -> Self {
Self {
snapshot_path,
snapshot_package_output_path,
snapshot_interval_slots,
}
}
// Number of slots between consecutive snapshots (a new snapshot is generated once per interval)
pub snapshot_interval_slots: usize,

pub fn snapshot_path(&self) -> &Path {
self.snapshot_path.as_path()
}

pub fn snapshot_package_output_path(&self) -> &Path {
&self.snapshot_package_output_path.as_path()
}
// Where to store the latest packaged snapshot
pub snapshot_package_output_path: PathBuf,

pub fn snapshot_interval_slots(&self) -> usize {
self.snapshot_interval_slots
}
// Where to place the snapshots for recent slots
pub snapshot_path: PathBuf,
}

pub struct BankForks {
Expand Down Expand Up @@ -234,10 +213,7 @@ impl BankForks {
// Generate a snapshot if snapshots are configured and it's been an appropriate number
// of banks since the last snapshot
if self.snapshot_config.is_some() && snapshot_package_sender.is_some() {
let config = self
.snapshot_config
.as_ref()
.expect("Called package_snapshot without a snapshot configuration");
let config = self.snapshot_config.as_ref().unwrap();
info!("setting snapshot root: {}", root);
if root - self.slots_since_snapshot[0] >= config.snapshot_interval_slots as u64 {
let mut snapshot_time = Measure::start("total-snapshot-ms");
Expand Down Expand Up @@ -308,6 +284,7 @@ impl BankForks {
.cloned()
.expect("root must exist in BankForks");
snapshot_utils::add_snapshot(&config.snapshot_path, &bank, slots_since_snapshot)?;

// Package the relevant snapshots
let slot_snapshot_paths = snapshot_utils::get_snapshot_paths(&config.snapshot_path);

Expand Down Expand Up @@ -835,7 +812,7 @@ mod tests {
genesis_block_info: GenesisBlockInfo,
}

fn setup_snapshot_test(snapshot_interval: usize) -> SnapshotTestConfig {
fn setup_snapshot_test(snapshot_interval_slots: usize) -> SnapshotTestConfig {
let accounts_dir = TempDir::new().unwrap();
let snapshot_dir = TempDir::new().unwrap();
let snapshot_output_path = TempDir::new().unwrap();
Expand All @@ -847,11 +824,11 @@ mod tests {
bank0.freeze();
let mut bank_forks = BankForks::new(0, bank0);

let snapshot_config = SnapshotConfig::new(
PathBuf::from(snapshot_dir.path()),
PathBuf::from(snapshot_output_path.path()),
snapshot_interval,
);
let snapshot_config = SnapshotConfig {
snapshot_interval_slots,
snapshot_package_output_path: PathBuf::from(snapshot_output_path.path()),
snapshot_path: PathBuf::from(snapshot_dir.path()),
};
bank_forks.set_snapshot_config(snapshot_config.clone());
SnapshotTestConfig {
accounts_dir,
Expand Down
78 changes: 48 additions & 30 deletions core/src/snapshot_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@ use std::fs::File;
use std::io::{BufReader, BufWriter, Error as IOError, ErrorKind};
use std::path::{Path, PathBuf};
use tar::Archive;
use tempfile::TempDir;

const SNAPSHOT_STATUS_CACHE_FILE_NAME: &str = "status_cache";

Expand Down Expand Up @@ -92,29 +91,43 @@ pub fn package_snapshot<P: AsRef<Path>, Q: AsRef<Path>>(
Ok(package)
}

pub fn get_snapshot_paths<P: AsRef<Path>>(snapshot_path: P) -> Vec<SlotSnapshotPaths> {
let paths = fs::read_dir(&snapshot_path).expect("Invalid snapshot path");
let mut names = paths
.filter_map(|entry| {
entry.ok().and_then(|e| {
e.path()
.file_name()
.and_then(|n| n.to_str().map(|s| s.parse::<u64>().ok()))
.unwrap_or(None)
})
})
.map(|slot| {
let snapshot_path = snapshot_path.as_ref().join(slot.to_string());
SlotSnapshotPaths {
slot,
snapshot_file_path: snapshot_path.join(get_snapshot_file_name(slot)),
snapshot_status_cache_path: snapshot_path.join(SNAPSHOT_STATUS_CACHE_FILE_NAME),
}
})
.collect::<Vec<SlotSnapshotPaths>>();

names.sort();
names
pub fn get_snapshot_paths<P: AsRef<Path>>(snapshot_path: P) -> Vec<SlotSnapshotPaths>
where
P: std::fmt::Debug,
{
match fs::read_dir(&snapshot_path) {
Ok(paths) => {
let mut names = paths
.filter_map(|entry| {
entry.ok().and_then(|e| {
e.path()
.file_name()
.and_then(|n| n.to_str().map(|s| s.parse::<u64>().ok()))
.unwrap_or(None)
})
})
.map(|slot| {
let snapshot_path = snapshot_path.as_ref().join(slot.to_string());
SlotSnapshotPaths {
slot,
snapshot_file_path: snapshot_path.join(get_snapshot_file_name(slot)),
snapshot_status_cache_path: snapshot_path
.join(SNAPSHOT_STATUS_CACHE_FILE_NAME),
}
})
.collect::<Vec<SlotSnapshotPaths>>();

names.sort();
names
}
Err(err) => {
info!(
"Unable to read snapshot directory {:?}: {}",
snapshot_path, err
);
vec![]
}
}
}

pub fn add_snapshot<P: AsRef<Path>>(
Expand Down Expand Up @@ -172,7 +185,7 @@ pub fn remove_snapshot<P: AsRef<Path>>(slot: u64, snapshot_path: P) -> Result<()
}

pub fn bank_slot_from_archive<P: AsRef<Path>>(snapshot_tar: P) -> Result<u64> {
let tempdir = TempDir::new()?;
let tempdir = tempfile::TempDir::new()?;
untar_snapshot_in(&snapshot_tar, &tempdir)?;
let unpacked_snapshots_dir = tempdir.path().join(TAR_SNAPSHOTS_DIR);
let snapshot_paths = get_snapshot_paths(&unpacked_snapshots_dir);
Expand All @@ -191,22 +204,27 @@ pub fn bank_from_archive<P: AsRef<Path>>(
snapshot_tar: P,
) -> Result<Bank> {
// Untar the snapshot into a temp directory under `snapshot_config.snapshot_path`
let unpack_dir = tempfile::tempdir_in(snapshot_config.snapshot_path())?;
let unpack_dir = tempfile::tempdir_in(&snapshot_config.snapshot_path)?;
untar_snapshot_in(&snapshot_tar, &unpack_dir)?;

let unpacked_accounts_dir = unpack_dir.as_ref().join(TAR_ACCOUNTS_DIR);
let unpacked_snapshots_dir = unpack_dir.as_ref().join(TAR_SNAPSHOTS_DIR);
let snapshot_paths = get_snapshot_paths(&unpacked_snapshots_dir);
let bank = rebuild_bank_from_snapshots(account_paths, &snapshot_paths, unpacked_accounts_dir)?;

// Move the unpacked snapshots into `snapshot_config.snapshot_path()`
let dir_files = fs::read_dir(unpacked_snapshots_dir).expect("Invalid snapshot path");
// Move the unpacked snapshots into `snapshot_config.snapshot_path`
let dir_files = fs::read_dir(&unpacked_snapshots_dir).unwrap_or_else(|err| {
panic!(
"Invalid snapshot path {:?}: {}",
unpacked_snapshots_dir, err
)
});
let paths: Vec<PathBuf> = dir_files
.filter_map(|entry| entry.ok().map(|e| e.path()))
.collect();
let mut copy_options = CopyOptions::new();
copy_options.overwrite = true;
fs_extra::move_items(&paths, snapshot_config.snapshot_path(), &copy_options)?;
fs_extra::move_items(&paths, &snapshot_config.snapshot_path, &copy_options)?;

Ok(bank)
}
Expand Down Expand Up @@ -238,7 +256,7 @@ where
let last_root_paths = snapshot_paths
.last()
.ok_or_else(|| get_io_error("No snapshots found in snapshots directory"))?;
info!("Load from {:?}", &last_root_paths.snapshot_file_path);
info!("Loading from {:?}", &last_root_paths.snapshot_file_path);
let file = File::open(&last_root_paths.snapshot_file_path)?;
let mut stream = BufReader::new(file);
let bank: Bank = deserialize_from(&mut stream).map_err(|e| get_io_error(&e.to_string()))?;
Expand Down
60 changes: 27 additions & 33 deletions core/src/validator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -375,22 +375,20 @@ fn get_bank_forks(
dev_halt_at_slot: Option<Slot>,
) -> (BankForks, Vec<BankForksInfo>, LeaderScheduleCache) {
let (mut bank_forks, bank_forks_info, leader_schedule_cache) = {
let mut result = None;
if snapshot_config.is_some() {
let snapshot_config = snapshot_config.as_ref().unwrap();

// Blow away any remnants in the snapshots directory
let _ = fs::remove_dir_all(snapshot_config.snapshot_path());
fs::create_dir_all(&snapshot_config.snapshot_path())
if let Some(snapshot_config) = snapshot_config.as_ref() {
info!(
"Initializing snapshot path: {:?}",
snapshot_config.snapshot_path
);
let _ = fs::remove_dir_all(&snapshot_config.snapshot_path);
fs::create_dir_all(&snapshot_config.snapshot_path)
.expect("Couldn't create snapshot directory");

// Get the path to the tar
let tar = snapshot_utils::get_snapshot_tar_path(
&snapshot_config.snapshot_package_output_path(),
&snapshot_config.snapshot_package_output_path,
);

// Check that the snapshot tar exists, try to load the snapshot if it does
if tar.exists() {
info!("Loading snapshot package: {:?}", tar);
// Fail hard here if snapshot fails to load, don't silently continue
let deserialized_bank = snapshot_utils::bank_from_archive(
account_paths
Expand All @@ -401,33 +399,29 @@ fn get_bank_forks(
)
.expect("Load from snapshot failed");

result = Some(
blocktree_processor::process_blocktree_from_root(
blocktree,
Arc::new(deserialized_bank),
verify_ledger,
dev_halt_at_slot,
)
.expect("processing blocktree after loading snapshot failed"),
);
}
}

// If a snapshot doesn't exist
if result.is_none() {
result = Some(
blocktree_processor::process_blocktree(
&genesis_block,
&blocktree,
account_paths,
return blocktree_processor::process_blocktree_from_root(
blocktree,
Arc::new(deserialized_bank),
verify_ledger,
dev_halt_at_slot,
)
.expect("process_blocktree failed"),
);
.expect("processing blocktree after loading snapshot failed");
} else {
info!("Snapshot package does not exist: {:?}", tar);
}
} else {
info!("Snapshots disabled");
}

result.unwrap()
info!("Processing ledger from genesis");
blocktree_processor::process_blocktree(
&genesis_block,
&blocktree,
account_paths,
verify_ledger,
dev_halt_at_slot,
)
.expect("process_blocktree failed")
};

if snapshot_config.is_some() {
Expand Down
8 changes: 4 additions & 4 deletions local_cluster/tests/local_cluster.rs
Original file line number Diff line number Diff line change
Expand Up @@ -313,11 +313,11 @@ fn test_snapshots_restart_validity() {
// Set up the cluster with 1 snapshotting validator
let mut snapshot_validator_config = ValidatorConfig::default();
snapshot_validator_config.rpc_config.enable_fullnode_exit = true;
snapshot_validator_config.snapshot_config = Some(SnapshotConfig::new(
snapshot_path,
snapshot_package_output_path.clone(),
snapshot_validator_config.snapshot_config = Some(SnapshotConfig {
snapshot_interval_slots,
));
snapshot_package_output_path: snapshot_package_output_path.clone(),
snapshot_path,
});
let num_account_paths = 4;
let (account_storage_dirs, account_storage_paths) = generate_account_paths(num_account_paths);
let mut all_account_storage_dirs = vec![account_storage_dirs];
Expand Down
1 change: 0 additions & 1 deletion multinode-demo/validator.sh
Original file line number Diff line number Diff line change
Expand Up @@ -184,7 +184,6 @@ default_arg --identity "$identity_keypair_path"
default_arg --voting-keypair "$voting_keypair_path"
default_arg --storage-keypair "$storage_keypair_path"
default_arg --ledger "$ledger_dir"
#default_arg --snapshot-interval-slots 100

if [[ -n $SOLANA_CUDA ]]; then
program=$solana_validator_cuda
Expand Down
1 change: 0 additions & 1 deletion run.sh
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,6 @@ args=(
--rpc-port 8899
--rpc-drone-address 127.0.0.1:9900
--accounts "$dataDir"/accounts
--snapshot-interval-slots 100
)
if [[ -n $blockstreamSocket ]]; then
args+=(--blockstream "$blockstreamSocket")
Expand Down
11 changes: 9 additions & 2 deletions runtime/src/accounts_db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -457,6 +457,8 @@ impl AccountsDB {
let path_index = thread_rng().gen_range(0, local_account_paths.len());
let local_dir = &local_account_paths[path_index];

std::fs::create_dir_all(local_dir).expect("Create directory failed");

// Move the corresponding AppendVec from the snapshot into the directory pointed
// at by `local_dir`
let append_vec_relative_path =
Expand All @@ -465,8 +467,13 @@ impl AccountsDB {
append_vecs_path.as_ref().join(&append_vec_relative_path);
let mut copy_options = CopyOptions::new();
copy_options.overwrite = true;
fs_extra::move_items(&vec![append_vec_abs_path], &local_dir, &copy_options)
.map_err(|e| AccountsDB::get_io_error(&e.to_string()))?;
fs_extra::move_items(&vec![&append_vec_abs_path], &local_dir, &copy_options)
.map_err(|e| {
AccountsDB::get_io_error(&format!(
"Unable to move {:?} to {:?}: {}",
append_vec_abs_path, local_dir, e
))
})?;

// Notify the AppendVec of the new file location
let local_path = local_dir.join(append_vec_relative_path);
Expand Down
2 changes: 2 additions & 0 deletions validator/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,9 @@ homepage = "https://solana.com/"
[dependencies]
bzip2 = "0.3.3"
clap = "2.33.0"
console = "0.7.7"
log = "0.4.8"
indicatif = "0.11.0"
reqwest = "0.9.20"
serde_json = "1.0.40"
solana-client = { path = "../client", version = "0.18.0-pre2" }
Expand Down
Loading

0 comments on commit 3fc5009

Please sign in to comment.