From bc933c63cec078015d180f10f721025b425a05f9 Mon Sep 17 00:00:00 2001 From: steviez Date: Tue, 21 Mar 2023 20:17:58 +0800 Subject: [PATCH] Fix SlotMeta connected tracking (#28069) Fix SlotMeta is_connected tracking The tracking of connected status was previously based upon an assumption that would be practically false for all validators. The connected status of slots played into whether Blockstore would signal ReplayStage that it had new shreds ready to be replayed. Prior to the change, we would never signal and ReplayStage would always wait the entire duration of a 100ms timeout before restarting its' main processing loop. This commit introduces a change where we mark snapshot slots as connected. A validator may not have a path all the way back to genesis itself; however, snapshots are taken at known roots so we extend the connected status to these slots. Once a node has been bootstrapped once to have is connected, the logic persists in Blockstore such that all children on the main fork also get their connected status updated properly. --- ledger/src/blockstore.rs | 260 +++++++++++++++++++++-------- ledger/src/blockstore_meta.rs | 33 +++- ledger/src/blockstore_processor.rs | 6 +- 3 files changed, 228 insertions(+), 71 deletions(-) diff --git a/ledger/src/blockstore.rs b/ledger/src/blockstore.rs index dada1dd187b3ba..44397381b4de59 100644 --- a/ledger/src/blockstore.rs +++ b/ledger/src/blockstore.rs @@ -3364,6 +3364,53 @@ impl Blockstore { ); Ok(()) } + + /// Mark a root `slot` as connected, traverse `slot`'s children and update + /// the children's connected status if appropriate. + /// + /// A ledger with a full path of blocks from genesis to the latest root will + /// have all of the rooted blocks marked as connected such that new blocks + /// could also be connected. However, starting from some root (such as from + /// a snapshot) is a valid way to join a cluster. For this case, mark this + /// root as connected such that the node that joined midway through can + /// have their slots considered connected. + pub fn set_and_chain_connected_on_root_and_next_slots(&self, root: Slot) -> Result<()> { + let mut root_meta = self + .meta(root)? + .unwrap_or_else(|| SlotMeta::new(root, None)); + // If the slot was already connected, there is nothing to do as this slot's + // children are also assumed to be appropriately connected + if root_meta.is_connected() { + return Ok(()); + } + info!( + "Marking slot {} and any full children slots as connected", + root + ); + let mut write_batch = self.db.batch()?; + + // Mark both connected bits on the root slot so that the flags for this + // slot match the flags of slots that become connected the typical way. + root_meta.set_parent_connected(); + root_meta.set_connected(); + write_batch.put::(root_meta.slot, &root_meta)?; + + let mut next_slots = VecDeque::from(root_meta.next_slots); + while !next_slots.is_empty() { + let slot = next_slots.pop_front().unwrap(); + let mut meta = self.meta(slot)?.unwrap_or_else(|| { + panic!("Slot {slot} is a child but has no SlotMeta in blockstore") + }); + + if meta.set_parent_connected() { + next_slots.extend(meta.next_slots.iter()); + } + write_batch.put::(meta.slot, &meta)?; + } + + self.db.write(write_batch)?; + Ok(()) + } } // Update the `completed_data_indexes` with a new shred `new_shred_index`. If a @@ -3741,7 +3788,6 @@ fn handle_chaining_for_slot( let meta = &slot_meta_entry.new_slot_meta; let meta_backup = &slot_meta_entry.old_slot_meta; - { let mut meta_mut = meta.borrow_mut(); let was_orphan_slot = meta_backup.is_some() && is_orphan(meta_backup.as_ref().unwrap()); @@ -3778,44 +3824,32 @@ fn handle_chaining_for_slot( } } - // If this is a newly inserted slot, then we know the children of this slot were not previously - // connected to the trunk of the ledger. Thus if slot.is_connected is now true, we need to - // update all child slots with `is_connected` = true because these children are also now newly - // connected to trunk of the ledger + // If this is a newly completed slot and the parent is connected, then the + // slot is now connected. Mark the slot as connected, and then traverse the + // children to update their parent_connected and connected status. let should_propagate_is_connected = is_newly_completed_slot(&RefCell::borrow(meta), meta_backup) - && RefCell::borrow(meta).is_connected(); + && RefCell::borrow(meta).is_parent_connected(); if should_propagate_is_connected { - // slot_function returns a boolean indicating whether to explore the children - // of the input slot - let slot_function = |slot: &mut SlotMeta| { - slot.set_connected(); - - // We don't want to set the is_connected flag on the children of non-full - // slots - slot.is_full() - }; - + meta.borrow_mut().set_connected(); traverse_children_mut( db, - slot, meta, working_set, new_chained_slots, - slot_function, + SlotMeta::set_parent_connected, )?; } Ok(()) } -/// Traverse all the direct and indirect children slots and apply the specified -/// `slot_function`. +/// Traverse all the children (direct and indirect) of `slot_meta`, and apply +/// `slot_function` to each of the children (but not `slot_meta`). /// /// Arguments: /// `db`: the blockstore db that stores shreds and their metadata. -/// `slot`: starting slot to traverse. /// `slot_meta`: the SlotMeta of the above `slot`. /// `working_set`: a slot-id to SlotMetaWorkingSetEntry map which is used /// to traverse the graph. @@ -3826,7 +3860,6 @@ fn handle_chaining_for_slot( /// a given slot. fn traverse_children_mut( db: &Database, - slot: Slot, slot_meta: &Rc>, working_set: &HashMap, passed_visisted_slots: &mut HashMap>>, @@ -3835,25 +3868,18 @@ fn traverse_children_mut( where F: Fn(&mut SlotMeta) -> bool, { - let mut next_slots: VecDeque<(u64, Rc>)> = - vec![(slot, slot_meta.clone())].into(); + let slot_meta = slot_meta.borrow(); + let mut next_slots: VecDeque = slot_meta.next_slots.to_vec().into(); while !next_slots.is_empty() { - let (_, current_slot) = next_slots.pop_front().unwrap(); - // Check whether we should explore the children of this slot - if slot_function(&mut current_slot.borrow_mut()) { - let current_slot = &RefCell::borrow(&*current_slot); - for next_slot_index in current_slot.next_slots.iter() { - let next_slot = find_slot_meta_else_create( - db, - working_set, - passed_visisted_slots, - *next_slot_index, - )?; - next_slots.push_back((*next_slot_index, next_slot)); - } + let slot = next_slots.pop_front().unwrap(); + let meta_ref = find_slot_meta_else_create(db, working_set, passed_visisted_slots, slot)?; + let mut meta = meta_ref.borrow_mut(); + if slot_function(&mut meta) { + meta.next_slots + .iter() + .for_each(|slot| next_slots.push_back(*slot)); } } - Ok(()) } @@ -3864,15 +3890,14 @@ fn is_orphan(meta: &SlotMeta) -> bool { } // 1) Chain current_slot to the previous slot defined by prev_slot_meta -// 2) Determine whether to set the is_connected flag fn chain_new_slot_to_prev_slot( prev_slot_meta: &mut SlotMeta, current_slot: Slot, current_slot_meta: &mut SlotMeta, ) { prev_slot_meta.next_slots.push(current_slot); - if prev_slot_meta.is_connected() && prev_slot_meta.is_full() { - current_slot_meta.set_connected(); + if prev_slot_meta.is_connected() { + current_slot_meta.set_parent_connected(); } } @@ -3882,18 +3907,19 @@ fn is_newly_completed_slot(slot_meta: &SlotMeta, backup_slot_meta: &Option) -> bool { - // We should signal that there are updates if we extended the chain of consecutive blocks starting - // from block 0, which is true iff: - // 1) The block with index prev_block_index is itself part of the trunk of consecutive blocks - // starting from block 0, - slot_meta.is_connected() && - // AND either: - // 1) The slot didn't exist in the database before, and now we have a consecutive - // block for that slot + // First, this slot's parent must be connected in order to even consider + // starting replay; otherwise, the replayed results may not be valid. + slot_meta.is_parent_connected() && + // Then, + // If the slot didn't exist in the db before, any consecutive shreds + // at the start of the slot are ready to be replayed. ((slot_meta_backup.is_none() && slot_meta.consumed != 0) || - // OR - // 2) The slot did exist, but now we have a new consecutive block for that slot + // Or, + // If the slot has more consecutive shreds than it last did from the + // last update, those shreds are new and also ready to be replayed. (slot_meta_backup.is_some() && slot_meta_backup.as_ref().unwrap().consumed != slot_meta.consumed)) } @@ -4454,7 +4480,7 @@ pub mod tests { solana_transaction_status::{ InnerInstruction, InnerInstructions, Reward, Rewards, TransactionTokenBalance, }, - std::{thread::Builder, time::Duration}, + std::{cmp::Ordering, thread::Builder, time::Duration}, }; // used for tests only @@ -5388,7 +5414,7 @@ pub mod tests { blockstore.insert_shreds(shreds1, None, false).unwrap(); let meta1 = blockstore.meta(1).unwrap().unwrap(); assert!(meta1.next_slots.is_empty()); - // Slot 1 is not trunk because slot 0 hasn't been inserted yet + // Slot 1 is not connected because slot 0 hasn't been inserted yet assert!(!meta1.is_connected()); assert_eq!(meta1.parent_slot, Some(0)); assert_eq!(meta1.last_index, Some(shreds_per_slot as u64 - 1)); @@ -5400,13 +5426,13 @@ pub mod tests { blockstore.insert_shreds(shreds2, None, false).unwrap(); let meta2 = blockstore.meta(2).unwrap().unwrap(); assert!(meta2.next_slots.is_empty()); - // Slot 2 is not trunk because slot 0 hasn't been inserted yet + // Slot 2 is not connected because slot 0 hasn't been inserted yet assert!(!meta2.is_connected()); assert_eq!(meta2.parent_slot, Some(1)); assert_eq!(meta2.last_index, Some(shreds_per_slot as u64 - 1)); // Check the first slot again, it should chain to the second slot, - // but still isn't part of the trunk + // but still isn't connected. let meta1 = blockstore.meta(1).unwrap().unwrap(); assert_eq!(meta1.next_slots, vec![2]); assert!(!meta1.is_connected()); @@ -5440,7 +5466,7 @@ pub mod tests { let num_slots = 30; let entries_per_slot = 5; - // Make a bunch of shreds and split by whether slot is even or odd + // Make some shreds and split based on whether the slot is odd or even. let (shreds, _) = make_many_slot_entries(0, num_slots, entries_per_slot); let shreds_per_slot = shreds.len() as u64 / num_slots; let (even_slots, odd_slots): (Vec<_>, Vec<_>) = @@ -5466,8 +5492,10 @@ pub mod tests { assert_eq!(meta.parent_slot, Some(slot - 1)); } - // Slot 0 is the only connected slot - assert!(!meta.is_connected() || meta.slot == 0); + // None of the slot should be connected, but since slot 0 is + // the special case, it will have parent_connected as true. + assert!(!meta.is_connected()); + assert!(!meta.is_parent_connected() || slot == 0); } // Write the even slot shreds that we did not earlier @@ -5497,6 +5525,7 @@ pub mod tests { #[test] #[allow(clippy::cognitive_complexity)] pub fn test_forward_chaining_is_connected() { + solana_logger::setup(); let ledger_path = get_tmp_ledger_path_auto_delete!(); let blockstore = Blockstore::open(ledger_path.path()).unwrap(); @@ -5533,14 +5562,14 @@ pub mod tests { } // Ensure that each slot has their parent correct - // Additionally, slot 0 should be the only connected slot if slot == 0 { assert_eq!(meta.parent_slot, Some(0)); - assert!(meta.is_connected()); } else { assert_eq!(meta.parent_slot, Some(slot - 1)); - assert!(!meta.is_connected()); } + // No slots should be connected yet, not even slot 0 + // as slot 0 is still not full yet + assert!(!meta.is_connected()); assert_eq!(meta.last_index, Some(shreds_per_slot as u64 - 1)); } @@ -5554,28 +5583,123 @@ pub mod tests { for slot in 0..num_slots { let meta = blockstore.meta(slot).unwrap().unwrap(); + if slot != num_slots - 1 { assert_eq!(meta.next_slots, vec![slot + 1]); } else { assert!(meta.next_slots.is_empty()); } - if slot <= slot_index + 3 { + + if slot < slot_index + 3 { + assert!(meta.is_full()); assert!(meta.is_connected()); } else { assert!(!meta.is_connected()); } - if slot == 0 { - assert_eq!(meta.parent_slot, Some(0)); - } else { - assert_eq!(meta.parent_slot, Some(slot - 1)); - } - assert_eq!(meta.last_index, Some(shreds_per_slot as u64 - 1)); } } } } + + #[test] + fn test_set_and_chain_connected_on_root_and_next_slots() { + solana_logger::setup(); + let ledger_path = get_tmp_ledger_path_auto_delete!(); + let blockstore = Blockstore::open(ledger_path.path()).unwrap(); + + // Create enough entries to ensure 5 shreds result + let entries_per_slot = max_ticks_per_n_shreds(5, None); + + let mut start_slot = 5; + // Start a chain from a slot not in blockstore, this is the case when + // node starts with no blockstore and downloads a snapshot. In this + // scenario, the slot will be marked connected despite its' parent not + // being connected (or existing) and not being full. + blockstore + .set_and_chain_connected_on_root_and_next_slots(start_slot) + .unwrap(); + let slot_meta5 = blockstore.meta(start_slot).unwrap().unwrap(); + assert!(!slot_meta5.is_full()); + assert!(slot_meta5.is_parent_connected()); + assert!(slot_meta5.is_connected()); + + let num_slots = 5; + // Insert some new slots and ensure they connect to the root correctly + start_slot += 1; + let (shreds, _) = make_many_slot_entries(start_slot, num_slots, entries_per_slot); + blockstore.insert_shreds(shreds, None, false).unwrap(); + for slot in start_slot..start_slot + num_slots { + info!("Evaluating slot {}", slot); + let meta = blockstore.meta(slot).unwrap().unwrap(); + assert!(meta.is_parent_connected()); + assert!(meta.is_connected()); + } + + // Chain connected on slots that are already connected, should just noop + blockstore + .set_and_chain_connected_on_root_and_next_slots(start_slot) + .unwrap(); + for slot in start_slot..start_slot + num_slots { + let meta = blockstore.meta(slot).unwrap().unwrap(); + assert!(meta.is_parent_connected()); + assert!(meta.is_connected()); + } + + // Start another chain that is disconnected from previous chain. But, insert + // a non-full slot and ensure this slot (and its' children) are not marked + // as connected. + start_slot += 2 * num_slots; + let (shreds, _) = make_many_slot_entries(start_slot, num_slots, entries_per_slot); + // Insert all shreds except for the shreds with index > 0 from non_full_slot + let non_full_slot = start_slot + num_slots / 2; + let (shreds, missing_shreds) = shreds + .into_iter() + .partition(|shred| shred.slot() != non_full_slot || shred.index() == 0); + blockstore.insert_shreds(shreds, None, false).unwrap(); + // Chain method hasn't been called yet, so none of these connected yet + for slot in start_slot..start_slot + num_slots { + let meta = blockstore.meta(slot).unwrap().unwrap(); + assert!(!meta.is_parent_connected()); + assert!(!meta.is_connected()); + } + // Now chain from the new starting point + blockstore + .set_and_chain_connected_on_root_and_next_slots(start_slot) + .unwrap(); + for slot in start_slot..start_slot + num_slots { + let meta = blockstore.meta(slot).unwrap().unwrap(); + match slot.cmp(&non_full_slot) { + Ordering::Less => { + // These are fully connected as expected + assert!(meta.is_parent_connected()); + assert!(meta.is_connected()); + } + Ordering::Equal => { + // Parent will be connected, but this slot not connected itself + assert!(meta.is_parent_connected()); + assert!(!meta.is_connected()); + } + Ordering::Greater => { + // All children are not connected either + assert!(!meta.is_parent_connected()); + assert!(!meta.is_connected()); + } + } + } + + // Insert the missing shreds and ensure all slots connected now + blockstore + .insert_shreds(missing_shreds, None, false) + .unwrap(); + for slot in start_slot..start_slot + num_slots { + let meta = blockstore.meta(slot).unwrap().unwrap(); + assert!(meta.is_parent_connected()); + assert!(meta.is_connected()); + } + } + /* #[test] pub fn test_chaining_tree() { diff --git a/ledger/src/blockstore_meta.rs b/ledger/src/blockstore_meta.rs index 531b4ff091776d..65a5c2c2ed0757 100644 --- a/ledger/src/blockstore_meta.rs +++ b/ledger/src/blockstore_meta.rs @@ -253,14 +253,42 @@ impl SlotMeta { Some(self.consumed) == self.last_index.map(|ix| ix + 1) } + /// Returns a boolean indicating whether the meta is connected. pub fn is_connected(&self) -> bool { self.connected_flags.contains(ConnectedFlags::CONNECTED) } + /// Mark the meta as connected. pub fn set_connected(&mut self) { + assert!(self.is_parent_connected()); self.connected_flags.set(ConnectedFlags::CONNECTED, true); } + /// Returns a boolean indicating whether the meta's parent is connected. + pub fn is_parent_connected(&self) -> bool { + self.connected_flags + .contains(ConnectedFlags::PARENT_CONNECTED) + } + + /// Mark the meta's parent as connected. + /// If the meta is also full, the meta is now connected as well. Return a + /// boolean indicating whether the meta becamed connected from this call. + pub fn set_parent_connected(&mut self) -> bool { + // Already connected so nothing to do, bail early + if self.is_connected() { + return false; + } + + self.connected_flags + .set(ConnectedFlags::PARENT_CONNECTED, true); + + if self.is_full() { + self.set_connected(); + } + + self.is_connected() + } + /// Dangerous. Currently only needed for a local-cluster test pub fn unset_parent(&mut self) { self.parent_slot = None; @@ -275,7 +303,7 @@ impl SlotMeta { let connected_flags = if slot == 0 { // Slot 0 is the start, mark it as having its' parent connected // such that slot 0 becoming full will be updated as connected - ConnectedFlags::CONNECTED + ConnectedFlags::PARENT_CONNECTED } else { ConnectedFlags::default() }; @@ -461,7 +489,8 @@ mod test { #[test] fn test_slot_meta_slot_zero_connected() { let meta = SlotMeta::new(0 /* slot */, None /* parent */); - assert!(meta.is_connected()); + assert!(meta.is_parent_connected()); + assert!(!meta.is_connected()); } #[test] diff --git a/ledger/src/blockstore_processor.rs b/ledger/src/blockstore_processor.rs index 552d88ae086055..b068ac7b3e01e4 100644 --- a/ledger/src/blockstore_processor.rs +++ b/ledger/src/blockstore_processor.rs @@ -738,7 +738,8 @@ pub fn process_blockstore_from_root( info!("Processing ledger from slot {}...", start_slot); let now = Instant::now(); - // ensure start_slot is rooted for correct replay + // Ensure start_slot is rooted for correct replay; also ensure start_slot and + // qualifying children are marked as connected if blockstore.is_primary_access() { blockstore .mark_slots_as_if_rooted_normally_at_startup( @@ -746,6 +747,9 @@ pub fn process_blockstore_from_root( true, ) .expect("Couldn't mark start_slot as root on startup"); + blockstore + .set_and_chain_connected_on_root_and_next_slots(bank.slot()) + .expect("Couldn't mark start_slot as connected during startup") } else { info!( "Starting slot {} isn't root and won't be updated due to being secondary blockstore access",