Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

account_saver: Remove nested options #2724

Merged
merged 6 commits into from
Aug 28, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions accounts-db/src/accounts.rs
Original file line number Diff line number Diff line change
Expand Up @@ -588,10 +588,10 @@ impl Accounts {
pub fn store_cached<'a>(
&self,
accounts: impl StorableAccounts<'a>,
transactions: &'a [Option<&'a SanitizedTransaction>],
transactions: Option<&'a [&'a SanitizedTransaction]>,
) {
self.accounts_db
.store_cached_inline_update_index(accounts, Some(transactions));
.store_cached_inline_update_index(accounts, transactions);
}

pub fn store_accounts_cached<'a>(&self, accounts: impl StorableAccounts<'a>) {
Expand Down
60 changes: 22 additions & 38 deletions accounts-db/src/accounts_db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,7 @@ use {
hash::Hash,
pubkey::Pubkey,
rent_collector::RentCollector,
saturating_add_assign,
timing::AtomicInterval,
transaction::SanitizedTransaction,
},
Expand Down Expand Up @@ -6809,38 +6810,32 @@ impl AccountsDb {
&self,
slot: Slot,
accounts_and_meta_to_store: &impl StorableAccounts<'b>,
txn_iter: Box<dyn std::iter::Iterator<Item = &Option<&SanitizedTransaction>> + 'a>,
txs: Option<&[&SanitizedTransaction]>,
) -> Vec<AccountInfo> {
let mut write_version_producer: Box<dyn Iterator<Item = u64>> =
if self.accounts_update_notifier.is_some() {
let mut current_version = self
.write_version
.fetch_add(accounts_and_meta_to_store.len() as u64, Ordering::AcqRel);
Box::new(std::iter::from_fn(move || {
let ret = current_version;
current_version += 1;
Some(ret)
}))
} else {
Box::new(std::iter::empty())
};
let mut current_write_version = if self.accounts_update_notifier.is_some() {
self.write_version
.fetch_add(accounts_and_meta_to_store.len() as u64, Ordering::AcqRel)
} else {
0
};

let (account_infos, cached_accounts) = txn_iter
.enumerate()
.map(|(i, txn)| {
let (account_infos, cached_accounts) = (0..accounts_and_meta_to_store.len())
Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

if the txn_iterator is empty, we still want to iterate over the accounts to store.

Might make sense to just have some sort of iteration over accounts_and_meta_to_store?

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That sounds nice to me

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hm sounded good in theory, but when I just took a stab it I ran into a problem immediately.
We implement StorableAccounts on various tuple types. Rust does not allow us to implement Iterator for arbitrary types that we didn't define.

So afaict we'd need to wrap these in some type or have an iter fn - which doesn't seem much better imo.

.map(|index| {
let txn = txs.map(|txs| *txs.get(index).expect("txs must be present if provided"));
let mut account_info = AccountInfo::default();
accounts_and_meta_to_store.account_default_if_zero_lamport(i, |account| {
accounts_and_meta_to_store.account_default_if_zero_lamport(index, |account| {
let account_shared_data = account.to_account_shared_data();
let pubkey = account.pubkey();
account_info = AccountInfo::new(StorageLocation::Cached, account.lamports());

self.notify_account_at_accounts_update(
slot,
&account_shared_data,
txn,
&txn,
pubkey,
&mut write_version_producer,
current_write_version,
);
saturating_add_assign!(current_write_version, 1);

let cached_account =
self.accounts_cache.store(slot, pubkey, account_shared_data);
Expand All @@ -6864,7 +6859,7 @@ impl AccountsDb {
&self,
accounts: &'c impl StorableAccounts<'b>,
store_to: &StoreTo,
transactions: Option<&[Option<&'a SanitizedTransaction>]>,
transactions: Option<&'a [&'a SanitizedTransaction]>,
) -> Vec<AccountInfo> {
let mut calc_stored_meta_time = Measure::start("calc_stored_meta");
let slot = accounts.target_slot();
Expand All @@ -6887,18 +6882,7 @@ impl AccountsDb {
.fetch_add(calc_stored_meta_time.as_us(), Ordering::Relaxed);

match store_to {
StoreTo::Cache => {
let txn_iter: Box<dyn std::iter::Iterator<Item = &Option<&SanitizedTransaction>>> =
match transactions {
Some(transactions) => {
assert_eq!(transactions.len(), accounts.len());
Box::new(transactions.iter())
}
None => Box::new(std::iter::repeat(&None).take(accounts.len())),
};

self.write_accounts_to_cache(slot, accounts, txn_iter)
}
StoreTo::Cache => self.write_accounts_to_cache(slot, accounts, transactions),
StoreTo::Storage(storage) => self.write_accounts_to_storage(slot, storage, accounts),
}
}
Expand Down Expand Up @@ -8279,7 +8263,7 @@ impl AccountsDb {
pub fn store_cached<'a>(
&self,
accounts: impl StorableAccounts<'a>,
transactions: Option<&'a [Option<&'a SanitizedTransaction>]>,
transactions: Option<&'a [&'a SanitizedTransaction]>,
) {
self.store(
accounts,
Expand All @@ -8293,7 +8277,7 @@ impl AccountsDb {
pub(crate) fn store_cached_inline_update_index<'a>(
&self,
accounts: impl StorableAccounts<'a>,
transactions: Option<&'a [Option<&'a SanitizedTransaction>]>,
transactions: Option<&'a [&'a SanitizedTransaction]>,
) {
self.store(
accounts,
Expand Down Expand Up @@ -8321,7 +8305,7 @@ impl AccountsDb {
&self,
accounts: impl StorableAccounts<'a>,
store_to: &StoreTo,
transactions: Option<&'a [Option<&'a SanitizedTransaction>]>,
transactions: Option<&'a [&'a SanitizedTransaction]>,
reclaim: StoreReclaims,
update_index_thread_selection: UpdateIndexThreadSelection,
) {
Expand Down Expand Up @@ -8510,7 +8494,7 @@ impl AccountsDb {
&self,
accounts: impl StorableAccounts<'a>,
store_to: &StoreTo,
transactions: Option<&'a [Option<&'a SanitizedTransaction>]>,
transactions: Option<&'a [&'a SanitizedTransaction]>,
reclaim: StoreReclaims,
update_index_thread_selection: UpdateIndexThreadSelection,
) {
Expand Down Expand Up @@ -8556,7 +8540,7 @@ impl AccountsDb {
accounts: impl StorableAccounts<'a>,
store_to: &StoreTo,
reset_accounts: bool,
transactions: Option<&[Option<&SanitizedTransaction>]>,
transactions: Option<&'a [&'a SanitizedTransaction]>,
reclaim: StoreReclaims,
update_index_thread_selection: UpdateIndexThreadSelection,
) -> StoreAccountsTiming {
Expand Down
10 changes: 4 additions & 6 deletions accounts-db/src/accounts_db/geyser_plugin_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,23 +57,21 @@ impl AccountsDb {
notify_stats.report();
}

pub fn notify_account_at_accounts_update<P>(
pub fn notify_account_at_accounts_update(
&self,
slot: Slot,
account: &AccountSharedData,
txn: &Option<&SanitizedTransaction>,
pubkey: &Pubkey,
write_version_producer: &mut P,
) where
P: Iterator<Item = u64>,
{
write_version: u64,
) {
if let Some(accounts_update_notifier) = &self.accounts_update_notifier {
accounts_update_notifier.notify_account_update(
slot,
account,
txn,
pubkey,
write_version_producer.next().unwrap(),
write_version,
);
}
}
Expand Down
7 changes: 4 additions & 3 deletions runtime/src/bank.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3801,9 +3801,10 @@ impl Bank {
&durable_nonce,
lamports_per_signature,
);
self.rc
.accounts
.store_cached((self.slot(), accounts_to_store.as_slice()), &transactions);
self.rc.accounts.store_cached(
(self.slot(), accounts_to_store.as_slice()),
transactions.as_deref(),
);
});

self.collect_rent(&processing_results);
Expand Down
23 changes: 12 additions & 11 deletions svm/src/account_saver.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ pub fn collect_accounts_to_store<'a, T: SVMMessage>(
processing_results: &'a mut [TransactionProcessingResult],
durable_nonce: &DurableNonce,
lamports_per_signature: u64,
) -> (Vec<(&'a Pubkey, &'a AccountSharedData)>, Vec<Option<&'a T>>) {
) -> (Vec<(&'a Pubkey, &'a AccountSharedData)>, Option<Vec<&'a T>>) {
let collect_capacity = max_number_of_accounts_to_collect(txs, processing_results);
let mut accounts = Vec::with_capacity(collect_capacity);
let mut transactions = Vec::with_capacity(collect_capacity);
Expand Down Expand Up @@ -87,12 +87,12 @@ pub fn collect_accounts_to_store<'a, T: SVMMessage>(
}
}
}
(accounts, transactions)
(accounts, Some(transactions))
}

fn collect_accounts_for_successful_tx<'a, T: SVMMessage>(
collected_accounts: &mut Vec<(&'a Pubkey, &'a AccountSharedData)>,
collected_account_transactions: &mut Vec<Option<&'a T>>,
collected_account_transactions: &mut Vec<&'a T>,
transaction: &'a T,
transaction_accounts: &'a [TransactionAccount],
) {
Expand All @@ -109,13 +109,13 @@ fn collect_accounts_for_successful_tx<'a, T: SVMMessage>(
})
{
collected_accounts.push((address, account));
collected_account_transactions.push(Some(transaction));
collected_account_transactions.push(transaction);
}
}

fn collect_accounts_for_failed_tx<'a, T: SVMMessage>(
collected_accounts: &mut Vec<(&'a Pubkey, &'a AccountSharedData)>,
collected_account_transactions: &mut Vec<Option<&'a T>>,
collected_account_transactions: &mut Vec<&'a T>,
transaction: &'a T,
rollback_accounts: &'a mut RollbackAccounts,
durable_nonce: &DurableNonce,
Expand All @@ -125,7 +125,7 @@ fn collect_accounts_for_failed_tx<'a, T: SVMMessage>(
match rollback_accounts {
RollbackAccounts::FeePayerOnly { fee_payer_account } => {
collected_accounts.push((fee_payer_address, &*fee_payer_account));
collected_account_transactions.push(Some(transaction));
collected_account_transactions.push(transaction);
}
RollbackAccounts::SameNonceAndFeePayer { nonce } => {
// Since we know we are dealing with a valid nonce account,
Expand All @@ -134,22 +134,22 @@ fn collect_accounts_for_failed_tx<'a, T: SVMMessage>(
.try_advance_nonce(*durable_nonce, lamports_per_signature)
.unwrap();
collected_accounts.push((nonce.address(), nonce.account()));
collected_account_transactions.push(Some(transaction));
collected_account_transactions.push(transaction);
}
RollbackAccounts::SeparateNonceAndFeePayer {
nonce,
fee_payer_account,
} => {
collected_accounts.push((fee_payer_address, &*fee_payer_account));
collected_account_transactions.push(Some(transaction));
collected_account_transactions.push(transaction);

// Since we know we are dealing with a valid nonce account,
// unwrap is safe here
nonce
.try_advance_nonce(*durable_nonce, lamports_per_signature)
.unwrap();
collected_accounts.push((nonce.address(), nonce.account()));
collected_account_transactions.push(Some(transaction));
collected_account_transactions.push(transaction);
}
}
}
Expand Down Expand Up @@ -294,9 +294,10 @@ mod tests {
.iter()
.any(|(pubkey, _account)| *pubkey == &keypair1.pubkey()));

let transactions = transactions.unwrap();
assert_eq!(transactions.len(), 2);
assert!(transactions.iter().any(|txn| txn.unwrap().eq(&tx0)));
assert!(transactions.iter().any(|txn| txn.unwrap().eq(&tx1)));
assert!(transactions.iter().any(|txn| (*txn).eq(&tx0)));
assert!(transactions.iter().any(|txn| (*txn).eq(&tx1)));
}

#[test]
Expand Down