diff --git a/accounts-db/src/accounts_db.rs b/accounts-db/src/accounts_db.rs index fca79383230cfe..d2800b2ef7e06a 100644 --- a/accounts-db/src/accounts_db.rs +++ b/accounts-db/src/accounts_db.rs @@ -2322,8 +2322,6 @@ impl<'a> ZeroLamport for StoredAccountMeta<'a> { } } -type GenerateIndexAccountsMap<'a> = HashMap>; - /// called on a struct while scanning append vecs trait AppendVecScan: Send + Sync + Clone { /// return true if this pubkey should be included @@ -8914,20 +8912,6 @@ impl AccountsDb { (result, slots) } - fn process_storage_slot<'a>( - &self, - storage: &'a Arc, - ) -> GenerateIndexAccountsMap<'a> { - let num_accounts = storage.approx_stored_count(); - let mut accounts_map = GenerateIndexAccountsMap::with_capacity(num_accounts); - storage.accounts.account_iter().for_each(|stored_account| { - let pubkey = stored_account.pubkey(); - assert!(!self.is_filler_account(pubkey)); - accounts_map.insert(*pubkey, stored_account); - }); - accounts_map - } - /// return Some(lamports_to_top_off) if 'account' would collect rent fn stats_for_rent_payers( pubkey: &Pubkey, @@ -8948,30 +8932,32 @@ impl AccountsDb { fn generate_index_for_slot( &self, - accounts_map: GenerateIndexAccountsMap<'_>, + storage: &Arc, slot: Slot, store_id: AppendVecId, rent_collector: &RentCollector, storage_info: &StorageSizeAndCountMap, ) -> SlotIndexGenerationInfo { - if accounts_map.is_empty() { + let mut accounts = storage.accounts.account_iter(); + if accounts.next().is_none() { return SlotIndexGenerationInfo::default(); } + let accounts = storage.accounts.account_iter(); let secondary = !self.account_indexes.is_empty(); let mut rent_paying_accounts_by_partition = Vec::default(); let mut accounts_data_len = 0; let mut num_accounts_rent_paying = 0; - let num_accounts = accounts_map.len(); let mut amount_to_top_off_rent = 0; let mut stored_size_alive = 0; - let items = accounts_map.into_iter().map(|(pubkey, stored_account)| { + let items = accounts.map(|stored_account| { stored_size_alive += stored_account.stored_size(); + let pubkey = stored_account.pubkey(); if secondary { self.accounts_index.update_secondary_indexes( - &pubkey, + pubkey, &stored_account, &self.account_indexes, ); @@ -8981,16 +8967,16 @@ impl AccountsDb { } if let Some(amount_to_top_off_rent_this_account) = - Self::stats_for_rent_payers(&pubkey, &stored_account, rent_collector) + Self::stats_for_rent_payers(pubkey, &stored_account, rent_collector) { amount_to_top_off_rent += amount_to_top_off_rent_this_account; num_accounts_rent_paying += 1; // remember this rent-paying account pubkey - rent_paying_accounts_by_partition.push(pubkey); + rent_paying_accounts_by_partition.push(*pubkey); } ( - pubkey, + *pubkey, AccountInfo::new( StorageLocation::AppendVec(store_id, stored_account.offset()), // will never be cached stored_account.lamports(), @@ -8998,15 +8984,31 @@ impl AccountsDb { ) }); - let (dirty_pubkeys, insert_time_us, generate_index_count) = self + let (dirty_pubkeys, insert_time_us, mut generate_index_results) = self .accounts_index - .insert_new_if_missing_into_primary_index(slot, num_accounts, items); + .insert_new_if_missing_into_primary_index(slot, storage.approx_stored_count(), items); + + if let Some(duplicates_this_slot) = std::mem::take(&mut generate_index_results.duplicates) { + // there were duplicate pubkeys in this same slot + // Some were not inserted. This means some info like stored data is off. + duplicates_this_slot + .into_iter() + .for_each(|(pubkey, (_slot, info))| { + let duplicate = storage.accounts.get_account(info.offset()).unwrap().0; + assert_eq!(&pubkey, duplicate.pubkey()); + stored_size_alive = stored_size_alive.saturating_sub(duplicate.stored_size()); + if !duplicate.is_zero_lamport() { + accounts_data_len = + accounts_data_len.saturating_sub(duplicate.data().len() as u64); + } + }); + } { // second, collect into the shared DashMap once we've figured out all the info per store_id let mut info = storage_info.entry(store_id).or_default(); info.stored_size += stored_size_alive; - info.count += generate_index_count.count; + info.count += generate_index_results.count; } // dirty_pubkeys will contain a pubkey if an item has multiple rooted entries for @@ -9017,7 +9019,7 @@ impl AccountsDb { } SlotIndexGenerationInfo { insert_time_us, - num_accounts: num_accounts as u64, + num_accounts: generate_index_results.count as u64, num_accounts_rent_paying, accounts_data_len, amount_to_top_off_rent, @@ -9176,7 +9178,6 @@ impl AccountsDb { // no storage at this slot, no information to pull out continue; }; - let accounts_map = self.process_storage_slot(&storage); let store_id = storage.append_vec_id(); scan_time.stop(); @@ -9194,12 +9195,13 @@ impl AccountsDb { rent_paying_accounts_by_partition: rent_paying_accounts_by_partition_this_slot, } = self.generate_index_for_slot( - accounts_map, + &storage, *slot, store_id, &rent_collector, &storage_info, ); + rent_paying.fetch_add(rent_paying_this_slot, Ordering::Relaxed); amount_to_top_off_rent .fetch_add(amount_to_top_off_rent_this_slot, Ordering::Relaxed); @@ -9220,10 +9222,10 @@ impl AccountsDb { // verify index matches expected and measure the time to get all items assert!(verify); let mut lookup_time = Measure::start("lookup_time"); - for account in accounts_map.into_iter() { - let (key, account_info) = account; - let lock = self.accounts_index.get_bin(&key); - let x = lock.get(&key).unwrap(); + for account_info in storage.accounts.account_iter() { + let key = account_info.pubkey(); + let lock = self.accounts_index.get_bin(key); + let x = lock.get(key).unwrap(); let sl = x.slot_list.read().unwrap(); let mut count = 0; for (slot2, account_info2) in sl.iter() { @@ -15818,9 +15820,8 @@ pub mod tests { let storage = accounts.storage.get_slot_storage_entry(slot0).unwrap(); let storage_info = StorageSizeAndCountMap::default(); - let accounts_map = accounts.process_storage_slot(&storage); accounts.generate_index_for_slot( - accounts_map, + &storage, slot0, 0, &RentCollector::default(), @@ -15842,14 +15843,7 @@ pub mod tests { // empty store let storage = accounts.create_and_insert_store(0, 1, "test"); let storage_info = StorageSizeAndCountMap::default(); - let accounts_map = accounts.process_storage_slot(&storage); - accounts.generate_index_for_slot( - accounts_map, - 0, - 0, - &RentCollector::default(), - &storage_info, - ); + accounts.generate_index_for_slot(&storage, 0, 0, &RentCollector::default(), &storage_info); assert!(storage_info.is_empty()); } @@ -15890,14 +15884,7 @@ pub mod tests { ); let storage_info = StorageSizeAndCountMap::default(); - let accounts_map = accounts.process_storage_slot(&storage); - accounts.generate_index_for_slot( - accounts_map, - 0, - 0, - &RentCollector::default(), - &storage_info, - ); + accounts.generate_index_for_slot(&storage, 0, 0, &RentCollector::default(), &storage_info); assert_eq!(storage_info.len(), 1); for entry in storage_info.iter() { assert_eq!( diff --git a/accounts-db/src/accounts_index.rs b/accounts-db/src/accounts_index.rs index 959bb8319e5080..0b41948c79a7cf 100644 --- a/accounts-db/src/accounts_index.rs +++ b/accounts-db/src/accounts_index.rs @@ -72,9 +72,11 @@ pub type RefCount = u64; pub type AccountMap = Arc>; #[derive(Default, Debug, PartialEq, Eq)] -pub(crate) struct GenerateIndexCount { +pub(crate) struct GenerateIndexResult { /// number of accounts inserted in the index pub count: usize, + /// pubkeys which were present multiple times in the insertion request. + pub duplicates: Option>, } #[derive(Debug, Clone, Copy, PartialEq, Eq)] @@ -1586,24 +1588,55 @@ impl + Into> AccountsIndex { self.account_maps.len() } + /// remove the earlier instances of each pubkey when the pubkey exists later in the `Vec`. + /// Could also be done with HashSet. + /// Returns `HashSet` of duplicate pubkeys. + fn remove_older_duplicate_pubkeys( + items: &mut Vec<(Pubkey, (Slot, T))>, + ) -> Option> { + if items.len() < 2 { + return None; + } + // stable sort by pubkey. + // Earlier entries are overwritten by later entries + items.sort_by(|a, b| a.0.cmp(&b.0)); + let mut duplicates = None::>; + let mut i = 0; + while i < items.len().saturating_sub(1) { + let this_key = &items[i].0; + // look at next entry. If it is same pubkey as this one, then remove this one. + if this_key == &items[i + 1].0 { + let mut duplicates_insert = duplicates.unwrap_or_default(); + // i+1 is same pubkey as i, so remove i + duplicates_insert.push(items.remove(i)); + duplicates = Some(duplicates_insert); + // `items` got smaller, so `i` remains the same. + // There could also be several duplicate pubkeys. + } else { + i += 1; + } + } + duplicates + } + // Same functionally to upsert, but: // 1. operates on a batch of items // 2. holds the write lock for the duration of adding the items // Can save time when inserting lots of new keys. // But, does NOT update secondary index // This is designed to be called at startup time. - // returns (dirty_pubkeys, insertion_time_us, GenerateIndexCount) + // returns (dirty_pubkeys, insertion_time_us, GenerateIndexResult) #[allow(clippy::needless_collect)] pub(crate) fn insert_new_if_missing_into_primary_index( &self, slot: Slot, - item_len: usize, + approx_items_len: usize, items: impl Iterator, - ) -> (Vec, u64, GenerateIndexCount) { + ) -> (Vec, u64, GenerateIndexResult) { // big enough so not likely to re-allocate, small enough to not over-allocate by too much // this assumes the largest bin contains twice the expected amount of the average size per bin let bins = self.bins(); - let expected_items_per_bin = item_len * 2 / bins; + let expected_items_per_bin = approx_items_len * 2 / bins; let use_disk = self.storage.storage.disk.is_some(); let mut binned = (0..bins) .map(|_| Vec::with_capacity(expected_items_per_bin)) @@ -1627,14 +1660,22 @@ impl + Into> AccountsIndex { // This results in calls to insert_new_entry_if_missing_with_lock from different threads starting at different bins to avoid // lock contention. let random_offset = thread_rng().gen_range(0..bins); + let mut duplicates = Vec::default(); (0..bins).for_each(|pubkey_bin| { let pubkey_bin = (pubkey_bin + random_offset) % bins; - let items = std::mem::take(&mut binned[pubkey_bin]); + let mut items = std::mem::take(&mut binned[pubkey_bin]); if items.is_empty() { return; } + + let these_duplicates = Self::remove_older_duplicate_pubkeys(&mut items); + if let Some(mut these_duplicates) = these_duplicates { + duplicates.append(&mut these_duplicates); + } + let r_account_maps = &self.account_maps[pubkey_bin]; let mut insert_time = Measure::start("insert_into_primary_index"); + // count only considers non-duplicate accounts count += items.len(); if use_disk { r_account_maps.startup_insert_only(items.into_iter()); @@ -1668,7 +1709,10 @@ impl + Into> AccountsIndex { ( dirty_pubkeys, insertion_time.load(Ordering::Relaxed), - GenerateIndexCount { count }, + GenerateIndexResult { + count, + duplicates: (!duplicates.is_empty()).then_some(duplicates), + }, ) } @@ -2101,6 +2145,56 @@ pub mod tests { assert_eq!(num, 0); } + #[test] + fn test_remove_older_duplicate_pubkeys() { + let pk1 = Pubkey::new_from_array([0; 32]); + let pk2 = Pubkey::new_from_array([1; 32]); + let slot0 = 0; + let info2 = 55; + let mut items = vec![]; + let removed = AccountsIndex::::remove_older_duplicate_pubkeys(&mut items); + assert!(items.is_empty()); + assert!(removed.is_none()); + let mut items = vec![(pk1, (slot0, 1u64)), (pk2, (slot0, 2))]; + let expected = items.clone(); + let removed = AccountsIndex::::remove_older_duplicate_pubkeys(&mut items); + assert_eq!(items, expected); + assert!(removed.is_none()); + + for dup in 0..3 { + for other in 0..dup + 2 { + let first_info = 10u64; + let mut items = vec![(pk1, (slot0, first_info))]; + let mut expected_dups = items.clone(); + for i in 0..dup { + let this_dup = (pk1, (slot0, i + 10u64 + 1)); + if i < dup.saturating_sub(1) { + expected_dups.push(this_dup); + } + items.push(this_dup); + } + let mut expected = vec![*items.last().unwrap()]; + let other_item = (pk2, (slot0, info2)); + if other == dup + 1 { + // don't insert + } else if other == dup { + expected.push(other_item); + items.push(other_item); + } else { + expected.push(other_item); + items.insert(other as usize, other_item); + } + let result = AccountsIndex::::remove_older_duplicate_pubkeys(&mut items); + assert_eq!(items, expected); + if dup != 0 { + assert_eq!(result.unwrap(), expected_dups); + } else { + assert!(result.is_none()); + } + } + } + } + #[test] fn test_secondary_index_include_exclude() { let pk1 = Pubkey::new_unique(); @@ -2194,6 +2288,44 @@ pub mod tests { true } } + + #[test] + fn test_insert_duplicates() { + let key = solana_sdk::pubkey::new_rand(); + let pubkey = &key; + let slot = 0; + let mut ancestors = Ancestors::default(); + ancestors.insert(slot, 0); + + let account_info = true; + let index = AccountsIndex::::default_for_tests(); + let account_info2: bool = !account_info; + let items = vec![(*pubkey, account_info), (*pubkey, account_info2)]; + index.set_startup(Startup::Startup); + let (_, _, result) = + index.insert_new_if_missing_into_primary_index(slot, items.len(), items.into_iter()); + assert_eq!(result.count, 1); + index.set_startup(Startup::Normal); + if let AccountIndexGetResult::Found(entry, index) = + // the entry for + index.get_for_tests(pubkey, Some(&ancestors), None) + { + // make sure the one with the correct info is added + assert_eq!(entry.slot_list()[index], (slot, account_info2)); + // make sure it wasn't inserted twice + assert_eq!( + entry + .slot_list() + .iter() + .filter_map(|(entry_slot, _)| (entry_slot == &slot).then_some(true)) + .count(), + 1 + ); + } else { + panic!("failed"); + } + } + #[test] fn test_insert_new_with_lock_no_ancestors() { let key = solana_sdk::pubkey::new_rand();