Make startup aware of Incremental Snapshots
Now that the background services are aware of incremental snapshots,
they need (1) the correct last full snapshot slot in order to clean
accounts correctly, and (2) all expected full snapshots to be available
when creating an incremental snapshot based on them.

This commit fixes startup so both requirements are met.

At startup, the blockstore processor loads frozen banks.  Some of these
banks may be roots, and some of those roots may cross the full snapshot
interval.  If/when that happens, take a bank snapshot and queue the full
snapshot in the AccountsPackageSender.  At the end of startup, return
the last full snapshot slot to pass into the background services.

Fixes solana-labs#19297
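
A minimal sketch of the startup rule described above (hypothetical names
and a bare-bones AccountsPackage; the real logic lives in the blockstore
processor, and the real package carries far more state):

    // Sketch only: illustrates the rule, not the actual solana-runtime
    // API.  Assumes a nonzero full-snapshot interval.
    type Slot = u64;

    struct AccountsPackage {
        slot: Slot, // stand-in for the real, much richer accounts package
    }

    /// While replaying frozen banks at startup, queue a full snapshot for
    /// each new root that crosses the full-snapshot interval, and remember
    /// the last such slot for the background services.
    fn handle_roots_at_startup(
        roots: &[Slot],
        full_snapshot_interval: Slot,
        accounts_package_sender: &std::sync::mpsc::Sender<AccountsPackage>,
    ) -> Option<Slot> {
        let mut last_full_snapshot_slot = None;
        for &root in roots {
            // A root crosses the interval when it falls in a later interval
            // window than the previous full snapshot (or the genesis window).
            let crossed = match last_full_snapshot_slot {
                None => root >= full_snapshot_interval,
                Some(prev) => {
                    root / full_snapshot_interval > prev / full_snapshot_interval
                }
            };
            if crossed {
                // Queued here; turned into a full snapshot archive once the
                // background services are up.
                accounts_package_sender
                    .send(AccountsPackage { slot: root })
                    .expect("accounts package receiver alive during startup");
                last_full_snapshot_slot = Some(root);
            }
        }
        last_full_snapshot_slot
    }

The returned slot is what new_banks_from_ledger now surfaces in its
return tuple (see validator.rs below) and what eventually reaches the
background services as last_full_snapshot_slot.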
brooksprumo committed Sep 1, 2021
1 parent 82a6bbe commit e808092
Showing 8 changed files with 376 additions and 124 deletions.
13 changes: 11 additions & 2 deletions core/src/retransmit_stage.rs
@@ -565,8 +565,17 @@ mod tests {
             full_leader_cache: true,
             ..ProcessOptions::default()
         };
-        let (bank_forks, cached_leader_schedule) =
-            process_blockstore(&genesis_config, &blockstore, Vec::new(), opts, None).unwrap();
+        let (accounts_package_sender, _) = channel();
+        let (bank_forks, cached_leader_schedule, _) = process_blockstore(
+            &genesis_config,
+            &blockstore,
+            Vec::new(),
+            opts,
+            None,
+            None,
+            accounts_package_sender,
+        )
+        .unwrap();
         let leader_schedule_cache = Arc::new(cached_leader_schedule);
         let bank_forks = Arc::new(RwLock::new(bank_forks));

42 changes: 23 additions & 19 deletions core/src/tvu.rs
@@ -44,10 +44,10 @@ use solana_runtime::{
     bank_forks::BankForks,
     commitment::BlockCommitmentCache,
     snapshot_config::SnapshotConfig,
-    snapshot_package::PendingSnapshotPackage,
+    snapshot_package::{AccountsPackageReceiver, AccountsPackageSender, PendingSnapshotPackage},
     vote_sender_types::ReplayVoteSender,
 };
-use solana_sdk::{pubkey::Pubkey, signature::Keypair};
+use solana_sdk::{clock::Slot, pubkey::Pubkey, signature::Keypair};
 use std::{
     boxed::Box,
     collections::HashSet,
@@ -135,6 +135,8 @@ impl Tvu {
         tvu_config: TvuConfig,
         max_slots: &Arc<MaxSlots>,
         cost_model: &Arc<RwLock<CostModel>>,
+        accounts_package_channel: (AccountsPackageSender, AccountsPackageReceiver),
+        last_full_snapshot_slot: Option<Slot>,
     ) -> Self {
         let Sockets {
             repair: repair_socket,
@@ -212,9 +214,9 @@ impl Tvu {
                 (Some(snapshot_config), Some(pending_snapshot_package))
             })
             .unwrap_or((None, None));
-        let (accounts_hash_sender, accounts_hash_receiver) = channel();
+        let (accounts_package_sender, accounts_package_receiver) = accounts_package_channel;
         let accounts_hash_verifier = AccountsHashVerifier::new(
-            accounts_hash_receiver,
+            accounts_package_receiver,
             pending_snapshot_package,
             exit,
             cluster_info,
@@ -224,20 +226,19 @@ impl Tvu {
             snapshot_config.clone(),
         );

-        let (snapshot_request_sender, snapshot_request_handler) = {
-            snapshot_config
-                .map(|snapshot_config| {
-                    let (snapshot_request_sender, snapshot_request_receiver) = unbounded();
-                    (
-                        Some(snapshot_request_sender),
-                        Some(SnapshotRequestHandler {
-                            snapshot_config,
-                            snapshot_request_receiver,
-                            accounts_package_sender: accounts_hash_sender,
-                        }),
-                    )
-                })
-                .unwrap_or((None, None))
+        let (snapshot_request_sender, snapshot_request_handler) = match snapshot_config {
+            None => (None, None),
+            Some(snapshot_config) => {
+                let (snapshot_request_sender, snapshot_request_receiver) = unbounded();
+                (
+                    Some(snapshot_request_sender),
+                    Some(SnapshotRequestHandler {
+                        snapshot_config,
+                        snapshot_request_receiver,
+                        accounts_package_sender,
+                    }),
+                )
+            }
         };

         let (pruned_banks_sender, pruned_banks_receiver) = unbounded();
@@ -340,7 +341,7 @@ impl Tvu {
             tvu_config.accounts_db_caching_enabled,
             tvu_config.test_hash_calculation,
             tvu_config.use_index_hash_calculation,
-            None,
+            last_full_snapshot_slot,
         );

         Tvu {
@@ -434,6 +435,7 @@ pub mod tests {
         let (_, gossip_confirmed_slots_receiver) = unbounded();
         let bank_forks = Arc::new(RwLock::new(bank_forks));
         let tower = Tower::default();
+        let accounts_package_channel = channel();
         let tvu = Tvu::new(
             &vote_keypair.pubkey(),
             Arc::new(RwLock::new(vec![Arc::new(vote_keypair)])),
@@ -477,6 +479,8 @@ pub mod tests {
             TvuConfig::default(),
             &Arc::new(MaxSlots::default()),
             &Arc::new(RwLock::new(CostModel::default())),
+            accounts_package_channel,
+            None,
         );
         exit.store(true, Ordering::Relaxed);
         tvu.join().unwrap();
58 changes: 36 additions & 22 deletions core/src/validator.rs
@@ -65,7 +65,7 @@ use {
         hardened_unpack::{open_genesis_config, MAX_GENESIS_ARCHIVE_UNPACKED_SIZE},
         snapshot_archive_info::SnapshotArchiveInfoGetter,
         snapshot_config::SnapshotConfig,
-        snapshot_package::PendingSnapshotPackage,
+        snapshot_package::{AccountsPackageSender, PendingSnapshotPackage},
         snapshot_utils,
     },
     solana_sdk::{
@@ -88,7 +88,7 @@ use {
         path::{Path, PathBuf},
         sync::{
             atomic::{AtomicBool, AtomicU64, Ordering},
-            mpsc::Receiver,
+            mpsc::{channel, Receiver},
             Arc, Mutex, RwLock,
         },
         thread::{sleep, Builder, JoinHandle},
@@ -372,14 +372,15 @@ impl Validator {
                 .register_exit(Box::new(move || exit.store(true, Ordering::Relaxed)));
         }

-        let (replay_vote_sender, replay_vote_receiver) = unbounded();
+        let accounts_package_channel = channel();
         let (
             genesis_config,
             bank_forks,
             blockstore,
             ledger_signal_receiver,
             completed_slots_receiver,
             leader_schedule_cache,
+            last_full_snapshot_slot,
             snapshot_hash,
             TransactionHistoryServices {
                 transaction_status_sender,
@@ -401,6 +402,7 @@ impl Validator {
             config.enforce_ulimit_nofile,
             &start_progress,
             config.no_poh_speed_test,
+            accounts_package_channel.0.clone(),
         );

         *start_progress.write().unwrap() = ValidatorStartProgress::StartingServices;
@@ -690,6 +692,7 @@ impl Validator {
         let rpc_completed_slots_service =
             RpcCompletedSlotsService::spawn(completed_slots_receiver, rpc_subscriptions.clone());

+        let (replay_vote_sender, replay_vote_receiver) = unbounded();
         let tvu = Tvu::new(
             vote_account,
             authorized_voter_keypairs,
@@ -760,6 +763,8 @@ impl Validator {
             },
             &max_slots,
             &cost_model,
+            accounts_package_channel,
+            last_full_snapshot_slot,
         );

         let tpu = Tpu::new(
@@ -1044,7 +1049,7 @@ fn post_process_restored_tower(
     })
 }

-#[allow(clippy::type_complexity)]
+#[allow(clippy::type_complexity, clippy::too_many_arguments)]
 fn new_banks_from_ledger(
     validator_identity: &Pubkey,
     vote_account: &Pubkey,
@@ -1055,13 +1060,15 @@ fn new_banks_from_ledger(
     enforce_ulimit_nofile: bool,
     start_progress: &Arc<RwLock<ValidatorStartProgress>>,
     no_poh_speed_test: bool,
+    accounts_package_sender: AccountsPackageSender,
 ) -> (
     GenesisConfig,
     BankForks,
     Arc<Blockstore>,
     Receiver<bool>,
     CompletedSlotsReceiver,
     LeaderScheduleCache,
+    Option<Slot>,
     Option<(Slot, Hash)>,
     TransactionHistoryServices,
     Tower,
@@ -1157,24 +1164,26 @@ fn new_banks_from_ledger(
         TransactionHistoryServices::default()
     };

-    let (mut bank_forks, mut leader_schedule_cache, snapshot_hash) = bank_forks_utils::load(
-        &genesis_config,
-        &blockstore,
-        config.account_paths.clone(),
-        config.account_shrink_paths.clone(),
-        config.snapshot_config.as_ref(),
-        process_options,
-        transaction_history_services
-            .transaction_status_sender
-            .as_ref(),
-        transaction_history_services
-            .cache_block_meta_sender
-            .as_ref(),
-    )
-    .unwrap_or_else(|err| {
-        error!("Failed to load ledger: {:?}", err);
-        abort()
-    });
+    let (mut bank_forks, mut leader_schedule_cache, last_full_snapshot_slot, snapshot_hash) =
+        bank_forks_utils::load(
+            &genesis_config,
+            &blockstore,
+            config.account_paths.clone(),
+            config.account_shrink_paths.clone(),
+            config.snapshot_config.as_ref(),
+            process_options,
+            transaction_history_services
+                .transaction_status_sender
+                .as_ref(),
+            transaction_history_services
+                .cache_block_meta_sender
+                .as_ref(),
+            accounts_package_sender,
+        )
+        .unwrap_or_else(|err| {
+            error!("Failed to load ledger: {:?}", err);
+            abort()
+        });

     if let Some(warp_slot) = config.warp_slot {
         let snapshot_config = config.snapshot_config.as_ref().unwrap_or_else(|| {
@@ -1252,6 +1261,7 @@ fn new_banks_from_ledger(
         ledger_signal_receiver,
         completed_slots_receiver,
         leader_schedule_cache,
+        last_full_snapshot_slot,
         snapshot_hash,
         transaction_history_services,
         tower,
@@ -1872,4 +1882,8 @@ mod tests {
         };
         check_poh_speed(&genesis_config, Some(10_000));
     }
+
+    // bprumo TODO: need to write a test that brings up a validator from a snapshot, processes
+    // block store that crosses a full snapshot interval, and then ensure that
+    // AccountsBackgroundService gets the right value for last_full_snapshot_slot
 }
4 changes: 2 additions & 2 deletions core/tests/snapshots.rs
@@ -819,7 +819,7 @@ mod tests {
         accounts_dir: PathBuf,
         genesis_config: &GenesisConfig,
     ) -> snapshot_utils::Result<()> {
-        let (deserialized_bank, _) = snapshot_utils::bank_from_latest_snapshot_archives(
+        let (deserialized_bank, ..) = snapshot_utils::bank_from_latest_snapshot_archives(
             &snapshot_config.bank_snapshots_dir,
             &snapshot_config.snapshot_archives_dir,
             &[accounts_dir],
@@ -997,7 +997,7 @@ mod tests {
         std::thread::sleep(Duration::from_secs(5));
         info!("Awake! Rebuilding bank from latest snapshot archives...");

-        let (deserialized_bank, _) = snapshot_utils::bank_from_latest_snapshot_archives(
+        let (deserialized_bank, ..) = snapshot_utils::bank_from_latest_snapshot_archives(
             &snapshot_test_config.snapshot_config.bank_snapshots_dir,
             &snapshot_test_config.snapshot_config.snapshot_archives_dir,
             &[snapshot_test_config.accounts_dir.as_ref().to_path_buf()],
20 changes: 11 additions & 9 deletions ledger-tool/src/main.rs
@@ -63,7 +63,7 @@ use std::{
     path::{Path, PathBuf},
     process::{exit, Command, Stdio},
     str::FromStr,
-    sync::{Arc, RwLock},
+    sync::{mpsc::channel, Arc, RwLock},
 };

 mod bigtable;
@@ -712,7 +712,7 @@ fn load_bank_forks(
     let snapshot_archives_dir =
         snapshot_archive_path.unwrap_or_else(|| blockstore.ledger_path().to_path_buf());
     Some(SnapshotConfig {
-        full_snapshot_archive_interval_slots: 0, // Value doesn't matter
+        full_snapshot_archive_interval_slots: Slot::MAX,
         incremental_snapshot_archive_interval_slots: Slot::MAX,
         snapshot_archives_dir,
         bank_snapshots_dir,
@@ -740,6 +740,7 @@ fn load_bank_forks(
         vec![non_primary_accounts_path]
     };

+    let (accounts_package_sender, _) = channel();
     bank_forks_utils::load(
         genesis_config,
         blockstore,
@@ -749,6 +750,7 @@ fn load_bank_forks(
         process_options,
         None,
         None,
+        accounts_package_sender,
     )
 }

@@ -1652,7 +1654,7 @@ fn main() {
                 process_options,
                 snapshot_archive_path,
             ) {
-                Ok((bank_forks, _leader_schedule_cache, _snapshot_hash)) => {
+                Ok((bank_forks, ..)) => {
                     println!(
                         "{}",
                         compute_shred_version(
@@ -1727,7 +1729,7 @@ fn main() {
                 process_options,
                 snapshot_archive_path,
             ) {
-                Ok((bank_forks, _leader_schedule_cache, _snapshot_hash)) => {
+                Ok((bank_forks, ..)) => {
                     println!("{}", &bank_forks.working_bank().hash());
                 }
                 Err(err) => {
@@ -1908,7 +1910,7 @@ fn main() {
                 AccessType::TryPrimaryThenSecondary,
                 wal_recovery_mode,
             );
-            let (bank_forks, _, _) = load_bank_forks(
+            let (bank_forks, ..) = load_bank_forks(
                 arg_matches,
                 &open_genesis_config_by(&ledger_path, arg_matches),
                 &blockstore,
@@ -1947,7 +1949,7 @@ fn main() {
                 process_options,
                 snapshot_archive_path,
             ) {
-                Ok((bank_forks, _leader_schedule_cache, _snapshot_hash)) => {
+                Ok((bank_forks, ..)) => {
                     let dot = graph_forks(&bank_forks, arg_matches.is_present("include_all_votes"));

                     let extension = Path::new(&output_file).extension();
@@ -2049,7 +2051,7 @@ fn main() {
                 },
                 snapshot_archive_path,
             ) {
-                Ok((bank_forks, _leader_schedule_cache, _snapshot_hash)) => {
+                Ok((bank_forks, ..)) => {
                     let mut bank = bank_forks
                         .get(snapshot_slot)
                         .unwrap_or_else(|| {
@@ -2279,7 +2281,7 @@ fn main() {
                 process_options,
                 snapshot_archive_path,
             ) {
-                Ok((bank_forks, _leader_schedule_cache, _snapshot_hash)) => {
+                Ok((bank_forks, ..)) => {
                     let slot = bank_forks.working_bank().slot();
                     let bank = bank_forks.get(slot).unwrap_or_else(|| {
                         eprintln!("Error: Slot {} is not available", slot);
@@ -2338,7 +2340,7 @@ fn main() {
                 process_options,
                 snapshot_archive_path,
             ) {
-                Ok((bank_forks, _leader_schedule_cache, _snapshot_hash)) => {
+                Ok((bank_forks, ..)) => {
                     let slot = bank_forks.working_bank().slot();
                     let bank = bank_forks.get(slot).unwrap_or_else(|| {
                         eprintln!("Error: Slot {} is not available", slot);
(3 more changed files not shown)
