From 8b61a8eb6ab2f1e07b893f328d97b540cded9262 Mon Sep 17 00:00:00 2001 From: HaoranYi Date: Sun, 8 Dec 2024 17:11:35 +0000 Subject: [PATCH 01/11] fix in-memory startup indexgen --- accounts-db/src/accounts_db.rs | 3 +- accounts-db/src/accounts_index.rs | 42 ++++++++++++++----- .../accounts_index/in_mem_accounts_index.rs | 38 ++++++++++++++++- 3 files changed, 69 insertions(+), 14 deletions(-) diff --git a/accounts-db/src/accounts_db.rs b/accounts-db/src/accounts_db.rs index 3e246f08820054..7bd7d90fb1195a 100644 --- a/accounts-db/src/accounts_db.rs +++ b/accounts-db/src/accounts_db.rs @@ -8566,7 +8566,8 @@ impl AccountsDb { // a given pubkey. If there is just a single item, there is no cleaning to // be done on that pubkey. Use only those pubkeys with multiple updates. if !dirty_pubkeys.is_empty() { - self.uncleaned_pubkeys.insert(slot, dirty_pubkeys); + let old = self.uncleaned_pubkeys.insert(slot, dirty_pubkeys); + assert!(old.is_none()); } SlotIndexGenerationInfo { insert_time_us, diff --git a/accounts-db/src/accounts_index.rs b/accounts-db/src/accounts_index.rs index 2aae2d80a21553..9dca9c4fda11c6 100644 --- a/accounts-db/src/accounts_index.rs +++ b/accounts-db/src/accounts_index.rs @@ -1713,6 +1713,10 @@ impl + Into> AccountsIndex { duplicates } + pub fn with_disk(&self) -> bool { + self.storage.storage.disk.is_some() + } + // Same functionally to upsert, but: // 1. operates on a batch of items // 2. holds the write lock for the duration of adding the items @@ -1730,12 +1734,12 @@ impl + Into> AccountsIndex { // this assumes the largest bin contains twice the expected amount of the average size per bin let bins = self.bins(); let expected_items_per_bin = approx_items_len * 2 / bins; - let use_disk = self.storage.storage.is_disk_index_enabled(); + let use_disk = self.with_disk(); let mut binned = (0..bins) .map(|_| Vec::with_capacity(expected_items_per_bin)) .collect::>(); let mut count = 0; - let mut dirty_pubkeys = items + let dirty_pubkeys = items .filter_map(|(pubkey, account_info)| { let pubkey_bin = self.bin_calculator.bin_from_pubkey(&pubkey); // this value is equivalent to what update() below would have created if we inserted a new item @@ -1775,6 +1779,7 @@ impl + Into> AccountsIndex { } else { // not using disk buckets, so just write to in-mem // this is no longer the default case + let mut dup_pubkeys = vec![]; items .into_iter() .for_each(|(pubkey, (slot, account_info))| { @@ -1789,11 +1794,16 @@ impl + Into> AccountsIndex { { InsertNewEntryResults::DidNotExist => {} InsertNewEntryResults::ExistedNewEntryZeroLamports => {} - InsertNewEntryResults::ExistedNewEntryNonZeroLamports => { - dirty_pubkeys.push(pubkey); + InsertNewEntryResults::ExistedNewEntryNonZeroLamports(other_slot) => { + if let Some(other_slot) = other_slot { + dup_pubkeys.push((other_slot, pubkey)); + } + dup_pubkeys.push((slot, pubkey)); } } }); + + r_account_maps.update_duplicates_from_in_memory_only_startup(dup_pubkeys); } insert_time.stop(); insertion_time.fetch_add(insert_time.as_us(), Ordering::Relaxed); @@ -1814,13 +1824,23 @@ impl + Into> AccountsIndex { &self, f: impl Fn(Vec<(Slot, Pubkey)>) + Sync + Send, ) { - (0..self.bins()) - .into_par_iter() - .map(|pubkey_bin| { - let r_account_maps = &self.account_maps[pubkey_bin]; - r_account_maps.populate_and_retrieve_duplicate_keys_from_startup() - }) - .for_each(f); + if self.with_disk() { + (0..self.bins()) + .into_par_iter() + .map(|pubkey_bin| { + let r_account_maps = &self.account_maps[pubkey_bin]; + r_account_maps.populate_and_retrieve_duplicate_keys_from_startup() + }) + .for_each(f); + } else { + (0..self.bins()) + .into_par_iter() + .map(|pubkey_bin| { + let r_account_maps = &self.account_maps[pubkey_bin]; + r_account_maps.get_duplicates_from_in_memory_only_startup() + }) + .for_each(f); + } } /// Updates the given pubkey at the given slot with the new account information. diff --git a/accounts-db/src/accounts_index/in_mem_accounts_index.rs b/accounts-db/src/accounts_index/in_mem_accounts_index.rs index 330611e82f641b..504bb1083e70a6 100644 --- a/accounts-db/src/accounts_index/in_mem_accounts_index.rs +++ b/accounts-db/src/accounts_index/in_mem_accounts_index.rs @@ -135,7 +135,7 @@ impl + Into> Debug for InMemAccoun pub enum InsertNewEntryResults { DidNotExist, ExistedNewEntryZeroLamports, - ExistedNewEntryNonZeroLamports, + ExistedNewEntryNonZeroLamports(Option), } #[derive(Default, Debug)] @@ -153,6 +153,10 @@ struct StartupInfo + Into> { insert: Mutex>, /// pubkeys with more than 1 entry duplicates: Mutex>, + + /// (slot, pubkey) pairs that are duplicates when we are starting from in-memory only index. + /// And this field is only populated and used when we are building the in-memory only index. + duplicate_from_in_memory_only: Mutex>, } #[derive(Default, Debug)] @@ -727,6 +731,19 @@ impl + Into> InMemAccountsIndex) { + assert!(self.storage.get_startup()); + assert!(self.bucket.is_none()); + + let mut duplicates = self + .startup_info + .duplicate_from_in_memory_only + .lock() + .unwrap(); + + duplicates.extend(items); + } + pub fn insert_new_entry_if_missing_with_lock( &self, pubkey: Pubkey, @@ -737,10 +754,18 @@ impl + Into> InMemAccountsIndex { // in cache, so merge into cache let (slot, account_info) = new_entry.into(); + + let slot_list = occupied.get().slot_list.read().unwrap(); + if slot_list.len() == 1 { + other_slot = Some(slot_list[0].0); + } + drop(slot_list); + InMemAccountsIndex::::lock_and_update_slot_list( occupied.get(), (slot, account_info), @@ -796,7 +821,7 @@ impl + Into> InMemAccountsIndex + Into> InMemAccountsIndex Vec<(Slot, Pubkey)> { + let mut duplicates = self + .startup_info + .duplicate_from_in_memory_only + .lock() + .unwrap(); + std::mem::take(&mut *duplicates) + } + /// synchronize the in-mem index with the disk index fn flush_internal(&self, flush_guard: &FlushGuard, can_advance_age: bool) { let current_age = self.storage.current_age(); From 95dceea466bbf8fa0dc2cf3e90aed2b8de18c0cf Mon Sep 17 00:00:00 2001 From: HaoranYi Date: Tue, 10 Dec 2024 19:03:04 +0000 Subject: [PATCH 02/11] merge fix --- accounts-db/src/accounts_index.rs | 8 ++------ 1 file changed, 2 insertions(+), 6 deletions(-) diff --git a/accounts-db/src/accounts_index.rs b/accounts-db/src/accounts_index.rs index 9dca9c4fda11c6..a33b2afe770dca 100644 --- a/accounts-db/src/accounts_index.rs +++ b/accounts-db/src/accounts_index.rs @@ -1713,10 +1713,6 @@ impl + Into> AccountsIndex { duplicates } - pub fn with_disk(&self) -> bool { - self.storage.storage.disk.is_some() - } - // Same functionally to upsert, but: // 1. operates on a batch of items // 2. holds the write lock for the duration of adding the items @@ -1734,7 +1730,7 @@ impl + Into> AccountsIndex { // this assumes the largest bin contains twice the expected amount of the average size per bin let bins = self.bins(); let expected_items_per_bin = approx_items_len * 2 / bins; - let use_disk = self.with_disk(); + let use_disk = self.storage.storage.is_disk_index_enabled(); let mut binned = (0..bins) .map(|_| Vec::with_capacity(expected_items_per_bin)) .collect::>(); @@ -1824,7 +1820,7 @@ impl + Into> AccountsIndex { &self, f: impl Fn(Vec<(Slot, Pubkey)>) + Sync + Send, ) { - if self.with_disk() { + if self.storage.storage.is_disk_index_enabled() { (0..self.bins()) .into_par_iter() .map(|pubkey_bin| { From 88d0ba334cdabacb65d2429925cf2596827d9894 Mon Sep 17 00:00:00 2001 From: HaoranYi Date: Wed, 11 Dec 2024 15:14:25 +0000 Subject: [PATCH 03/11] pr: rename --- accounts-db/src/accounts_index.rs | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/accounts-db/src/accounts_index.rs b/accounts-db/src/accounts_index.rs index a33b2afe770dca..b7126bfe293c4b 100644 --- a/accounts-db/src/accounts_index.rs +++ b/accounts-db/src/accounts_index.rs @@ -1775,7 +1775,7 @@ impl + Into> AccountsIndex { } else { // not using disk buckets, so just write to in-mem // this is no longer the default case - let mut dup_pubkeys = vec![]; + let mut duplicates_from_in_memory = vec![]; items .into_iter() .for_each(|(pubkey, (slot, account_info))| { @@ -1792,14 +1792,15 @@ impl + Into> AccountsIndex { InsertNewEntryResults::ExistedNewEntryZeroLamports => {} InsertNewEntryResults::ExistedNewEntryNonZeroLamports(other_slot) => { if let Some(other_slot) = other_slot { - dup_pubkeys.push((other_slot, pubkey)); + duplicates_from_in_memory.push((other_slot, pubkey)); } - dup_pubkeys.push((slot, pubkey)); + duplicates_from_in_memory.push((slot, pubkey)); } } }); - r_account_maps.update_duplicates_from_in_memory_only_startup(dup_pubkeys); + r_account_maps + .update_duplicates_from_in_memory_only_startup(duplicates_from_in_memory); } insert_time.stop(); insertion_time.fetch_add(insert_time.as_us(), Ordering::Relaxed); From b70f6ab676ba2de67ee3750b16004e9288e5916b Mon Sep 17 00:00:00 2001 From: HaoranYi Date: Wed, 11 Dec 2024 15:16:08 +0000 Subject: [PATCH 04/11] pr: reanme --- accounts-db/src/accounts_index/in_mem_accounts_index.rs | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/accounts-db/src/accounts_index/in_mem_accounts_index.rs b/accounts-db/src/accounts_index/in_mem_accounts_index.rs index 504bb1083e70a6..530f3806afbc29 100644 --- a/accounts-db/src/accounts_index/in_mem_accounts_index.rs +++ b/accounts-db/src/accounts_index/in_mem_accounts_index.rs @@ -156,7 +156,7 @@ struct StartupInfo + Into> { /// (slot, pubkey) pairs that are duplicates when we are starting from in-memory only index. /// And this field is only populated and used when we are building the in-memory only index. - duplicate_from_in_memory_only: Mutex>, + duplicates_from_in_memory_only: Mutex>, } #[derive(Default, Debug)] @@ -737,7 +737,7 @@ impl + Into> InMemAccountsIndex + Into> InMemAccountsIndex Vec<(Slot, Pubkey)> { let mut duplicates = self .startup_info - .duplicate_from_in_memory_only + .duplicates_from_in_memory_only .lock() .unwrap(); std::mem::take(&mut *duplicates) From f15e6c24d03fe177cbdcaba8ecbe216a5f3d8d4d Mon Sep 17 00:00:00 2001 From: HaoranYi Date: Wed, 11 Dec 2024 15:17:00 +0000 Subject: [PATCH 05/11] pr: rename --- accounts-db/src/accounts_index.rs | 2 +- accounts-db/src/accounts_index/in_mem_accounts_index.rs | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/accounts-db/src/accounts_index.rs b/accounts-db/src/accounts_index.rs index b7126bfe293c4b..8afa8525e41016 100644 --- a/accounts-db/src/accounts_index.rs +++ b/accounts-db/src/accounts_index.rs @@ -1800,7 +1800,7 @@ impl + Into> AccountsIndex { }); r_account_maps - .update_duplicates_from_in_memory_only_startup(duplicates_from_in_memory); + .startup_update_duplicates_from_in_memory_only(duplicates_from_in_memory); } insert_time.stop(); insertion_time.fetch_add(insert_time.as_us(), Ordering::Relaxed); diff --git a/accounts-db/src/accounts_index/in_mem_accounts_index.rs b/accounts-db/src/accounts_index/in_mem_accounts_index.rs index 530f3806afbc29..849b3d51155687 100644 --- a/accounts-db/src/accounts_index/in_mem_accounts_index.rs +++ b/accounts-db/src/accounts_index/in_mem_accounts_index.rs @@ -731,7 +731,7 @@ impl + Into> InMemAccountsIndex) { + pub fn startup_update_duplicates_from_in_memory_only(&self, items: Vec<(Slot, Pubkey)>) { assert!(self.storage.get_startup()); assert!(self.bucket.is_none()); From 237bca48e9992abc62a8d80ecc8a21fffe1d7450 Mon Sep 17 00:00:00 2001 From: HaoranYi Date: Wed, 11 Dec 2024 15:26:10 +0000 Subject: [PATCH 06/11] pr: rename --- accounts-db/src/accounts_index.rs | 2 +- accounts-db/src/accounts_index/in_mem_accounts_index.rs | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/accounts-db/src/accounts_index.rs b/accounts-db/src/accounts_index.rs index 8afa8525e41016..3b0bc4d089daed 100644 --- a/accounts-db/src/accounts_index.rs +++ b/accounts-db/src/accounts_index.rs @@ -1834,7 +1834,7 @@ impl + Into> AccountsIndex { .into_par_iter() .map(|pubkey_bin| { let r_account_maps = &self.account_maps[pubkey_bin]; - r_account_maps.get_duplicates_from_in_memory_only_startup() + r_account_maps.startup_take_duplicates_from_in_memory_only() }) .for_each(f); } diff --git a/accounts-db/src/accounts_index/in_mem_accounts_index.rs b/accounts-db/src/accounts_index/in_mem_accounts_index.rs index 849b3d51155687..50d49713d252ba 100644 --- a/accounts-db/src/accounts_index/in_mem_accounts_index.rs +++ b/accounts-db/src/accounts_index/in_mem_accounts_index.rs @@ -1172,7 +1172,7 @@ impl + Into> InMemAccountsIndex Vec<(Slot, Pubkey)> { + pub fn startup_take_duplicates_from_in_memory_only(&self) -> Vec<(Slot, Pubkey)> { let mut duplicates = self .startup_info .duplicates_from_in_memory_only From 00163ed5b9ed5deeeace525f759b5dea9273f4b1 Mon Sep 17 00:00:00 2001 From: HaoranYi Date: Wed, 11 Dec 2024 17:37:03 +0000 Subject: [PATCH 07/11] pr --- accounts-db/src/accounts_index.rs | 24 +++++++++--------------- 1 file changed, 9 insertions(+), 15 deletions(-) diff --git a/accounts-db/src/accounts_index.rs b/accounts-db/src/accounts_index.rs index 3b0bc4d089daed..1fe7470bb59d66 100644 --- a/accounts-db/src/accounts_index.rs +++ b/accounts-db/src/accounts_index.rs @@ -1821,23 +1821,17 @@ impl + Into> AccountsIndex { &self, f: impl Fn(Vec<(Slot, Pubkey)>) + Sync + Send, ) { - if self.storage.storage.is_disk_index_enabled() { - (0..self.bins()) - .into_par_iter() - .map(|pubkey_bin| { - let r_account_maps = &self.account_maps[pubkey_bin]; + (0..self.bins()) + .into_par_iter() + .map(|pubkey_bin| { + let r_account_maps = &self.account_maps[pubkey_bin]; + if self.storage.storage.is_disk_index_enabled() { r_account_maps.populate_and_retrieve_duplicate_keys_from_startup() - }) - .for_each(f); - } else { - (0..self.bins()) - .into_par_iter() - .map(|pubkey_bin| { - let r_account_maps = &self.account_maps[pubkey_bin]; + } else { r_account_maps.startup_take_duplicates_from_in_memory_only() - }) - .for_each(f); - } + } + }) + .for_each(f); } /// Updates the given pubkey at the given slot with the new account information. From 7934ef24ff5c663ccecf345969125e13540bf5b8 Mon Sep 17 00:00:00 2001 From: HaoranYi Date: Wed, 11 Dec 2024 17:47:06 +0000 Subject: [PATCH 08/11] pr --- .../accounts_index/in_mem_accounts_index.rs | 26 +++++++------------ 1 file changed, 9 insertions(+), 17 deletions(-) diff --git a/accounts-db/src/accounts_index/in_mem_accounts_index.rs b/accounts-db/src/accounts_index/in_mem_accounts_index.rs index 50d49713d252ba..511a51c857936b 100644 --- a/accounts-db/src/accounts_index/in_mem_accounts_index.rs +++ b/accounts-db/src/accounts_index/in_mem_accounts_index.rs @@ -145,6 +145,11 @@ struct StartupInfoDuplicates { duplicates: Vec<(Slot, Pubkey, T)>, /// pubkeys that were already added to disk and later found to be duplicates, duplicates_put_on_disk: HashSet<(Slot, Pubkey)>, + + /// (slot, pubkey) pairs that are found to be duplicates when we are + /// starting from in-memory only index. This filed is used only when disk + /// index is disabled. + duplicates_from_in_memory_only: Vec<(Slot, Pubkey)>, } #[derive(Default, Debug)] @@ -153,10 +158,6 @@ struct StartupInfo + Into> { insert: Mutex>, /// pubkeys with more than 1 entry duplicates: Mutex>, - - /// (slot, pubkey) pairs that are duplicates when we are starting from in-memory only index. - /// And this field is only populated and used when we are building the in-memory only index. - duplicates_from_in_memory_only: Mutex>, } #[derive(Default, Debug)] @@ -735,13 +736,8 @@ impl + Into> InMemAccountsIndex + Into> InMemAccountsIndex Vec<(Slot, Pubkey)> { - let mut duplicates = self - .startup_info - .duplicates_from_in_memory_only - .lock() - .unwrap(); - std::mem::take(&mut *duplicates) + let mut duplicates = self.startup_info.duplicates.lock().unwrap(); + std::mem::take(&mut duplicates.duplicates_from_in_memory_only) } /// synchronize the in-mem index with the disk index From 15d6e13a9899394f3ab1439a37ece7fca558086a Mon Sep 17 00:00:00 2001 From: HaoranYi Date: Wed, 11 Dec 2024 18:38:15 +0000 Subject: [PATCH 09/11] typo --- accounts-db/src/accounts_index/in_mem_accounts_index.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/accounts-db/src/accounts_index/in_mem_accounts_index.rs b/accounts-db/src/accounts_index/in_mem_accounts_index.rs index 511a51c857936b..f0f1de9469838e 100644 --- a/accounts-db/src/accounts_index/in_mem_accounts_index.rs +++ b/accounts-db/src/accounts_index/in_mem_accounts_index.rs @@ -147,7 +147,7 @@ struct StartupInfoDuplicates { duplicates_put_on_disk: HashSet<(Slot, Pubkey)>, /// (slot, pubkey) pairs that are found to be duplicates when we are - /// starting from in-memory only index. This filed is used only when disk + /// starting from in-memory only index. This field is used only when disk /// index is disabled. duplicates_from_in_memory_only: Vec<(Slot, Pubkey)>, } From ce255436600fd7bf88a5fd91b593c697d81c5720 Mon Sep 17 00:00:00 2001 From: HaoranYi Date: Thu, 12 Dec 2024 15:52:54 +0000 Subject: [PATCH 10/11] pr: add comments --- accounts-db/src/accounts_index/in_mem_accounts_index.rs | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/accounts-db/src/accounts_index/in_mem_accounts_index.rs b/accounts-db/src/accounts_index/in_mem_accounts_index.rs index f0f1de9469838e..7d7a810afb06d8 100644 --- a/accounts-db/src/accounts_index/in_mem_accounts_index.rs +++ b/accounts-db/src/accounts_index/in_mem_accounts_index.rs @@ -757,6 +757,13 @@ impl + Into> InMemAccountsIndex 1, the + // items, previously inserted into the slot_list, have already + // been added. We don't need to add them again. if slot_list.len() == 1 { other_slot = Some(slot_list[0].0); } From 482486f8db4727c032ebaf7e6818d503057e6410 Mon Sep 17 00:00:00 2001 From: HaoranYi Date: Thu, 12 Dec 2024 21:37:01 +0000 Subject: [PATCH 11/11] pr: fix a race --- .../src/accounts_index/in_mem_accounts_index.rs | 14 +++++++++++++- 1 file changed, 13 insertions(+), 1 deletion(-) diff --git a/accounts-db/src/accounts_index/in_mem_accounts_index.rs b/accounts-db/src/accounts_index/in_mem_accounts_index.rs index 7d7a810afb06d8..5bd40a695ff327 100644 --- a/accounts-db/src/accounts_index/in_mem_accounts_index.rs +++ b/accounts-db/src/accounts_index/in_mem_accounts_index.rs @@ -769,13 +769,25 @@ impl + Into> InMemAccountsIndex::lock_and_update_slot_list( + let updated_slot_list_len = InMemAccountsIndex::::lock_and_update_slot_list( occupied.get(), (slot, account_info), None, // should be None because we don't expect a different slot # during index generation &mut Vec::default(), UpsertReclaim::IgnoreReclaims, ); + + // In case of a race condition, multiple threads try to insert + // to the same pubkey with different slots. We only need to + // record `other_slot` once. If the slot list length after + // update is not 2, it means that someone else has already + // recorded `other_slot` before us. Therefore, We don't need to + // record it again. + if updated_slot_list_len != 2 { + // clear `other_slot` if we don't win the race. + other_slot = None; + } + ( true, /* found in mem */ true, /* already existed */