Skip to content

Commit

Permalink
Revert "Make startup aware of Incremental Snapshots (#19550)" (#19599)
Browse files Browse the repository at this point in the history
This reverts commit d45ced0.
  • Loading branch information
brooksprumo authored Sep 3, 2021
1 parent d45ced0 commit e9374d3
Show file tree
Hide file tree
Showing 8 changed files with 92 additions and 337 deletions.
13 changes: 2 additions & 11 deletions core/src/retransmit_stage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -572,17 +572,8 @@ mod tests {
full_leader_cache: true,
..ProcessOptions::default()
};
let (accounts_package_sender, _) = channel();
let (bank_forks, cached_leader_schedule, _) = process_blockstore(
&genesis_config,
&blockstore,
Vec::new(),
opts,
None,
None,
accounts_package_sender,
)
.unwrap();
let (bank_forks, cached_leader_schedule) =
process_blockstore(&genesis_config, &blockstore, Vec::new(), opts, None).unwrap();
let leader_schedule_cache = Arc::new(cached_leader_schedule);
let bank_forks = Arc::new(RwLock::new(bank_forks));

Expand Down
42 changes: 19 additions & 23 deletions core/src/tvu.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,10 +44,10 @@ use solana_runtime::{
bank_forks::BankForks,
commitment::BlockCommitmentCache,
snapshot_config::SnapshotConfig,
snapshot_package::{AccountsPackageReceiver, AccountsPackageSender, PendingSnapshotPackage},
snapshot_package::PendingSnapshotPackage,
vote_sender_types::ReplayVoteSender,
};
use solana_sdk::{clock::Slot, pubkey::Pubkey, signature::Keypair};
use solana_sdk::{pubkey::Pubkey, signature::Keypair};
use std::{
boxed::Box,
collections::HashSet,
Expand Down Expand Up @@ -135,8 +135,6 @@ impl Tvu {
tvu_config: TvuConfig,
max_slots: &Arc<MaxSlots>,
cost_model: &Arc<RwLock<CostModel>>,
accounts_package_channel: (AccountsPackageSender, AccountsPackageReceiver),
last_full_snapshot_slot: Option<Slot>,
) -> Self {
let Sockets {
repair: repair_socket,
Expand Down Expand Up @@ -214,9 +212,9 @@ impl Tvu {
(Some(snapshot_config), Some(pending_snapshot_package))
})
.unwrap_or((None, None));
let (accounts_package_sender, accounts_package_receiver) = accounts_package_channel;
let (accounts_hash_sender, accounts_hash_receiver) = channel();
let accounts_hash_verifier = AccountsHashVerifier::new(
accounts_package_receiver,
accounts_hash_receiver,
pending_snapshot_package,
exit,
cluster_info,
Expand All @@ -226,19 +224,20 @@ impl Tvu {
snapshot_config.clone(),
);

let (snapshot_request_sender, snapshot_request_handler) = match snapshot_config {
None => (None, None),
Some(snapshot_config) => {
let (snapshot_request_sender, snapshot_request_receiver) = unbounded();
(
Some(snapshot_request_sender),
Some(SnapshotRequestHandler {
snapshot_config,
snapshot_request_receiver,
accounts_package_sender,
}),
)
}
let (snapshot_request_sender, snapshot_request_handler) = {
snapshot_config
.map(|snapshot_config| {
let (snapshot_request_sender, snapshot_request_receiver) = unbounded();
(
Some(snapshot_request_sender),
Some(SnapshotRequestHandler {
snapshot_config,
snapshot_request_receiver,
accounts_package_sender: accounts_hash_sender,
}),
)
})
.unwrap_or((None, None))
};

let (pruned_banks_sender, pruned_banks_receiver) = unbounded();
Expand Down Expand Up @@ -341,7 +340,7 @@ impl Tvu {
tvu_config.accounts_db_caching_enabled,
tvu_config.test_hash_calculation,
tvu_config.use_index_hash_calculation,
last_full_snapshot_slot,
None,
);

Tvu {
Expand Down Expand Up @@ -435,7 +434,6 @@ pub mod tests {
let (_, gossip_confirmed_slots_receiver) = unbounded();
let bank_forks = Arc::new(RwLock::new(bank_forks));
let tower = Tower::default();
let accounts_package_channel = channel();
let tvu = Tvu::new(
&vote_keypair.pubkey(),
Arc::new(RwLock::new(vec![Arc::new(vote_keypair)])),
Expand Down Expand Up @@ -479,8 +477,6 @@ pub mod tests {
TvuConfig::default(),
&Arc::new(MaxSlots::default()),
&Arc::new(RwLock::new(CostModel::default())),
accounts_package_channel,
None,
);
exit.store(true, Ordering::Relaxed);
tvu.join().unwrap();
Expand Down
54 changes: 22 additions & 32 deletions core/src/validator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ use {
hardened_unpack::{open_genesis_config, MAX_GENESIS_ARCHIVE_UNPACKED_SIZE},
snapshot_archive_info::SnapshotArchiveInfoGetter,
snapshot_config::SnapshotConfig,
snapshot_package::{AccountsPackageSender, PendingSnapshotPackage},
snapshot_package::PendingSnapshotPackage,
snapshot_utils,
},
solana_sdk::{
Expand All @@ -92,7 +92,7 @@ use {
path::{Path, PathBuf},
sync::{
atomic::{AtomicBool, AtomicU64, Ordering},
mpsc::{channel, Receiver},
mpsc::Receiver,
Arc, Mutex, RwLock,
},
thread::{sleep, Builder, JoinHandle},
Expand Down Expand Up @@ -379,15 +379,14 @@ impl Validator {
.register_exit(Box::new(move || exit.store(true, Ordering::Relaxed)));
}

let accounts_package_channel = channel();
let (replay_vote_sender, replay_vote_receiver) = unbounded();
let (
genesis_config,
bank_forks,
blockstore,
ledger_signal_receiver,
completed_slots_receiver,
leader_schedule_cache,
last_full_snapshot_slot,
snapshot_hash,
TransactionHistoryServices {
transaction_status_sender,
Expand All @@ -409,7 +408,6 @@ impl Validator {
config.enforce_ulimit_nofile,
&start_progress,
config.no_poh_speed_test,
accounts_package_channel.0.clone(),
);

*start_progress.write().unwrap() = ValidatorStartProgress::StartingServices;
Expand Down Expand Up @@ -709,7 +707,6 @@ impl Validator {
let rpc_completed_slots_service =
RpcCompletedSlotsService::spawn(completed_slots_receiver, rpc_subscriptions.clone());

let (replay_vote_sender, replay_vote_receiver) = unbounded();
let tvu = Tvu::new(
vote_account,
authorized_voter_keypairs,
Expand Down Expand Up @@ -780,8 +777,6 @@ impl Validator {
},
&max_slots,
&cost_model,
accounts_package_channel,
last_full_snapshot_slot,
);

let tpu = Tpu::new(
Expand Down Expand Up @@ -1074,7 +1069,7 @@ fn post_process_restored_tower(
})
}

#[allow(clippy::type_complexity, clippy::too_many_arguments)]
#[allow(clippy::type_complexity)]
fn new_banks_from_ledger(
validator_identity: &Pubkey,
vote_account: &Pubkey,
Expand All @@ -1085,15 +1080,13 @@ fn new_banks_from_ledger(
enforce_ulimit_nofile: bool,
start_progress: &Arc<RwLock<ValidatorStartProgress>>,
no_poh_speed_test: bool,
accounts_package_sender: AccountsPackageSender,
) -> (
GenesisConfig,
BankForks,
Arc<Blockstore>,
Receiver<bool>,
CompletedSlotsReceiver,
LeaderScheduleCache,
Option<Slot>,
Option<(Slot, Hash)>,
TransactionHistoryServices,
Tower,
Expand Down Expand Up @@ -1189,26 +1182,24 @@ fn new_banks_from_ledger(
TransactionHistoryServices::default()
};

let (mut bank_forks, mut leader_schedule_cache, last_full_snapshot_slot, snapshot_hash) =
bank_forks_utils::load(
&genesis_config,
&blockstore,
config.account_paths.clone(),
config.account_shrink_paths.clone(),
config.snapshot_config.as_ref(),
process_options,
transaction_history_services
.transaction_status_sender
.as_ref(),
transaction_history_services
.cache_block_meta_sender
.as_ref(),
accounts_package_sender,
)
.unwrap_or_else(|err| {
error!("Failed to load ledger: {:?}", err);
abort()
});
let (mut bank_forks, mut leader_schedule_cache, snapshot_hash) = bank_forks_utils::load(
&genesis_config,
&blockstore,
config.account_paths.clone(),
config.account_shrink_paths.clone(),
config.snapshot_config.as_ref(),
process_options,
transaction_history_services
.transaction_status_sender
.as_ref(),
transaction_history_services
.cache_block_meta_sender
.as_ref(),
)
.unwrap_or_else(|err| {
error!("Failed to load ledger: {:?}", err);
abort()
});

if let Some(warp_slot) = config.warp_slot {
let snapshot_config = config.snapshot_config.as_ref().unwrap_or_else(|| {
Expand Down Expand Up @@ -1286,7 +1277,6 @@ fn new_banks_from_ledger(
ledger_signal_receiver,
completed_slots_receiver,
leader_schedule_cache,
last_full_snapshot_slot,
snapshot_hash,
transaction_history_services,
tower,
Expand Down
4 changes: 2 additions & 2 deletions core/tests/snapshots.rs
Original file line number Diff line number Diff line change
Expand Up @@ -819,7 +819,7 @@ mod tests {
accounts_dir: PathBuf,
genesis_config: &GenesisConfig,
) -> snapshot_utils::Result<()> {
let (deserialized_bank, ..) = snapshot_utils::bank_from_latest_snapshot_archives(
let (deserialized_bank, _) = snapshot_utils::bank_from_latest_snapshot_archives(
&snapshot_config.bank_snapshots_dir,
&snapshot_config.snapshot_archives_dir,
&[accounts_dir],
Expand Down Expand Up @@ -997,7 +997,7 @@ mod tests {
std::thread::sleep(Duration::from_secs(5));
info!("Awake! Rebuilding bank from latest snapshot archives...");

let (deserialized_bank, ..) = snapshot_utils::bank_from_latest_snapshot_archives(
let (deserialized_bank, _) = snapshot_utils::bank_from_latest_snapshot_archives(
&snapshot_test_config.snapshot_config.bank_snapshots_dir,
&snapshot_test_config.snapshot_config.snapshot_archives_dir,
&[snapshot_test_config.accounts_dir.as_ref().to_path_buf()],
Expand Down
20 changes: 9 additions & 11 deletions ledger-tool/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ use std::{
path::{Path, PathBuf},
process::{exit, Command, Stdio},
str::FromStr,
sync::{mpsc::channel, Arc, RwLock},
sync::{Arc, RwLock},
};

mod bigtable;
Expand Down Expand Up @@ -712,7 +712,7 @@ fn load_bank_forks(
let snapshot_archives_dir =
snapshot_archive_path.unwrap_or_else(|| blockstore.ledger_path().to_path_buf());
Some(SnapshotConfig {
full_snapshot_archive_interval_slots: Slot::MAX,
full_snapshot_archive_interval_slots: 0, // Value doesn't matter
incremental_snapshot_archive_interval_slots: Slot::MAX,
snapshot_archives_dir,
bank_snapshots_dir,
Expand Down Expand Up @@ -740,7 +740,6 @@ fn load_bank_forks(
vec![non_primary_accounts_path]
};

let (accounts_package_sender, _) = channel();
bank_forks_utils::load(
genesis_config,
blockstore,
Expand All @@ -750,7 +749,6 @@ fn load_bank_forks(
process_options,
None,
None,
accounts_package_sender,
)
}

Expand Down Expand Up @@ -1654,7 +1652,7 @@ fn main() {
process_options,
snapshot_archive_path,
) {
Ok((bank_forks, ..)) => {
Ok((bank_forks, _leader_schedule_cache, _snapshot_hash)) => {
println!(
"{}",
compute_shred_version(
Expand Down Expand Up @@ -1729,7 +1727,7 @@ fn main() {
process_options,
snapshot_archive_path,
) {
Ok((bank_forks, ..)) => {
Ok((bank_forks, _leader_schedule_cache, _snapshot_hash)) => {
println!("{}", &bank_forks.working_bank().hash());
}
Err(err) => {
Expand Down Expand Up @@ -1910,7 +1908,7 @@ fn main() {
AccessType::TryPrimaryThenSecondary,
wal_recovery_mode,
);
let (bank_forks, ..) = load_bank_forks(
let (bank_forks, _, _) = load_bank_forks(
arg_matches,
&open_genesis_config_by(&ledger_path, arg_matches),
&blockstore,
Expand Down Expand Up @@ -1949,7 +1947,7 @@ fn main() {
process_options,
snapshot_archive_path,
) {
Ok((bank_forks, ..)) => {
Ok((bank_forks, _leader_schedule_cache, _snapshot_hash)) => {
let dot = graph_forks(&bank_forks, arg_matches.is_present("include_all_votes"));

let extension = Path::new(&output_file).extension();
Expand Down Expand Up @@ -2051,7 +2049,7 @@ fn main() {
},
snapshot_archive_path,
) {
Ok((bank_forks, ..)) => {
Ok((bank_forks, _leader_schedule_cache, _snapshot_hash)) => {
let mut bank = bank_forks
.get(snapshot_slot)
.unwrap_or_else(|| {
Expand Down Expand Up @@ -2281,7 +2279,7 @@ fn main() {
process_options,
snapshot_archive_path,
) {
Ok((bank_forks, ..)) => {
Ok((bank_forks, _leader_schedule_cache, _snapshot_hash)) => {
let slot = bank_forks.working_bank().slot();
let bank = bank_forks.get(slot).unwrap_or_else(|| {
eprintln!("Error: Slot {} is not available", slot);
Expand Down Expand Up @@ -2340,7 +2338,7 @@ fn main() {
process_options,
snapshot_archive_path,
) {
Ok((bank_forks, ..)) => {
Ok((bank_forks, _leader_schedule_cache, _snapshot_hash)) => {
let slot = bank_forks.working_bank().slot();
let bank = bank_forks.get(slot).unwrap_or_else(|| {
eprintln!("Error: Slot {} is not available", slot);
Expand Down
Loading

0 comments on commit e9374d3

Please sign in to comment.