diff --git a/accounts-db/src/accounts_db/geyser_plugin_utils.rs b/accounts-db/src/accounts_db/geyser_plugin_utils.rs index 34bd3d7b52a02f..f5bb3a9ffb7b27 100644 --- a/accounts-db/src/accounts_db/geyser_plugin_utils.rs +++ b/accounts-db/src/accounts_db/geyser_plugin_utils.rs @@ -89,40 +89,65 @@ impl AccountsDb { ) { let storage_entry = self.storage.get_slot_storage_entry(slot).unwrap(); - let mut accounts_to_stream: HashMap = HashMap::default(); + let mut accounts_duplicate: HashMap = HashMap::default(); let mut measure_filter = Measure::start("accountsdb-plugin-filtering-accounts"); - let accounts = storage_entry.accounts.account_iter(); let mut account_len = 0; - accounts.for_each(|account| { + let mut pubkeys = HashSet::new(); + + // populate `accounts_duplicate` for any pubkeys that are in this storage twice. + // Storages cannot return `StoredAccountMeta<'_>` for more than 1 account at a time, so we have to do 2 passes to make sure + // we don't have duplicate pubkeys. + let mut i = 0; + storage_entry.accounts.scan_pubkeys(|pubkey| { + i += 1; // pre-increment to most easily match early returns in next loop + if !pubkeys.insert(*pubkey) { + accounts_duplicate.insert(*pubkey, i); // remember the highest index entry in this slot + } + }); + + // now, actually notify geyser + let mut i = 0; + storage_entry.accounts.scan_accounts(|account| { + i += 1; account_len += 1; if notified_accounts.contains(account.pubkey()) { notify_stats.skipped_accounts += 1; return; } + if let Some(highest_i) = accounts_duplicate.get(account.pubkey()) { + if highest_i != &i { + // this pubkey is in this storage twice and the current instance is not the last one, so we skip it. + // We only send unique accounts in this slot to `notify_filtered_accounts` + return; + } + } // later entries in the same slot are more recent and override earlier accounts for the same pubkey // We can pass an incrementing number here for write_version in the future, if the storage does not have a write_version. // As long as all accounts for this slot are in 1 append vec that can be itereated olest to newest. - accounts_to_stream.insert(*account.pubkey(), account); + self.notify_filtered_accounts( + slot, + notified_accounts, + std::iter::once(account), + notify_stats, + ); }); notify_stats.total_accounts += account_len; measure_filter.stop(); notify_stats.elapsed_filtering_us += measure_filter.as_us() as usize; - - self.notify_filtered_accounts(slot, notified_accounts, accounts_to_stream, notify_stats); } - fn notify_filtered_accounts( + fn notify_filtered_accounts<'a>( &self, slot: Slot, notified_accounts: &mut HashSet, - mut accounts_to_stream: HashMap, + accounts_to_stream: impl Iterator>, notify_stats: &mut GeyserPluginNotifyAtSnapshotRestoreStats, ) { let notifier = self.accounts_update_notifier.as_ref().unwrap(); let mut measure_notify = Measure::start("accountsdb-plugin-notifying-accounts"); let local_write_version = 0; - for (_, mut account) in accounts_to_stream.drain() { + for mut account in accounts_to_stream { // We do not need to rely on the specific write_version read from the append vec. // So, overwrite the write_version with something that works. // 'accounts_to_stream' is already a hashmap, so there is already only entry per pubkey. @@ -143,8 +168,9 @@ impl AccountsDb { notified_accounts.insert(*account.pubkey()); measure_bookkeep.stop(); notify_stats.total_pure_bookeeping += measure_bookkeep.as_us() as usize; + + notify_stats.notified_accounts += 1; } - notify_stats.notified_accounts += accounts_to_stream.len(); measure_notify.stop(); notify_stats.elapsed_notifying_us += measure_notify.as_us() as usize; }