modify geyser account iter at snapshot load (anza-xyz#960)
jeffwashington authored Apr 22, 2024
1 parent 7389371 commit 862c79e
Showing 1 changed file with 36 additions and 10 deletions.
46 changes: 36 additions & 10 deletions accounts-db/src/accounts_db/geyser_plugin_utils.rs
@@ -89,40 +89,65 @@ impl AccountsDb {
     ) {
         let storage_entry = self.storage.get_slot_storage_entry(slot).unwrap();

-        let mut accounts_to_stream: HashMap<Pubkey, StoredAccountMeta> = HashMap::default();
+        let mut accounts_duplicate: HashMap<Pubkey, usize> = HashMap::default();
         let mut measure_filter = Measure::start("accountsdb-plugin-filtering-accounts");
-        let accounts = storage_entry.accounts.account_iter();
         let mut account_len = 0;
-        accounts.for_each(|account| {
+        let mut pubkeys = HashSet::new();
+
+        // populate `accounts_duplicate` for any pubkeys that are in this storage twice.
+        // Storages cannot return `StoredAccountMeta<'_>` for more than 1 account at a time, so we have to do 2 passes to make sure
+        // we don't have duplicate pubkeys.
+        let mut i = 0;
+        storage_entry.accounts.scan_pubkeys(|pubkey| {
+            i += 1; // pre-increment to most easily match early returns in next loop
+            if !pubkeys.insert(*pubkey) {
+                accounts_duplicate.insert(*pubkey, i); // remember the highest index entry in this slot
+            }
+        });
+
+        // now, actually notify geyser
+        let mut i = 0;
+        storage_entry.accounts.scan_accounts(|account| {
+            i += 1;
             account_len += 1;
             if notified_accounts.contains(account.pubkey()) {
                 notify_stats.skipped_accounts += 1;
                 return;
             }
+            if let Some(highest_i) = accounts_duplicate.get(account.pubkey()) {
+                if highest_i != &i {
+                    // this pubkey is in this storage twice and the current instance is not the last one, so we skip it.
+                    // We only send unique accounts in this slot to `notify_filtered_accounts`
+                    return;
+                }
+            }

             // later entries in the same slot are more recent and override earlier accounts for the same pubkey
             // We can pass an incrementing number here for write_version in the future, if the storage does not have a write_version.
             // As long as all accounts for this slot are in 1 append vec that can be iterated oldest to newest.
-            accounts_to_stream.insert(*account.pubkey(), account);
+            self.notify_filtered_accounts(
+                slot,
+                notified_accounts,
+                std::iter::once(account),
+                notify_stats,
+            );
         });
         notify_stats.total_accounts += account_len;
         measure_filter.stop();
         notify_stats.elapsed_filtering_us += measure_filter.as_us() as usize;
-
-        self.notify_filtered_accounts(slot, notified_accounts, accounts_to_stream, notify_stats);
     }

-    fn notify_filtered_accounts(
+    fn notify_filtered_accounts<'a>(
         &self,
         slot: Slot,
         notified_accounts: &mut HashSet<Pubkey>,
-        mut accounts_to_stream: HashMap<Pubkey, StoredAccountMeta>,
+        accounts_to_stream: impl Iterator<Item = StoredAccountMeta<'a>>,
         notify_stats: &mut GeyserPluginNotifyAtSnapshotRestoreStats,
     ) {
         let notifier = self.accounts_update_notifier.as_ref().unwrap();
         let mut measure_notify = Measure::start("accountsdb-plugin-notifying-accounts");
         let local_write_version = 0;
-        for (_, mut account) in accounts_to_stream.drain() {
+        for mut account in accounts_to_stream {
             // We do not need to rely on the specific write_version read from the append vec.
             // So, overwrite the write_version with something that works.
             // 'accounts_to_stream' is already a hashmap, so there is already only one entry per pubkey.
@@ -143,8 +168,9 @@ impl AccountsDb {
             notified_accounts.insert(*account.pubkey());
             measure_bookkeep.stop();
             notify_stats.total_pure_bookeeping += measure_bookkeep.as_us() as usize;
+
+            notify_stats.notified_accounts += 1;
         }
-        notify_stats.notified_accounts += accounts_to_stream.len();
         measure_notify.stop();
         notify_stats.elapsed_notifying_us += measure_notify.as_us() as usize;
     }
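For readers outside the accounts-db codebase, here is a minimal, self-contained sketch of the two-pass pattern this commit introduces, using hypothetical stand-in types (a `u64` pubkey and a plain slice as the storage) rather than the real `StoredAccountMeta`/append-vec API: the first pass records the highest index of any pubkey that appears twice in the slot's storage, and the second pass notifies each account while skipping everything except the last occurrence of a duplicated pubkey.

```rust
// Standalone sketch (not the agave API): two-pass duplicate handling at snapshot load.
use std::collections::{HashMap, HashSet};

// Hypothetical stand-ins for Pubkey / StoredAccountMeta.
type Pubkey = u64;

#[derive(Debug)]
struct StoredAccount {
    pubkey: Pubkey,
    lamports: u64,
}

/// Notify every account in `storage` once per unique pubkey, keeping only the
/// *last* occurrence of any pubkey stored twice and skipping pubkeys that were
/// already notified.
fn notify_at_snapshot_load(storage: &[StoredAccount], notified_accounts: &mut HashSet<Pubkey>) {
    // Pass 1: remember the highest index of any pubkey that appears more than once.
    let mut accounts_duplicate: HashMap<Pubkey, usize> = HashMap::new();
    let mut seen = HashSet::new();
    let mut i = 0;
    for account in storage {
        i += 1; // pre-increment so indices line up with the early skips below
        if !seen.insert(account.pubkey) {
            accounts_duplicate.insert(account.pubkey, i); // highest index wins
        }
    }

    // Pass 2: notify, skipping already-notified pubkeys and all but the last duplicate.
    let mut i = 0;
    for account in storage {
        i += 1;
        if notified_accounts.contains(&account.pubkey) {
            continue; // corresponds to the early `return` in the scan_accounts closure
        }
        if let Some(highest_i) = accounts_duplicate.get(&account.pubkey) {
            if *highest_i != i {
                continue; // an older duplicate; a newer entry for this pubkey comes later
            }
        }
        // "notify geyser": here we just print and record the pubkey.
        println!("notify {:?}", account);
        notified_accounts.insert(account.pubkey);
    }
}

fn main() {
    let storage = vec![
        StoredAccount { pubkey: 1, lamports: 10 },
        StoredAccount { pubkey: 2, lamports: 20 },
        StoredAccount { pubkey: 1, lamports: 30 }, // duplicate: only this one is notified
    ];
    let mut notified = HashSet::new();
    notify_at_snapshot_load(&storage, &mut notified);
    assert_eq!(notified.len(), 2);
}
```

The commit itself keeps the real storage iteration (`scan_pubkeys` followed by `scan_accounts`) and streams each surviving account to `notify_filtered_accounts` through `std::iter::once`, rather than buffering a whole slot's accounts in a `HashMap` as the previous code did.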
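The signature change to `notify_filtered_accounts` follows a common Rust pattern: accept `impl Iterator<Item = ...>` with an explicit lifetime so a caller can pass either a whole collection or a single borrowed item. A small sketch with made-up types (not the agave ones):

```rust
// Hypothetical borrowed account view, standing in for StoredAccountMeta<'a>.
struct AccountRef<'a> {
    pubkey: &'a u64,
    data: &'a [u8],
}

// The explicit lifetime mirrors
// `fn notify_filtered_accounts<'a>(..., accounts_to_stream: impl Iterator<Item = StoredAccountMeta<'a>>, ...)`.
fn notify_filtered<'a>(accounts: impl Iterator<Item = AccountRef<'a>>) {
    for account in accounts {
        println!("notify pubkey {} ({} bytes)", account.pubkey, account.data.len());
    }
}

fn main() {
    let pubkey = 42u64;
    let data = vec![0u8; 165];
    let one = AccountRef { pubkey: &pubkey, data: data.as_slice() };
    // A single account can be streamed without building a collection first.
    notify_filtered(std::iter::once(one));
}
```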
