Skip to content

Commit

Permalink
Make background services aware of incremental snapshots
Browse files Browse the repository at this point in the history
AccountsBackgroundService now knows about incremental snapshots.  It is
now also in charge of deciding if an AccountsPackage is destined to be a
SnapshotPackage or not (or just used by AccountsHashVerifier).

!!! New behavior changes !!!

Full snapshots (both bank and archive) **MUST** succeed.  If there are
failures, the functions will be retried.  If they still fail, an assert
will be triggered.

This is required because of how the last full snapshot slot is
calculated, which is used by AccountsBackgroundService when calling
`clean_accounts()`.
  • Loading branch information
brooksprumo committed Aug 24, 2021
1 parent ceb3eae commit 9b5d5f9
Show file tree
Hide file tree
Showing 8 changed files with 419 additions and 411 deletions.
169 changes: 108 additions & 61 deletions core/src/accounts_hash_verifier.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,14 +6,16 @@

use rayon::ThreadPool;
use solana_gossip::cluster_info::{ClusterInfo, MAX_SNAPSHOT_HASHES};
use solana_measure::measure::Measure;
use solana_runtime::{
accounts_db,
snapshot_archive_info::SnapshotArchiveInfoGetter,
accounts_db::{self, AccountsDb},
accounts_hash::HashStats,
snapshot_config::SnapshotConfig,
snapshot_package::{
AccountsPackage, AccountsPackageReceiver, PendingSnapshotPackage, SnapshotPackage,
SnapshotType,
},
snapshot_utils,
sorted_storages::SortedStorages,
};
use solana_sdk::{clock::Slot, hash::Hash, pubkey::Pubkey};
use std::collections::{HashMap, HashSet};
Expand All @@ -39,7 +41,7 @@ impl AccountsHashVerifier {
cluster_info: &Arc<ClusterInfo>,
trusted_validators: Option<HashSet<Pubkey>>,
halt_on_trusted_validators_accounts_hash_mismatch: bool,
fault_injection_rate_slots: u64,
fault_injection_rate_slots: Slot,
snapshot_config: Option<SnapshotConfig>,
) -> Self {
let exit = exit.clone();
Expand All @@ -48,19 +50,17 @@ impl AccountsHashVerifier {
.name("solana-hash-accounts".to_string())
.spawn(move || {
let mut hashes = vec![];
let mut thread_pool_storage = None;
let mut thread_pool = None;
loop {
if exit.load(Ordering::Relaxed) {
break;
}

match accounts_package_receiver.recv_timeout(Duration::from_secs(1)) {
Ok(accounts_package) => {
if accounts_package.hash_for_testing.is_some()
&& thread_pool_storage.is_none()
if accounts_package.hash_for_testing.is_some() && thread_pool.is_none()
{
thread_pool_storage =
Some(accounts_db::make_min_priority_thread_pool());
thread_pool = Some(accounts_db::make_min_priority_thread_pool());
}

Self::process_accounts_package(
Expand All @@ -73,7 +73,7 @@ impl AccountsHashVerifier {
&exit,
fault_injection_rate_slots,
snapshot_config.as_ref(),
thread_pool_storage.as_ref(),
thread_pool.as_ref(),
);
}
Err(RecvTimeoutError::Disconnected) => break,
Expand All @@ -96,49 +96,73 @@ impl AccountsHashVerifier {
pending_snapshot_package: Option<&PendingSnapshotPackage>,
hashes: &mut Vec<(Slot, Hash)>,
exit: &Arc<AtomicBool>,
fault_injection_rate_slots: u64,
fault_injection_rate_slots: Slot,
snapshot_config: Option<&SnapshotConfig>,
thread_pool: Option<&ThreadPool>,
) {
let snapshot_package =
snapshot_utils::process_accounts_package(accounts_package, thread_pool, None);
Self::process_snapshot_package(
snapshot_package,
Self::verify_accounts_package_hash(&accounts_package, thread_pool);

Self::push_accounts_hashes_to_cluster(
&accounts_package,
cluster_info,
trusted_validators,
halt_on_trusted_validator_accounts_hash_mismatch,
pending_snapshot_package,
hashes,
exit,
fault_injection_rate_slots,
snapshot_config,
);

Self::submit_for_packaging(accounts_package, pending_snapshot_package, snapshot_config);
}

fn process_snapshot_package(
snapshot_package: SnapshotPackage,
fn verify_accounts_package_hash(
accounts_package: &AccountsPackage,
thread_pool: Option<&ThreadPool>,
) {
let mut measure_hash = Measure::start("hash");
if let Some(expected_hash) = accounts_package.hash_for_testing {
let sorted_storages = SortedStorages::new(&accounts_package.snapshot_storages);
let (hash, lamports) = AccountsDb::calculate_accounts_hash_without_index(
&sorted_storages,
thread_pool,
HashStats::default(),
false,
None,
)
.unwrap();

assert_eq!(accounts_package.expected_capitalization, lamports);
assert_eq!(expected_hash, hash);
};
measure_hash.stop();
datapoint_info!(
"accounts_hash_verifier",
("calculate_hash", measure_hash.as_us(), i64),
);
}

fn push_accounts_hashes_to_cluster(
accounts_package: &AccountsPackage,
cluster_info: &ClusterInfo,
trusted_validators: Option<&HashSet<Pubkey>>,
halt_on_trusted_validator_accounts_hash_mismatch: bool,
pending_snapshot_package: Option<&PendingSnapshotPackage>,
hashes: &mut Vec<(Slot, Hash)>,
exit: &Arc<AtomicBool>,
fault_injection_rate_slots: u64,
snapshot_config: Option<&SnapshotConfig>,
fault_injection_rate_slots: Slot,
) {
let hash = *snapshot_package.hash();
let hash = accounts_package.hash;
if fault_injection_rate_slots != 0
&& snapshot_package.slot() % fault_injection_rate_slots == 0
&& accounts_package.slot % fault_injection_rate_slots == 0
{
// For testing, publish an invalid hash to gossip.
use rand::{thread_rng, Rng};
use solana_sdk::hash::extend_and_hash;
warn!("inserting fault at slot: {}", snapshot_package.slot());
warn!("inserting fault at slot: {}", accounts_package.slot);
let rand = thread_rng().gen_range(0, 10);
let hash = extend_and_hash(&hash, &[rand]);
hashes.push((snapshot_package.slot(), hash));
hashes.push((accounts_package.slot, hash));
} else {
hashes.push((snapshot_package.slot(), hash));
hashes.push((accounts_package.slot, hash));
}

while hashes.len() > MAX_SNAPSHOT_HASHES {
Expand All @@ -155,19 +179,46 @@ impl AccountsHashVerifier {
}
}

if let Some(snapshot_config) = snapshot_config {
if snapshot_package.block_height % snapshot_config.full_snapshot_archive_interval_slots
== 0
{
if let Some(pending_snapshot_package) = pending_snapshot_package {
*pending_snapshot_package.lock().unwrap() = Some(snapshot_package);
}
}
}

cluster_info.push_accounts_hashes(hashes.clone());
}

fn submit_for_packaging(
accounts_package: AccountsPackage,
pending_snapshot_package: Option<&PendingSnapshotPackage>,
snapshot_config: Option<&SnapshotConfig>,
) {
if accounts_package.snapshot_type.is_none()
|| pending_snapshot_package.is_none()
|| snapshot_config.is_none()
{
return;
};

let snapshot_package = SnapshotPackage::from(accounts_package);
let pending_snapshot_package = pending_snapshot_package.unwrap();
let _snapshot_config = snapshot_config.unwrap();

// If the snapshot package is an Incremental Snapshot, do not submit it if there's already
// a pending Full Snapshot.
let can_submit = match snapshot_package.snapshot_type {
SnapshotType::FullSnapshot => true,
SnapshotType::IncrementalSnapshot(_) => pending_snapshot_package
.lock()
.unwrap()
.as_ref()
.map_or(true, |snapshot_package| {
matches!(
snapshot_package.snapshot_type,
SnapshotType::IncrementalSnapshot(_)
)
}),
};

if can_submit {
*pending_snapshot_package.lock().unwrap() = Some(snapshot_package);
}
}

fn should_halt(
cluster_info: &ClusterInfo,
trusted_validators: Option<&HashSet<Pubkey>>,
Expand Down Expand Up @@ -225,10 +276,10 @@ mod tests {
use solana_gossip::{cluster_info::make_accounts_hashes_message, contact_info::ContactInfo};
use solana_runtime::{
snapshot_config::LastFullSnapshotSlot,
snapshot_package::SnapshotType,
snapshot_utils::{ArchiveFormat, SnapshotVersion},
};
use solana_sdk::{
genesis_config::ClusterType,
hash::hash,
signature::{Keypair, Signer},
};
Expand Down Expand Up @@ -301,30 +352,24 @@ mod tests {
last_full_snapshot_slot: LastFullSnapshotSlot::default(),
};
for i in 0..MAX_SNAPSHOT_HASHES + 1 {
let slot = full_snapshot_archive_interval_slots + i as u64;
let block_height = full_snapshot_archive_interval_slots + i as u64;
let slot_deltas = vec![];
let snapshot_links = TempDir::new().unwrap();
let storages = vec![];
let snapshot_archive_path = PathBuf::from(".");
let hash = hash(&[i as u8]);
let archive_format = ArchiveFormat::TarBzip2;
let snapshot_version = SnapshotVersion::default();
let snapshot_package = SnapshotPackage::new(
slot,
block_height,
slot_deltas,
snapshot_links,
storages,
snapshot_archive_path,
hash,
archive_format,
snapshot_version,
SnapshotType::FullSnapshot,
);
let accounts_package = AccountsPackage {
slot: full_snapshot_archive_interval_slots + i as u64,
block_height: full_snapshot_archive_interval_slots + i as u64,
slot_deltas: vec![],
snapshot_links: TempDir::new().unwrap(),
snapshot_storages: vec![],
hash: hash(&[i as u8]),
archive_format: ArchiveFormat::TarBzip2,
snapshot_version: SnapshotVersion::default(),
snapshot_archives_dir: PathBuf::default(),
expected_capitalization: 0,
hash_for_testing: None,
cluster_type: ClusterType::MainnetBeta,
snapshot_type: None,
};

AccountsHashVerifier::process_snapshot_package(
snapshot_package,
AccountsHashVerifier::process_accounts_package(
accounts_package,
&cluster_info,
Some(&trusted_validators),
false,
Expand All @@ -333,7 +378,9 @@ mod tests {
&exit,
0,
Some(&snapshot_config),
None,
);

// sleep for 1ms to create a newer timestmap for gossip entry
// otherwise the timestamp won't be newer.
std::thread::sleep(Duration::from_millis(1));
Expand Down
Loading

0 comments on commit 9b5d5f9

Please sign in to comment.