From d45ced0a5d1b67657e643b735849f2e5bbb2ceef Mon Sep 17 00:00:00 2001 From: Brooks Prumo Date: Thu, 2 Sep 2021 19:05:15 -0500 Subject: [PATCH] Make startup aware of Incremental Snapshots (#19550) --- core/src/retransmit_stage.rs | 13 +- core/src/tvu.rs | 42 +++--- core/src/validator.rs | 54 +++++--- core/tests/snapshots.rs | 4 +- ledger-tool/src/main.rs | 20 +-- ledger/src/bank_forks_utils.rs | 74 +++++++---- ledger/src/blockstore_processor.rs | 206 +++++++++++++++++++++++++++-- runtime/src/snapshot_utils.rs | 16 ++- 8 files changed, 337 insertions(+), 92 deletions(-) diff --git a/core/src/retransmit_stage.rs b/core/src/retransmit_stage.rs index 9b2f2b0a471c1d..c14e06aa3d0560 100644 --- a/core/src/retransmit_stage.rs +++ b/core/src/retransmit_stage.rs @@ -572,8 +572,17 @@ mod tests { full_leader_cache: true, ..ProcessOptions::default() }; - let (bank_forks, cached_leader_schedule) = - process_blockstore(&genesis_config, &blockstore, Vec::new(), opts, None).unwrap(); + let (accounts_package_sender, _) = channel(); + let (bank_forks, cached_leader_schedule, _) = process_blockstore( + &genesis_config, + &blockstore, + Vec::new(), + opts, + None, + None, + accounts_package_sender, + ) + .unwrap(); let leader_schedule_cache = Arc::new(cached_leader_schedule); let bank_forks = Arc::new(RwLock::new(bank_forks)); diff --git a/core/src/tvu.rs b/core/src/tvu.rs index 64fbe95cf6add1..d066b88344b0f0 100644 --- a/core/src/tvu.rs +++ b/core/src/tvu.rs @@ -44,10 +44,10 @@ use solana_runtime::{ bank_forks::BankForks, commitment::BlockCommitmentCache, snapshot_config::SnapshotConfig, - snapshot_package::PendingSnapshotPackage, + snapshot_package::{AccountsPackageReceiver, AccountsPackageSender, PendingSnapshotPackage}, vote_sender_types::ReplayVoteSender, }; -use solana_sdk::{pubkey::Pubkey, signature::Keypair}; +use solana_sdk::{clock::Slot, pubkey::Pubkey, signature::Keypair}; use std::{ boxed::Box, collections::HashSet, @@ -135,6 +135,8 @@ impl Tvu { tvu_config: TvuConfig, max_slots: &Arc, cost_model: &Arc>, + accounts_package_channel: (AccountsPackageSender, AccountsPackageReceiver), + last_full_snapshot_slot: Option, ) -> Self { let Sockets { repair: repair_socket, @@ -212,9 +214,9 @@ impl Tvu { (Some(snapshot_config), Some(pending_snapshot_package)) }) .unwrap_or((None, None)); - let (accounts_hash_sender, accounts_hash_receiver) = channel(); + let (accounts_package_sender, accounts_package_receiver) = accounts_package_channel; let accounts_hash_verifier = AccountsHashVerifier::new( - accounts_hash_receiver, + accounts_package_receiver, pending_snapshot_package, exit, cluster_info, @@ -224,20 +226,19 @@ impl Tvu { snapshot_config.clone(), ); - let (snapshot_request_sender, snapshot_request_handler) = { - snapshot_config - .map(|snapshot_config| { - let (snapshot_request_sender, snapshot_request_receiver) = unbounded(); - ( - Some(snapshot_request_sender), - Some(SnapshotRequestHandler { - snapshot_config, - snapshot_request_receiver, - accounts_package_sender: accounts_hash_sender, - }), - ) - }) - .unwrap_or((None, None)) + let (snapshot_request_sender, snapshot_request_handler) = match snapshot_config { + None => (None, None), + Some(snapshot_config) => { + let (snapshot_request_sender, snapshot_request_receiver) = unbounded(); + ( + Some(snapshot_request_sender), + Some(SnapshotRequestHandler { + snapshot_config, + snapshot_request_receiver, + accounts_package_sender, + }), + ) + } }; let (pruned_banks_sender, pruned_banks_receiver) = unbounded(); @@ -340,7 +341,7 @@ impl Tvu { tvu_config.accounts_db_caching_enabled, tvu_config.test_hash_calculation, tvu_config.use_index_hash_calculation, - None, + last_full_snapshot_slot, ); Tvu { @@ -434,6 +435,7 @@ pub mod tests { let (_, gossip_confirmed_slots_receiver) = unbounded(); let bank_forks = Arc::new(RwLock::new(bank_forks)); let tower = Tower::default(); + let accounts_package_channel = channel(); let tvu = Tvu::new( &vote_keypair.pubkey(), Arc::new(RwLock::new(vec![Arc::new(vote_keypair)])), @@ -477,6 +479,8 @@ pub mod tests { TvuConfig::default(), &Arc::new(MaxSlots::default()), &Arc::new(RwLock::new(CostModel::default())), + accounts_package_channel, + None, ); exit.store(true, Ordering::Relaxed); tvu.join().unwrap(); diff --git a/core/src/validator.rs b/core/src/validator.rs index 8aa97df2c7cc8b..28ebf66e29f9ed 100644 --- a/core/src/validator.rs +++ b/core/src/validator.rs @@ -69,7 +69,7 @@ use { hardened_unpack::{open_genesis_config, MAX_GENESIS_ARCHIVE_UNPACKED_SIZE}, snapshot_archive_info::SnapshotArchiveInfoGetter, snapshot_config::SnapshotConfig, - snapshot_package::PendingSnapshotPackage, + snapshot_package::{AccountsPackageSender, PendingSnapshotPackage}, snapshot_utils, }, solana_sdk::{ @@ -92,7 +92,7 @@ use { path::{Path, PathBuf}, sync::{ atomic::{AtomicBool, AtomicU64, Ordering}, - mpsc::Receiver, + mpsc::{channel, Receiver}, Arc, Mutex, RwLock, }, thread::{sleep, Builder, JoinHandle}, @@ -379,7 +379,7 @@ impl Validator { .register_exit(Box::new(move || exit.store(true, Ordering::Relaxed))); } - let (replay_vote_sender, replay_vote_receiver) = unbounded(); + let accounts_package_channel = channel(); let ( genesis_config, bank_forks, @@ -387,6 +387,7 @@ impl Validator { ledger_signal_receiver, completed_slots_receiver, leader_schedule_cache, + last_full_snapshot_slot, snapshot_hash, TransactionHistoryServices { transaction_status_sender, @@ -408,6 +409,7 @@ impl Validator { config.enforce_ulimit_nofile, &start_progress, config.no_poh_speed_test, + accounts_package_channel.0.clone(), ); *start_progress.write().unwrap() = ValidatorStartProgress::StartingServices; @@ -707,6 +709,7 @@ impl Validator { let rpc_completed_slots_service = RpcCompletedSlotsService::spawn(completed_slots_receiver, rpc_subscriptions.clone()); + let (replay_vote_sender, replay_vote_receiver) = unbounded(); let tvu = Tvu::new( vote_account, authorized_voter_keypairs, @@ -777,6 +780,8 @@ impl Validator { }, &max_slots, &cost_model, + accounts_package_channel, + last_full_snapshot_slot, ); let tpu = Tpu::new( @@ -1069,7 +1074,7 @@ fn post_process_restored_tower( }) } -#[allow(clippy::type_complexity)] +#[allow(clippy::type_complexity, clippy::too_many_arguments)] fn new_banks_from_ledger( validator_identity: &Pubkey, vote_account: &Pubkey, @@ -1080,6 +1085,7 @@ fn new_banks_from_ledger( enforce_ulimit_nofile: bool, start_progress: &Arc>, no_poh_speed_test: bool, + accounts_package_sender: AccountsPackageSender, ) -> ( GenesisConfig, BankForks, @@ -1087,6 +1093,7 @@ fn new_banks_from_ledger( Receiver, CompletedSlotsReceiver, LeaderScheduleCache, + Option, Option<(Slot, Hash)>, TransactionHistoryServices, Tower, @@ -1182,24 +1189,26 @@ fn new_banks_from_ledger( TransactionHistoryServices::default() }; - let (mut bank_forks, mut leader_schedule_cache, snapshot_hash) = bank_forks_utils::load( - &genesis_config, - &blockstore, - config.account_paths.clone(), - config.account_shrink_paths.clone(), - config.snapshot_config.as_ref(), - process_options, - transaction_history_services - .transaction_status_sender - .as_ref(), - transaction_history_services - .cache_block_meta_sender - .as_ref(), - ) - .unwrap_or_else(|err| { - error!("Failed to load ledger: {:?}", err); - abort() - }); + let (mut bank_forks, mut leader_schedule_cache, last_full_snapshot_slot, snapshot_hash) = + bank_forks_utils::load( + &genesis_config, + &blockstore, + config.account_paths.clone(), + config.account_shrink_paths.clone(), + config.snapshot_config.as_ref(), + process_options, + transaction_history_services + .transaction_status_sender + .as_ref(), + transaction_history_services + .cache_block_meta_sender + .as_ref(), + accounts_package_sender, + ) + .unwrap_or_else(|err| { + error!("Failed to load ledger: {:?}", err); + abort() + }); if let Some(warp_slot) = config.warp_slot { let snapshot_config = config.snapshot_config.as_ref().unwrap_or_else(|| { @@ -1277,6 +1286,7 @@ fn new_banks_from_ledger( ledger_signal_receiver, completed_slots_receiver, leader_schedule_cache, + last_full_snapshot_slot, snapshot_hash, transaction_history_services, tower, diff --git a/core/tests/snapshots.rs b/core/tests/snapshots.rs index 1a4b5ec7d914a2..62bdc65fee132c 100644 --- a/core/tests/snapshots.rs +++ b/core/tests/snapshots.rs @@ -819,7 +819,7 @@ mod tests { accounts_dir: PathBuf, genesis_config: &GenesisConfig, ) -> snapshot_utils::Result<()> { - let (deserialized_bank, _) = snapshot_utils::bank_from_latest_snapshot_archives( + let (deserialized_bank, ..) = snapshot_utils::bank_from_latest_snapshot_archives( &snapshot_config.bank_snapshots_dir, &snapshot_config.snapshot_archives_dir, &[accounts_dir], @@ -997,7 +997,7 @@ mod tests { std::thread::sleep(Duration::from_secs(5)); info!("Awake! Rebuilding bank from latest snapshot archives..."); - let (deserialized_bank, _) = snapshot_utils::bank_from_latest_snapshot_archives( + let (deserialized_bank, ..) = snapshot_utils::bank_from_latest_snapshot_archives( &snapshot_test_config.snapshot_config.bank_snapshots_dir, &snapshot_test_config.snapshot_config.snapshot_archives_dir, &[snapshot_test_config.accounts_dir.as_ref().to_path_buf()], diff --git a/ledger-tool/src/main.rs b/ledger-tool/src/main.rs index a47fbdb9ba6464..a8a05067ac69c6 100644 --- a/ledger-tool/src/main.rs +++ b/ledger-tool/src/main.rs @@ -63,7 +63,7 @@ use std::{ path::{Path, PathBuf}, process::{exit, Command, Stdio}, str::FromStr, - sync::{Arc, RwLock}, + sync::{mpsc::channel, Arc, RwLock}, }; mod bigtable; @@ -712,7 +712,7 @@ fn load_bank_forks( let snapshot_archives_dir = snapshot_archive_path.unwrap_or_else(|| blockstore.ledger_path().to_path_buf()); Some(SnapshotConfig { - full_snapshot_archive_interval_slots: 0, // Value doesn't matter + full_snapshot_archive_interval_slots: Slot::MAX, incremental_snapshot_archive_interval_slots: Slot::MAX, snapshot_archives_dir, bank_snapshots_dir, @@ -740,6 +740,7 @@ fn load_bank_forks( vec![non_primary_accounts_path] }; + let (accounts_package_sender, _) = channel(); bank_forks_utils::load( genesis_config, blockstore, @@ -749,6 +750,7 @@ fn load_bank_forks( process_options, None, None, + accounts_package_sender, ) } @@ -1652,7 +1654,7 @@ fn main() { process_options, snapshot_archive_path, ) { - Ok((bank_forks, _leader_schedule_cache, _snapshot_hash)) => { + Ok((bank_forks, ..)) => { println!( "{}", compute_shred_version( @@ -1727,7 +1729,7 @@ fn main() { process_options, snapshot_archive_path, ) { - Ok((bank_forks, _leader_schedule_cache, _snapshot_hash)) => { + Ok((bank_forks, ..)) => { println!("{}", &bank_forks.working_bank().hash()); } Err(err) => { @@ -1908,7 +1910,7 @@ fn main() { AccessType::TryPrimaryThenSecondary, wal_recovery_mode, ); - let (bank_forks, _, _) = load_bank_forks( + let (bank_forks, ..) = load_bank_forks( arg_matches, &open_genesis_config_by(&ledger_path, arg_matches), &blockstore, @@ -1947,7 +1949,7 @@ fn main() { process_options, snapshot_archive_path, ) { - Ok((bank_forks, _leader_schedule_cache, _snapshot_hash)) => { + Ok((bank_forks, ..)) => { let dot = graph_forks(&bank_forks, arg_matches.is_present("include_all_votes")); let extension = Path::new(&output_file).extension(); @@ -2049,7 +2051,7 @@ fn main() { }, snapshot_archive_path, ) { - Ok((bank_forks, _leader_schedule_cache, _snapshot_hash)) => { + Ok((bank_forks, ..)) => { let mut bank = bank_forks .get(snapshot_slot) .unwrap_or_else(|| { @@ -2279,7 +2281,7 @@ fn main() { process_options, snapshot_archive_path, ) { - Ok((bank_forks, _leader_schedule_cache, _snapshot_hash)) => { + Ok((bank_forks, ..)) => { let slot = bank_forks.working_bank().slot(); let bank = bank_forks.get(slot).unwrap_or_else(|| { eprintln!("Error: Slot {} is not available", slot); @@ -2338,7 +2340,7 @@ fn main() { process_options, snapshot_archive_path, ) { - Ok((bank_forks, _leader_schedule_cache, _snapshot_hash)) => { + Ok((bank_forks, ..)) => { let slot = bank_forks.working_bank().slot(); let bank = bank_forks.get(slot).unwrap_or_else(|| { eprintln!("Error: Slot {} is not available", slot); diff --git a/ledger/src/bank_forks_utils.rs b/ledger/src/bank_forks_utils.rs index 68a428799d6059..7dceae42f79132 100644 --- a/ledger/src/bank_forks_utils.rs +++ b/ledger/src/bank_forks_utils.rs @@ -8,12 +8,20 @@ use crate::{ }; use log::*; use solana_entry::entry::VerifyRecyclers; -use solana_runtime::{bank_forks::BankForks, snapshot_config::SnapshotConfig, snapshot_utils}; +use solana_runtime::{ + bank_forks::BankForks, snapshot_archive_info::SnapshotArchiveInfoGetter, + snapshot_config::SnapshotConfig, snapshot_package::AccountsPackageSender, snapshot_utils, +}; use solana_sdk::{clock::Slot, genesis_config::GenesisConfig, hash::Hash}; use std::{fs, path::PathBuf, process, result}; pub type LoadResult = result::Result< - (BankForks, LeaderScheduleCache, Option<(Slot, Hash)>), + ( + BankForks, + LeaderScheduleCache, + Option, + Option<(Slot, Hash)>, + ), BlockstoreProcessorError, >; @@ -21,9 +29,16 @@ fn to_loadresult( bpr: BlockstoreProcessorResult, snapshot_slot_and_hash: Option<(Slot, Hash)>, ) -> LoadResult { - bpr.map(|(bank_forks, leader_schedule_cache)| { - (bank_forks, leader_schedule_cache, snapshot_slot_and_hash) - }) + bpr.map( + |(bank_forks, leader_schedule_cache, last_full_snapshot_slot)| { + ( + bank_forks, + leader_schedule_cache, + last_full_snapshot_slot, + snapshot_slot_and_hash, + ) + }, + ) } /// Load the banks and accounts @@ -39,6 +54,7 @@ pub fn load( process_options: ProcessOptions, transaction_status_sender: Option<&TransactionStatusSender>, cache_block_meta_sender: Option<&CacheBlockMetaSender>, + accounts_package_sender: AccountsPackageSender, ) -> LoadResult { if let Some(snapshot_config) = snapshot_config { info!( @@ -63,6 +79,7 @@ pub fn load( process_options, transaction_status_sender, cache_block_meta_sender, + accounts_package_sender, ); } else { info!("No snapshot package available; will load from genesis"); @@ -77,6 +94,8 @@ pub fn load( account_paths, process_options, cache_block_meta_sender, + snapshot_config, + accounts_package_sender, ) } @@ -86,6 +105,8 @@ fn load_from_genesis( account_paths: Vec, process_options: ProcessOptions, cache_block_meta_sender: Option<&CacheBlockMetaSender>, + snapshot_config: Option<&SnapshotConfig>, + accounts_package_sender: AccountsPackageSender, ) -> LoadResult { info!("Processing ledger from genesis"); to_loadresult( @@ -95,6 +116,8 @@ fn load_from_genesis( account_paths, process_options, cache_block_meta_sender, + snapshot_config, + accounts_package_sender, ), None, ) @@ -110,6 +133,7 @@ fn load_from_snapshot( process_options: ProcessOptions, transaction_status_sender: Option<&TransactionStatusSender>, cache_block_meta_sender: Option<&CacheBlockMetaSender>, + accounts_package_sender: AccountsPackageSender, ) -> LoadResult { // Fail hard here if snapshot fails to load, don't silently continue if account_paths.is_empty() { @@ -117,24 +141,25 @@ fn load_from_snapshot( process::exit(1); } - let (deserialized_bank, timings) = snapshot_utils::bank_from_latest_snapshot_archives( - &snapshot_config.bank_snapshots_dir, - &snapshot_config.snapshot_archives_dir, - &account_paths, - &process_options.frozen_accounts, - genesis_config, - process_options.debug_keys.clone(), - Some(&crate::builtins::get(process_options.bpf_jit)), - process_options.account_indexes.clone(), - process_options.accounts_db_caching_enabled, - process_options.limit_load_slot_count_from_snapshot, - process_options.shrink_ratio, - process_options.accounts_db_test_hash_calculation, - process_options.accounts_db_skip_shrink, - process_options.verify_index, - process_options.accounts_index_config, - ) - .expect("Load from snapshot failed"); + let (deserialized_bank, timings, full_snapshot_archive_info, _) = + snapshot_utils::bank_from_latest_snapshot_archives( + &snapshot_config.bank_snapshots_dir, + &snapshot_config.snapshot_archives_dir, + &account_paths, + &process_options.frozen_accounts, + genesis_config, + process_options.debug_keys.clone(), + Some(&crate::builtins::get(process_options.bpf_jit)), + process_options.account_indexes.clone(), + process_options.accounts_db_caching_enabled, + process_options.limit_load_slot_count_from_snapshot, + process_options.shrink_ratio, + process_options.accounts_db_test_hash_calculation, + process_options.accounts_db_skip_shrink, + process_options.verify_index, + process_options.accounts_index_config, + ) + .expect("Load from snapshot failed"); let deserialized_bank_slot_and_hash = ( deserialized_bank.slot(), @@ -153,7 +178,10 @@ fn load_from_snapshot( &VerifyRecyclers::default(), transaction_status_sender, cache_block_meta_sender, + Some(snapshot_config), + accounts_package_sender, timings, + full_snapshot_archive_info.slot(), ), Some(deserialized_bank_slot_and_hash), ) diff --git a/ledger/src/blockstore_processor.rs b/ledger/src/blockstore_processor.rs index 4765121a8d7a2d..2f275fe95e7618 100644 --- a/ledger/src/blockstore_processor.rs +++ b/ledger/src/blockstore_processor.rs @@ -25,7 +25,9 @@ use solana_runtime::{ bank_forks::BankForks, bank_utils, commitment::VOTE_THRESHOLD_SIZE, - snapshot_utils::BankFromArchiveTimings, + snapshot_config::SnapshotConfig, + snapshot_package::{AccountsPackageSender, SnapshotType}, + snapshot_utils::{self, BankFromArchiveTimings}, transaction_batch::TransactionBatch, vote_account::VoteAccount, vote_sender_types::ReplayVoteSender, @@ -43,7 +45,6 @@ use solana_sdk::{ use solana_transaction_status::token_balances::{ collect_token_balances, TransactionTokenBalancesSet, }; - use std::{ cell::RefCell, collections::{HashMap, HashSet}, @@ -83,7 +84,7 @@ impl BlockCostCapacityMeter { } } -pub type BlockstoreProcessorInner = (BankForks, LeaderScheduleCache); +pub type BlockstoreProcessorInner = (BankForks, LeaderScheduleCache, Option); pub type BlockstoreProcessorResult = result::Result; @@ -480,6 +481,8 @@ pub fn process_blockstore( account_paths: Vec, opts: ProcessOptions, cache_block_meta_sender: Option<&CacheBlockMetaSender>, + snapshot_config: Option<&SnapshotConfig>, + accounts_package_sender: AccountsPackageSender, ) -> BlockstoreProcessorResult { if let Some(num_threads) = opts.override_num_threads { PAR_THREAD_POOL.with(|pool| { @@ -520,11 +523,15 @@ pub fn process_blockstore( &recyclers, None, cache_block_meta_sender, + snapshot_config, + accounts_package_sender, BankFromArchiveTimings::default(), + None, ) } -// Process blockstore from a known root bank +/// Process blockstore from a known root bank +#[allow(clippy::too_many_arguments)] pub(crate) fn process_blockstore_from_root( blockstore: &Blockstore, bank: Bank, @@ -532,7 +539,10 @@ pub(crate) fn process_blockstore_from_root( recyclers: &VerifyRecyclers, transaction_status_sender: Option<&TransactionStatusSender>, cache_block_meta_sender: Option<&CacheBlockMetaSender>, + snapshot_config: Option<&SnapshotConfig>, + accounts_package_sender: AccountsPackageSender, timings: BankFromArchiveTimings, + last_full_snapshot_slot: Slot, ) -> BlockstoreProcessorResult { do_process_blockstore_from_root( blockstore, @@ -541,10 +551,14 @@ pub(crate) fn process_blockstore_from_root( recyclers, transaction_status_sender, cache_block_meta_sender, + snapshot_config, + accounts_package_sender, timings, + Some(last_full_snapshot_slot), ) } +#[allow(clippy::too_many_arguments)] fn do_process_blockstore_from_root( blockstore: &Blockstore, bank: Arc, @@ -552,7 +566,10 @@ fn do_process_blockstore_from_root( recyclers: &VerifyRecyclers, transaction_status_sender: Option<&TransactionStatusSender>, cache_block_meta_sender: Option<&CacheBlockMetaSender>, + snapshot_config: Option<&SnapshotConfig>, + accounts_package_sender: AccountsPackageSender, timings: BankFromArchiveTimings, + mut last_full_snapshot_slot: Option, ) -> BlockstoreProcessorResult { info!("processing ledger from slot {}...", bank.slot()); @@ -614,7 +631,10 @@ fn do_process_blockstore_from_root( recyclers, transaction_status_sender, cache_block_meta_sender, + snapshot_config, + accounts_package_sender, &mut timing, + &mut last_full_snapshot_slot, )?; initial_forks.sort_by_key(|bank| bank.slot()); @@ -630,6 +650,7 @@ fn do_process_blockstore_from_root( if initial_forks.is_empty() { return Err(BlockstoreProcessorError::NoValidForksFound); } + let bank_forks = BankForks::new_from_banks(&initial_forks, root); let processing_time = now.elapsed(); @@ -697,7 +718,7 @@ fn do_process_blockstore_from_root( ); assert!(bank_forks.active_banks().is_empty()); - Ok((bank_forks, leader_schedule_cache)) + Ok((bank_forks, leader_schedule_cache, last_full_snapshot_slot)) } /// Verify that a segment of entries has the correct number of ticks and hashes @@ -1038,7 +1059,10 @@ fn load_frozen_forks( recyclers: &VerifyRecyclers, transaction_status_sender: Option<&TransactionStatusSender>, cache_block_meta_sender: Option<&CacheBlockMetaSender>, + snapshot_config: Option<&SnapshotConfig>, + accounts_package_sender: AccountsPackageSender, timing: &mut ExecuteTimings, + last_full_snapshot_slot: &mut Option, ) -> result::Result>, BlockstoreProcessorError> { let mut initial_forks = HashMap::new(); let mut all_banks = HashMap::new(); @@ -1054,6 +1078,7 @@ fn load_frozen_forks( "load_frozen_forks() latest root from blockstore: {}, max_root: {}", blockstore_max_root, max_root, ); + process_next_slots( root_bank, root_meta, @@ -1161,11 +1186,35 @@ fn load_frozen_forks( leader_schedule_cache.set_root(new_root_bank); new_root_bank.squash(); + if let Some(snapshot_config) = snapshot_config { + let block_height = new_root_bank.block_height(); + if block_height % snapshot_config.full_snapshot_archive_interval_slots == 0 { + snapshot_utils::snapshot_bank( + new_root_bank, + new_root_bank.src.slot_deltas(&new_root_bank.src.roots()), + &accounts_package_sender, + &snapshot_config.bank_snapshots_dir, + &snapshot_config.snapshot_archives_dir, + snapshot_config.snapshot_version, + snapshot_config.archive_format, + None, + Some(SnapshotType::FullSnapshot), + ) + .expect("Failed to snapshot bank while loading frozen banks"); + trace!( + "took bank snapshot for new root bank, block height: {}, slot: {}", + block_height, + *root + ); + *last_full_snapshot_slot = Some(*root); + } + } + if last_free.elapsed() > Duration::from_secs(10) { // Must be called after `squash()`, so that AccountsDb knows what // the roots are for the cache flushing in exhaustively_free_unused_resource(). // This could take few secs; so update last_free later - new_root_bank.exhaustively_free_unused_resource(None); + new_root_bank.exhaustively_free_unused_resource(*last_full_snapshot_slot); last_free = Instant::now(); } @@ -1414,8 +1463,9 @@ pub mod tests { use matches::assert_matches; use rand::{thread_rng, Rng}; use solana_entry::entry::{create_ticks, next_entry, next_entry_mut}; - use solana_runtime::genesis_utils::{ - self, create_genesis_config_with_vote_accounts, ValidatorVoteKeypairs, + use solana_runtime::{ + genesis_utils::{self, create_genesis_config_with_vote_accounts, ValidatorVoteKeypairs}, + snapshot_utils::{ArchiveFormat, SnapshotVersion}, }; use solana_sdk::{ account::{AccountSharedData, WritableAccount}, @@ -1432,7 +1482,11 @@ pub mod tests { vote_state::{VoteState, VoteStateVersions, MAX_LOCKOUT_HISTORY}, vote_transaction, }; - use std::{collections::BTreeSet, sync::RwLock}; + use std::{ + collections::BTreeSet, + sync::{mpsc::channel, RwLock}, + }; + use tempfile::TempDir; use trees::tr; fn test_process_blockstore( @@ -1440,7 +1494,17 @@ pub mod tests { blockstore: &Blockstore, opts: ProcessOptions, ) -> BlockstoreProcessorInner { - process_blockstore(genesis_config, blockstore, Vec::new(), opts, None).unwrap() + let (accounts_package_sender, _) = channel(); + process_blockstore( + genesis_config, + blockstore, + Vec::new(), + opts, + None, + None, + accounts_package_sender, + ) + .unwrap() } #[test] @@ -2228,7 +2292,7 @@ pub mod tests { accounts_db_test_hash_calculation: true, ..ProcessOptions::default() }; - let (_bank_forks, leader_schedule) = + let (_bank_forks, leader_schedule, _) = test_process_blockstore(&genesis_config, &blockstore, opts); assert_eq!(leader_schedule.max_schedules(), std::usize::MAX); } @@ -3005,14 +3069,18 @@ pub mod tests { bank1.squash(); // Test process_blockstore_from_root() from slot 1 onwards - let (bank_forks, _leader_schedule) = do_process_blockstore_from_root( + let (accounts_package_sender, _) = channel(); + let (bank_forks, ..) = do_process_blockstore_from_root( &blockstore, bank1, &opts, &recyclers, None, None, + None, + accounts_package_sender, BankFromArchiveTimings::default(), + None, ) .unwrap(); @@ -3034,6 +3102,120 @@ pub mod tests { verify_fork_infos(&bank_forks); } + /// Test that processing the blockstore is aware of incremental snapshots. When processing the + /// blockstore from a root, like what happens when loading from a snapshot, there may be new + /// roots that cross a full snapshot interval. In these cases, a bank snapshot must be taken, + /// so that a full snapshot archive is created and available by the time the background + /// services spin up. + /// + /// For this test, process enough roots to cross the full snapshot interval multiple times. + /// Ensure afterwards that the snapshots were created. + #[test] + fn test_process_blockstore_from_root_with_snapshots() { + solana_logger::setup(); + let GenesisConfigInfo { + mut genesis_config, .. + } = create_genesis_config(123); + + let ticks_per_slot = 1; + genesis_config.ticks_per_slot = ticks_per_slot; + let (ledger_path, blockhash) = create_new_tmp_ledger!(&genesis_config); + let blockstore = Blockstore::open(&ledger_path).unwrap(); + + const ROOT_INTERVAL_SLOTS: Slot = 2; + const FULL_SNAPSHOT_ARCHIVE_INTERVAL_SLOTS: Slot = ROOT_INTERVAL_SLOTS * 5; + const LAST_SLOT: Slot = FULL_SNAPSHOT_ARCHIVE_INTERVAL_SLOTS * 4; + + let mut last_hash = blockhash; + for i in 1..=LAST_SLOT { + last_hash = + fill_blockstore_slot_with_ticks(&blockstore, ticks_per_slot, i, i - 1, last_hash); + } + + let roots_to_set = (0..=LAST_SLOT) + .step_by(ROOT_INTERVAL_SLOTS as usize) + .collect_vec(); + blockstore.set_roots(roots_to_set.iter()).unwrap(); + + // Set up bank1 + let bank0 = Arc::new(Bank::new_for_tests(&genesis_config)); + let opts = ProcessOptions { + poh_verify: true, + accounts_db_test_hash_calculation: true, + ..ProcessOptions::default() + }; + let recyclers = VerifyRecyclers::default(); + process_bank_0(&bank0, &blockstore, &opts, &recyclers, None); + + let slot_start_processing = 1; + let bank = Arc::new(Bank::new_from_parent( + &bank0, + &Pubkey::default(), + slot_start_processing, + )); + confirm_full_slot( + &blockstore, + &bank, + &opts, + &recyclers, + &mut ConfirmationProgress::new(bank0.last_blockhash()), + None, + None, + &mut ExecuteTimings::default(), + ) + .unwrap(); + bank.squash(); + + let bank_snapshots_tempdir = TempDir::new().unwrap(); + let snapshot_config = SnapshotConfig { + full_snapshot_archive_interval_slots: FULL_SNAPSHOT_ARCHIVE_INTERVAL_SLOTS, + incremental_snapshot_archive_interval_slots: Slot::MAX, // value does not matter + snapshot_archives_dir: PathBuf::default(), // value does not matter + bank_snapshots_dir: bank_snapshots_tempdir.path().to_path_buf(), + archive_format: ArchiveFormat::TarZstd, // value does not matter + snapshot_version: SnapshotVersion::default(), // value does not matter + maximum_snapshots_to_retain: usize::MAX, // value does not matter + }; + + let (accounts_package_sender, accounts_package_receiver) = channel(); + + do_process_blockstore_from_root( + &blockstore, + bank, + &opts, + &recyclers, + None, + None, + Some(&snapshot_config), + accounts_package_sender.clone(), + BankFromArchiveTimings::default(), + None, + ) + .unwrap(); + + // The `drop()` is necessary here in order to call `.iter()` on the channel below + drop(accounts_package_sender); + + // Ensure all the AccountsPackages were created and sent to the AccountsPackageReceiver + let received_accounts_package_slots = accounts_package_receiver + .iter() + .map(|accounts_package| accounts_package.slot) + .collect::>(); + let expected_slots = (slot_start_processing..=LAST_SLOT) + .filter(|slot| slot % FULL_SNAPSHOT_ARCHIVE_INTERVAL_SLOTS == 0) + .collect::>(); + assert_eq!(received_accounts_package_slots, expected_slots); + + // Ensure all the bank snapshots were created + let bank_snapshots = snapshot_utils::get_bank_snapshots(&bank_snapshots_tempdir); + let mut bank_snapshot_slots = bank_snapshots + .into_iter() + .map(|bank_snapshot| bank_snapshot.slot) + .collect::>(); + bank_snapshot_slots.sort_unstable(); + assert_eq!(bank_snapshot_slots, expected_slots); + } + #[test] #[ignore] fn test_process_entries_stress() { diff --git a/runtime/src/snapshot_utils.rs b/runtime/src/snapshot_utils.rs index b6960b2badb397..849f463f9921ae 100644 --- a/runtime/src/snapshot_utils.rs +++ b/runtime/src/snapshot_utils.rs @@ -836,7 +836,12 @@ pub fn bank_from_latest_snapshot_archives( accounts_db_skip_shrink: bool, verify_index: bool, accounts_index_config: Option, -) -> Result<(Bank, BankFromArchiveTimings)> { +) -> Result<( + Bank, + BankFromArchiveTimings, + FullSnapshotArchiveInfo, + Option, +)> { let full_snapshot_archive_info = get_highest_full_snapshot_archive_info(&snapshot_archives_dir) .ok_or(SnapshotError::NoSnapshotArchives)?; @@ -888,7 +893,12 @@ pub fn bank_from_latest_snapshot_archives( ), )?; - Ok((bank, timings)) + Ok(( + bank, + timings, + full_snapshot_archive_info, + incremental_snapshot_archive_info, + )) } /// Check to make sure the deserialized bank's slot and hash matches the snapshot archive's slot @@ -2713,7 +2723,7 @@ mod tests { ) .unwrap(); - let (deserialized_bank, _) = bank_from_latest_snapshot_archives( + let (deserialized_bank, ..) = bank_from_latest_snapshot_archives( &bank_snapshots_dir, &snapshot_archives_dir, &[accounts_dir.as_ref().to_path_buf()],