Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Make background services aware of incremental snapshots #19401

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
160 changes: 102 additions & 58 deletions core/src/accounts_hash_verifier.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,14 +6,16 @@

use rayon::ThreadPool;
use solana_gossip::cluster_info::{ClusterInfo, MAX_SNAPSHOT_HASHES};
use solana_measure::measure::Measure;
use solana_runtime::{
accounts_db,
snapshot_archive_info::SnapshotArchiveInfoGetter,
accounts_db::{self, AccountsDb},
accounts_hash::HashStats,
snapshot_config::SnapshotConfig,
snapshot_package::{
AccountsPackage, AccountsPackageReceiver, PendingSnapshotPackage, SnapshotPackage,
SnapshotType,
},
snapshot_utils,
sorted_storages::SortedStorages,
};
use solana_sdk::{clock::Slot, hash::Hash, pubkey::Pubkey};
use std::collections::{HashMap, HashSet};
Expand Down Expand Up @@ -48,19 +50,17 @@ impl AccountsHashVerifier {
.name("solana-hash-accounts".to_string())
.spawn(move || {
let mut hashes = vec![];
let mut thread_pool_storage = None;
let mut thread_pool = None;
loop {
if exit.load(Ordering::Relaxed) {
break;
}

match accounts_package_receiver.recv_timeout(Duration::from_secs(1)) {
Ok(accounts_package) => {
if accounts_package.hash_for_testing.is_some()
&& thread_pool_storage.is_none()
if accounts_package.hash_for_testing.is_some() && thread_pool.is_none()
{
thread_pool_storage =
Some(accounts_db::make_min_priority_thread_pool());
thread_pool = Some(accounts_db::make_min_priority_thread_pool());
}

Self::process_accounts_package(
Expand All @@ -73,7 +73,7 @@ impl AccountsHashVerifier {
&exit,
fault_injection_rate_slots,
snapshot_config.as_ref(),
thread_pool_storage.as_ref(),
thread_pool.as_ref(),
);
}
Err(RecvTimeoutError::Disconnected) => break,
Expand All @@ -100,45 +100,69 @@ impl AccountsHashVerifier {
snapshot_config: Option<&SnapshotConfig>,
thread_pool: Option<&ThreadPool>,
) {
let snapshot_package =
snapshot_utils::process_accounts_package(accounts_package, thread_pool, None);
Self::process_snapshot_package(
snapshot_package,
Self::verify_accounts_package_hash(&accounts_package, thread_pool);

Self::push_accounts_hashes_to_cluster(
&accounts_package,
cluster_info,
trusted_validators,
halt_on_trusted_validator_accounts_hash_mismatch,
pending_snapshot_package,
hashes,
exit,
fault_injection_rate_slots,
snapshot_config,
);

Self::submit_for_packaging(accounts_package, pending_snapshot_package, snapshot_config);
}

fn process_snapshot_package(
snapshot_package: SnapshotPackage,
fn verify_accounts_package_hash(
accounts_package: &AccountsPackage,
thread_pool: Option<&ThreadPool>,
) {
let mut measure_hash = Measure::start("hash");
if let Some(expected_hash) = accounts_package.hash_for_testing {
let sorted_storages = SortedStorages::new(&accounts_package.snapshot_storages);
let (hash, lamports) = AccountsDb::calculate_accounts_hash_without_index(
&sorted_storages,
thread_pool,
HashStats::default(),
false,
None,
)
.unwrap();

assert_eq!(accounts_package.expected_capitalization, lamports);
assert_eq!(expected_hash, hash);
};
measure_hash.stop();
Comment on lines +123 to +137
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This used to be in snapshot_utils::process_accounts_package(), but nothing used it except for AccountsHashVerifier, so I'm moving it here. As a plus, now we don't need to pass a ThreadPool into other snapshot_utils functions.

datapoint_info!(
"accounts_hash_verifier",
("calculate_hash", measure_hash.as_us(), i64),
);
}

fn push_accounts_hashes_to_cluster(
accounts_package: &AccountsPackage,
cluster_info: &ClusterInfo,
trusted_validators: Option<&HashSet<Pubkey>>,
halt_on_trusted_validator_accounts_hash_mismatch: bool,
pending_snapshot_package: Option<&PendingSnapshotPackage>,
hashes: &mut Vec<(Slot, Hash)>,
exit: &Arc<AtomicBool>,
fault_injection_rate_slots: u64,
snapshot_config: Option<&SnapshotConfig>,
) {
let hash = *snapshot_package.hash();
let hash = accounts_package.hash;
if fault_injection_rate_slots != 0
&& snapshot_package.slot() % fault_injection_rate_slots == 0
&& accounts_package.slot % fault_injection_rate_slots == 0
{
// For testing, publish an invalid hash to gossip.
use rand::{thread_rng, Rng};
use solana_sdk::hash::extend_and_hash;
warn!("inserting fault at slot: {}", snapshot_package.slot());
warn!("inserting fault at slot: {}", accounts_package.slot);
let rand = thread_rng().gen_range(0, 10);
let hash = extend_and_hash(&hash, &[rand]);
hashes.push((snapshot_package.slot(), hash));
hashes.push((accounts_package.slot, hash));
} else {
hashes.push((snapshot_package.slot(), hash));
hashes.push((accounts_package.slot, hash));
}

while hashes.len() > MAX_SNAPSHOT_HASHES {
Expand All @@ -155,19 +179,43 @@ impl AccountsHashVerifier {
}
}

if let Some(snapshot_config) = snapshot_config {
if snapshot_package.block_height % snapshot_config.full_snapshot_archive_interval_slots
== 0
{
if let Some(pending_snapshot_package) = pending_snapshot_package {
*pending_snapshot_package.lock().unwrap() = Some(snapshot_package);
}
}
}

cluster_info.push_accounts_hashes(hashes.clone());
}

fn submit_for_packaging(
accounts_package: AccountsPackage,
pending_snapshot_package: Option<&PendingSnapshotPackage>,
snapshot_config: Option<&SnapshotConfig>,
) {
if accounts_package.snapshot_type.is_none()
|| pending_snapshot_package.is_none()
|| snapshot_config.is_none()
{
return;
};

let snapshot_package = SnapshotPackage::from(accounts_package);
let pending_snapshot_package = pending_snapshot_package.unwrap();
let _snapshot_config = snapshot_config.unwrap();
Copy link
Contributor Author

@brooksprumo brooksprumo Aug 24, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I may be able to remove SnapshotConfig from AccountsHashVerifier entirely too. I'm thinking about taking some things out of AccountsPackage that live in SnapshotConfig, which I may then use here, so I'm going to leave the SnapshotConfig here for now.


// If the snapshot package is an Incremental Snapshot, do not submit it if there's already
// a pending Full Snapshot.
let can_submit = match snapshot_package.snapshot_type {
SnapshotType::FullSnapshot => true,
SnapshotType::IncrementalSnapshot(_) => pending_snapshot_package
.lock()
.unwrap()
.as_ref()
.map_or(true, |snapshot_package| {
snapshot_package.snapshot_type.is_incremental_snapshot()
}),
};

if can_submit {
*pending_snapshot_package.lock().unwrap() = Some(snapshot_package);
}
}

fn should_halt(
cluster_info: &ClusterInfo,
trusted_validators: Option<&HashSet<Pubkey>>,
Expand Down Expand Up @@ -225,10 +273,10 @@ mod tests {
use solana_gossip::{cluster_info::make_accounts_hashes_message, contact_info::ContactInfo};
use solana_runtime::{
snapshot_config::LastFullSnapshotSlot,
snapshot_package::SnapshotType,
snapshot_utils::{ArchiveFormat, SnapshotVersion},
};
use solana_sdk::{
genesis_config::ClusterType,
hash::hash,
signature::{Keypair, Signer},
};
Expand Down Expand Up @@ -301,30 +349,24 @@ mod tests {
last_full_snapshot_slot: LastFullSnapshotSlot::default(),
};
for i in 0..MAX_SNAPSHOT_HASHES + 1 {
let slot = full_snapshot_archive_interval_slots + i as u64;
let block_height = full_snapshot_archive_interval_slots + i as u64;
let slot_deltas = vec![];
let snapshot_links = TempDir::new().unwrap();
let storages = vec![];
let snapshot_archive_path = PathBuf::from(".");
let hash = hash(&[i as u8]);
let archive_format = ArchiveFormat::TarBzip2;
let snapshot_version = SnapshotVersion::default();
let snapshot_package = SnapshotPackage::new(
slot,
block_height,
slot_deltas,
snapshot_links,
storages,
snapshot_archive_path,
hash,
archive_format,
snapshot_version,
SnapshotType::FullSnapshot,
);
let accounts_package = AccountsPackage {
slot: full_snapshot_archive_interval_slots + i as u64,
block_height: full_snapshot_archive_interval_slots + i as u64,
slot_deltas: vec![],
snapshot_links: TempDir::new().unwrap(),
snapshot_storages: vec![],
hash: hash(&[i as u8]),
archive_format: ArchiveFormat::TarBzip2,
snapshot_version: SnapshotVersion::default(),
snapshot_archives_dir: PathBuf::default(),
expected_capitalization: 0,
hash_for_testing: None,
cluster_type: ClusterType::MainnetBeta,
snapshot_type: None,
};

AccountsHashVerifier::process_snapshot_package(
snapshot_package,
AccountsHashVerifier::process_accounts_package(
accounts_package,
&cluster_info,
Some(&trusted_validators),
false,
Expand All @@ -333,7 +375,9 @@ mod tests {
&exit,
0,
Some(&snapshot_config),
None,
);

// sleep for 1ms to create a newer timestamp for gossip entry
// otherwise the timestamp won't be newer.
std::thread::sleep(Duration::from_millis(1));
Expand Down
73 changes: 40 additions & 33 deletions core/src/snapshot_packager_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,25 +42,26 @@ impl SnapshotPackagerService {
}

let snapshot_package = pending_snapshot_package.lock().unwrap().take();
if let Some(snapshot_package) = snapshot_package {
match snapshot_utils::archive_snapshot_package(
&snapshot_package,
maximum_snapshots_to_retain,
) {
Ok(_) => {
hashes.push((snapshot_package.slot(), *snapshot_package.hash()));
while hashes.len() > MAX_SNAPSHOT_HASHES {
hashes.remove(0);
}
cluster_info.push_snapshot_hashes(hashes.clone());
}
Err(err) => {
warn!("Failed to create snapshot archive: {}", err);
}
};
} else {
if snapshot_package.is_none() {
std::thread::sleep(Duration::from_millis(100));
continue;
}
let snapshot_package = snapshot_package.unwrap();

// Archiving the snapshot package is not allowed to fail.
// AccountsBackgroundService calls `clean_accounts()` with a value for
// last_full_snapshot_slot that requires this archive call to succeed.
snapshot_utils::archive_snapshot_package(
&snapshot_package,
maximum_snapshots_to_retain,
)
.expect("failed to archive snapshot package");

hashes.push((snapshot_package.slot(), *snapshot_package.hash()));
while hashes.len() > MAX_SNAPSHOT_HASHES {
hashes.remove(0);
}
cluster_info.push_snapshot_hashes(hashes.clone());
}
})
.unwrap();
Expand All @@ -82,6 +83,7 @@ mod tests {
use solana_runtime::{
accounts_db::AccountStorageEntry,
bank::BankSlotDelta,
snapshot_archive_info::SnapshotArchiveInfo,
snapshot_package::{SnapshotPackage, SnapshotType},
snapshot_utils::{self, ArchiveFormat, SnapshotVersion, SNAPSHOT_STATUS_CACHE_FILE_NAME},
};
Expand Down Expand Up @@ -160,24 +162,29 @@ mod tests {
}

// Create a packageable snapshot
let slot = 42;
let hash = Hash::default();
let archive_format = ArchiveFormat::TarBzip2;
let output_tar_path = snapshot_utils::build_full_snapshot_archive_path(
snapshot_archives_dir,
42,
&Hash::default(),
ArchiveFormat::TarBzip2,
);
let snapshot_package = SnapshotPackage::new(
5,
5,
vec![],
link_snapshots_dir,
vec![storage_entries],
output_tar_path.clone(),
Hash::default(),
ArchiveFormat::TarBzip2,
SnapshotVersion::default(),
SnapshotType::FullSnapshot,
slot,
&hash,
archive_format,
);
let snapshot_package = SnapshotPackage {
snapshot_archive_info: SnapshotArchiveInfo {
path: output_tar_path.clone(),
slot,
hash,
archive_format,
},
block_height: slot,
slot_deltas: vec![],
snapshot_links: link_snapshots_dir,
snapshot_storages: vec![storage_entries],
snapshot_version: SnapshotVersion::default(),
snapshot_type: SnapshotType::FullSnapshot,
};

// Make tarball from packageable snapshot
snapshot_utils::archive_snapshot_package(
Expand All @@ -204,7 +211,7 @@ mod tests {
output_tar_path,
snapshots_dir,
accounts_dir,
ArchiveFormat::TarBzip2,
archive_format,
);
}
}
1 change: 1 addition & 0 deletions core/src/tvu.rs
Original file line number Diff line number Diff line change
Expand Up @@ -340,6 +340,7 @@ impl Tvu {
tvu_config.accounts_db_caching_enabled,
tvu_config.test_hash_calculation,
tvu_config.use_index_hash_calculation,
None,
);

Tvu {
Expand Down
1 change: 0 additions & 1 deletion core/src/validator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1212,7 +1212,6 @@ fn new_banks_from_ledger(
None,
&snapshot_config.snapshot_archives_dir,
snapshot_config.archive_format,
Some(bank_forks.root_bank().get_thread_pool()),
snapshot_config.maximum_snapshots_to_retain,
)
.unwrap_or_else(|err| {
Expand Down
Loading