Skip to content
This repository has been archived by the owner on Jan 22, 2025. It is now read-only.

Commit

Permalink
Make solana-ledger-tool run AccountsBackgroundService
Browse files Browse the repository at this point in the history
Prior to this change, long running commands like `solana-ledger-tool
verify` would OOM due to AccountsDb cleanup not happening.

Co-authored-by: Michael Vines <[email protected]>
  • Loading branch information
mvines authored and Steven Czabaniuk committed Aug 4, 2022
1 parent 457f9ef commit 0504eb9
Show file tree
Hide file tree
Showing 4 changed files with 71 additions and 29 deletions.
21 changes: 2 additions & 19 deletions core/src/validator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -116,9 +116,6 @@ use {
const MAX_COMPLETED_DATA_SETS_IN_CHANNEL: usize = 100_000;
const WAIT_FOR_SUPERMAJORITY_THRESHOLD_PERCENT: u64 = 80;

/// maximum drop bank signal queue length
const MAX_DROP_BANK_SIGNAL_QUEUE_SIZE: usize = 10_000;

pub struct ValidatorConfig {
pub halt_at_slot: Option<Slot>,
pub expected_genesis_hash: Option<Hash>,
Expand Down Expand Up @@ -1468,22 +1465,8 @@ fn load_blockstore(
// drop behavior can be safely synchronized with any other ongoing accounts activity like
// cache flush, clean, shrink, as long as the same thread performing those activities also
// is processing the dropped banks from the `pruned_banks_receiver` channel.

// There should only be one bank, the root bank in BankForks. Thus all banks added to
// BankForks from now on will be descended from the root bank and thus will inherit
// the bank drop callback.
assert_eq!(bank_forks.read().unwrap().banks().len(), 1);
let (pruned_banks_sender, pruned_banks_receiver) = bounded(MAX_DROP_BANK_SIGNAL_QUEUE_SIZE);
{
let root_bank = bank_forks.read().unwrap().root_bank();
root_bank.set_callback(Some(Box::new(
root_bank
.rc
.accounts
.accounts_db
.create_drop_bank_callback(pruned_banks_sender),
)));
}
let pruned_banks_receiver =
AccountsBackgroundService::setup_bank_drop_callback(bank_forks.clone());

{
let hard_forks: Vec<_> = bank_forks
Expand Down
52 changes: 43 additions & 9 deletions ledger-tool/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,11 +29,14 @@ use {
AccessType, BlockstoreOptions, BlockstoreRecoveryMode, LedgerColumnOptions,
ShredStorageType,
},
blockstore_processor::{BlockstoreProcessorError, ProcessOptions},
blockstore_processor::{self, BlockstoreProcessorError, ProcessOptions},
shred::Shred,
},
solana_measure::{measure, measure::Measure},
solana_runtime::{
accounts_background_service::{
AbsRequestHandler, AbsRequestSender, AccountsBackgroundService,
},
accounts_db::{AccountsDbConfig, FillerAccountsConfig},
accounts_index::{AccountsIndexConfig, IndexLimitMb, ScanConfig},
bank::{Bank, RewardCalculationEvent},
Expand Down Expand Up @@ -923,18 +926,49 @@ fn load_bank_forks(
vec![non_primary_accounts_path]
};

bank_forks_utils::load(
genesis_config,
blockstore,
account_paths,
None,
snapshot_config.as_ref(),
process_options,
let (bank_forks, leader_schedule_cache, starting_snapshot_hashes, ..) =
bank_forks_utils::load_bank_forks(
genesis_config,
blockstore,
account_paths,
None,
snapshot_config.as_ref(),
&process_options,
None,
None,
);

let pruned_banks_receiver =
AccountsBackgroundService::setup_bank_drop_callback(bank_forks.clone());
let abs_request_handler = AbsRequestHandler {
snapshot_request_handler: None,
pruned_banks_receiver,
};
let exit = Arc::new(AtomicBool::new(false));
let accounts_background_service = AccountsBackgroundService::new(
bank_forks.clone(),
&exit,
abs_request_handler,
process_options.accounts_db_caching_enabled,
process_options.accounts_db_test_hash_calculation,
None,
);

let result = blockstore_processor::process_blockstore_from_root(
blockstore,
&bank_forks,
&leader_schedule_cache,
&process_options,
None,
None,
&AbsRequestSender::default(),
)
.map(|(bank_forks, .., starting_snapshot_hashes)| (bank_forks, starting_snapshot_hashes))
.map(|_| (bank_forks, starting_snapshot_hashes));

exit.store(true, Ordering::Relaxed);
accounts_background_service.join().unwrap();

result
}

fn compute_slot_cost(blockstore: &Blockstore, slot: Slot) -> Result<(), String> {
Expand Down
3 changes: 2 additions & 1 deletion ledger/src/bank_forks_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ use {
},
log::*,
solana_runtime::{
accounts_background_service::AbsRequestSender,
accounts_update_notifier_interface::AccountsUpdateNotifier,
bank_forks::BankForks,
snapshot_archive_info::SnapshotArchiveInfoGetter,
Expand Down Expand Up @@ -68,7 +69,7 @@ pub fn load(
&process_options,
transaction_status_sender,
cache_block_meta_sender,
&solana_runtime::accounts_background_service::AbsRequestSender::default(),
&AbsRequestSender::default(),
)
.map(|_| (bank_forks, leader_schedule_cache, starting_snapshot_hashes))
}
Expand Down
24 changes: 24 additions & 0 deletions runtime/src/accounts_background_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,8 @@ pub type DroppedSlotsReceiver = Receiver<(Slot, BankId)>;

/// interval to report bank_drop queue events: 60s
const BANK_DROP_SIGNAL_CHANNEL_REPORT_INTERVAL: u64 = 60_000;
/// maximum drop bank signal queue length
const MAX_DROP_BANK_SIGNAL_QUEUE_SIZE: usize = 10_000;

/// Bank drop signal queue events
#[allow(dead_code)]
Expand Down Expand Up @@ -573,6 +575,28 @@ impl AccountsBackgroundService {
Self { t_background }
}

/// Should be called immediately after bank_fork_utils::load_bank_forks(), and as such, there
/// should only be one bank, the root bank, in `bank_forks`
/// All banks added to `bank_forks` will be descended from the root bank, and thus will inherit
/// the bank drop callback.
pub fn setup_bank_drop_callback(bank_forks: Arc<RwLock<BankForks>>) -> DroppedSlotsReceiver {
assert_eq!(bank_forks.read().unwrap().banks().len(), 1);

let (pruned_banks_sender, pruned_banks_receiver) =
crossbeam_channel::bounded(MAX_DROP_BANK_SIGNAL_QUEUE_SIZE);
{
let root_bank = bank_forks.read().unwrap().root_bank();
root_bank.set_callback(Some(Box::new(
root_bank
.rc
.accounts
.accounts_db
.create_drop_bank_callback(pruned_banks_sender),
)));
}
pruned_banks_receiver
}

pub fn join(self) -> thread::Result<()> {
self.t_background.join()
}
Expand Down

0 comments on commit 0504eb9

Please sign in to comment.