diff --git a/ledger/src/blockstore.rs b/ledger/src/blockstore.rs
index 8b964e5b3ce9a1..8210a160d0b72d 100644
--- a/ledger/src/blockstore.rs
+++ b/ledger/src/blockstore.rs
@@ -1111,9 +1111,8 @@ impl Blockstore {
         self.completed_slots_senders.lock().unwrap().clear();
     }

-    /// Range-delete all entries which prefix matches the specified `slot`,
-    /// remove `slot` its' parents SlotMeta next_slots list, and
-    /// clear `slot`'s SlotMeta (except for next_slots).
+    /// Clear `slot` from the Blockstore, see `Blockstore::purge_slot_cleanup_chaining`
+    /// for more details.
     ///
     /// This function currently requires `insert_shreds_lock`, as both
     /// `clear_unconfirmed_slot()` and `insert_shreds_handle_duplicate()`
@@ -1121,6 +1120,7 @@ impl Blockstore {
     /// family.
     pub fn clear_unconfirmed_slot(&self, slot: Slot) {
         let _lock = self.insert_shreds_lock.lock().unwrap();
+<<<<<<< HEAD
         if let Some(mut slot_meta) = self
             .meta(slot)
             .expect("Couldn't fetch from SlotMeta column family")
@@ -1152,9 +1152,21 @@ impl Blockstore {
                 .expect("Couldn't insert into SlotMeta column family");
         } else {
             error!(
+=======
+        // Purge the slot and insert an empty `SlotMeta` with only the `next_slots` field preserved.
+        // Shreds inherently know their parent slot, and a parent's SlotMeta `next_slots` list
+        // will be updated when the child is inserted (see `Blockstore::handle_chaining()`).
+        // However, we are only purging and repairing the parent slot here. Since the child will not be
+        // reinserted, the chaining will be lost. In order for bank forks discovery to ingest the child,
+        // we must retain the chain by preserving `next_slots`.
+        match self.purge_slot_cleanup_chaining(slot) {
+            Ok(_) => {}
+            Err(BlockstoreError::SlotUnavailable) => error!(
+>>>>>>> cc4072bce8 (blockstore: atomize slot clearing, relax parent slot meta check (#35124))
                 "clear_unconfirmed_slot() called on slot {} with no SlotMeta",
                 slot
-            );
+            ),
+            Err(e) => panic!("Purge database operations failed {}", e),
         }
     }

diff --git a/ledger/src/blockstore/blockstore_purge.rs b/ledger/src/blockstore/blockstore_purge.rs
index 92f9453eabb6ed..5f3406ff7e903e 100644
--- a/ledger/src/blockstore/blockstore_purge.rs
+++ b/ledger/src/blockstore/blockstore_purge.rs
@@ -129,6 +129,7 @@ impl Blockstore {
         }
     }

+    #[cfg(test)]
     pub(crate) fn run_purge(
         &self,
         from_slot: Slot,
@@ -138,11 +139,60 @@ impl Blockstore {
         self.run_purge_with_stats(from_slot, to_slot, purge_type, &mut PurgeStats::default())
     }

+    /// Purges all columns relating to `slot`.
+    ///
+    /// Additionally, we clean up the parent of `slot` by clearing `slot` from
+    /// the parent's `next_slots`. We reinsert an orphaned `slot_meta` for `slot`
+    /// that preserves `slot`'s `next_slots`. This ensures that `slot`'s fork is
+    /// replayable upon repair of `slot`.
+    pub(crate) fn purge_slot_cleanup_chaining(&self, slot: Slot) -> Result<bool> {
+        let Some(mut slot_meta) = self.meta(slot)? else {
+            return Err(BlockstoreError::SlotUnavailable);
+        };
+        let mut write_batch = self.db.batch()?;
+
+        let columns_purged = self.purge_range(&mut write_batch, slot, slot, PurgeType::Exact)?;
+
+        if let Some(parent_slot) = slot_meta.parent_slot {
+            let parent_slot_meta = self.meta(parent_slot)?;
+            if let Some(mut parent_slot_meta) = parent_slot_meta {
+                // .retain() is a linear scan; however, next_slots should
+                // only contain several elements so this isn't so bad
+                parent_slot_meta
+                    .next_slots
+                    .retain(|&next_slot| next_slot != slot);
+                write_batch.put::<cf::SlotMeta>(parent_slot, &parent_slot_meta)?;
+            } else {
+                error!(
+                    "Parent slot meta {} for child {} is missing or cleaned up.
+                     Falling back to orphan repair to remedy the situation",
+                    parent_slot, slot
+                );
+            }
+        }
+
+        // Retain a SlotMeta for `slot` with the `next_slots` field retained
+        slot_meta.clear_unconfirmed_slot();
+        write_batch.put::<cf::SlotMeta>(slot, &slot_meta)?;
+
+        self.db.write(write_batch).inspect_err(|e| {
+            error!(
+                "Error: {:?} while submitting write batch for slot {:?}",
+                e, slot
+            )
+        })?;
+        Ok(columns_purged)
+    }
+
     /// A helper function to `purge_slots` that executes the ledger clean up.
     /// The cleanup applies to \[`from_slot`, `to_slot`\].
     ///
     /// When `from_slot` is 0, any sst-file with a key-range completely older
     /// than `to_slot` will also be deleted.
+    ///
+    /// Note: slots > `to_slot` that chained to a purged slot are not properly
+    /// cleaned up. This function is not intended to be used if such slots need
+    /// to be replayed.
     pub(crate) fn run_purge_with_stats(
         &self,
         from_slot: Slot,
@@ -150,11 +200,10 @@
         purge_type: PurgeType,
         purge_stats: &mut PurgeStats,
     ) -> Result<bool> {
-        let mut write_batch = self
-            .db
-            .batch()
-            .expect("Database Error: Failed to get write batch");
+        let mut write_batch = self.db.batch()?;
+
         let mut delete_range_timer = Measure::start("delete_range");
+<<<<<<< HEAD
         let mut columns_purged = self
             .db
             .delete_range_cf::<cf::SlotMeta>(&mut write_batch, from_slot, to_slot)
@@ -241,16 +290,18 @@ impl Blockstore {
                 // in no spiky periodic huge delete_range for them.
             }
         }
+=======
+        let columns_purged = self.purge_range(&mut write_batch, from_slot, to_slot, purge_type)?;
+>>>>>>> cc4072bce8 (blockstore: atomize slot clearing, relax parent slot meta check (#35124))
         delete_range_timer.stop();

         let mut write_timer = Measure::start("write_batch");
-        if let Err(e) = self.db.write(write_batch) {
+        self.db.write(write_batch).inspect_err(|e| {
             error!(
-                "Error: {:?} while submitting write batch for slot {:?} retrying...",
-                e, from_slot
-            );
-            return Err(e);
-        }
+                "Error: {:?} while submitting write batch for purge from_slot {} to_slot {}",
+                e, from_slot, to_slot
+            )
+        })?;
         write_timer.stop();

         let mut purge_files_in_range_timer = Measure::start("delete_file_in_range");
@@ -281,6 +332,93 @@
         Ok(columns_purged)
     }

+    fn purge_range(
+        &self,
+        write_batch: &mut WriteBatch,
+        from_slot: Slot,
+        to_slot: Slot,
+        purge_type: PurgeType,
+    ) -> Result<bool> {
+        let columns_purged = self
+            .db
+            .delete_range_cf::<cf::SlotMeta>(write_batch, from_slot, to_slot)
+            .is_ok()
+            & self
+                .db
+                .delete_range_cf::<cf::BankHash>(write_batch, from_slot, to_slot)
+                .is_ok()
+            & self
+                .db
+                .delete_range_cf::<cf::Root>(write_batch, from_slot, to_slot)
+                .is_ok()
+            & self
+                .db
+                .delete_range_cf::<cf::ShredData>(write_batch, from_slot, to_slot)
+                .is_ok()
+            & self
+                .db
+                .delete_range_cf::<cf::ShredCode>(write_batch, from_slot, to_slot)
+                .is_ok()
+            & self
+                .db
+                .delete_range_cf::<cf::DeadSlots>(write_batch, from_slot, to_slot)
+                .is_ok()
+            & self
+                .db
+                .delete_range_cf::<cf::DuplicateSlots>(write_batch, from_slot, to_slot)
+                .is_ok()
+            & self
+                .db
+                .delete_range_cf::<cf::ErasureMeta>(write_batch, from_slot, to_slot)
+                .is_ok()
+            & self
+                .db
+                .delete_range_cf::<cf::Orphans>(write_batch, from_slot, to_slot)
+                .is_ok()
+            & self
+                .db
+                .delete_range_cf::<cf::Index>(write_batch, from_slot, to_slot)
+                .is_ok()
+            & self
+                .db
+                .delete_range_cf::<cf::Rewards>(write_batch, from_slot, to_slot)
+                .is_ok()
+            & self
+                .db
+                .delete_range_cf::<cf::Blocktime>(write_batch, from_slot, to_slot)
+                .is_ok()
+            & self
+                .db
+                .delete_range_cf::<cf::PerfSamples>(write_batch, from_slot, to_slot)
+                .is_ok()
+            & self
+                .db
+                .delete_range_cf::<cf::BlockHeight>(write_batch, from_slot, to_slot)
+                .is_ok()
+            & self
+                .db
+                .delete_range_cf::<cf::OptimisticSlots>(write_batch, from_slot, to_slot)
+                .is_ok()
+            & self
+                .db
+                .delete_range_cf::<cf::MerkleRootMeta>(write_batch, from_slot, to_slot)
+                .is_ok();
+
+        match purge_type {
+            PurgeType::Exact => {
+                self.purge_special_columns_exact(write_batch, from_slot, to_slot)?;
+            }
+            PurgeType::CompactionFilter => {
+                // No explicit action is required here because this purge type completely and
+                // indefinitely relies on the proper working of compaction filter for those
+                // special column families, never toggling the primary index from the current
+                // one. Overall, this enables well uniformly distributed writes, resulting
+                // in no spiky periodic huge delete_range for them.
+            }
+        }
+        Ok(columns_purged)
+    }
+
     fn purge_files_in_range(&self, from_slot: Slot, to_slot: Slot) -> bool {
         self.db
             .delete_file_in_range_cf::<cf::ShredData>(from_slot, to_slot)
@@ -1198,4 +1336,234 @@ pub mod tests {
             .purge_special_columns_exact(&mut write_batch, slot, slot + 1)
             .unwrap();
     }
+<<<<<<< HEAD
+=======
+
+    #[test]
+    fn test_purge_special_columns_compaction_filter() {
+        let ledger_path = get_tmp_ledger_path_auto_delete!();
+        let blockstore = Blockstore::open(ledger_path.path()).unwrap();
+        let max_slot = 19;
+
+        clear_and_repopulate_transaction_statuses_for_test(&blockstore, max_slot);
+        let first_index = {
+            let mut status_entry_iterator = blockstore
+                .db
+                .iter::<cf::TransactionStatus>(IteratorMode::Start)
+                .unwrap();
+            status_entry_iterator.next().unwrap().0
+        };
+        let last_index = {
+            let mut status_entry_iterator = blockstore
+                .db
+                .iter::<cf::TransactionStatus>(IteratorMode::End)
+                .unwrap();
+            status_entry_iterator.next().unwrap().0
+        };
+
+        let oldest_slot = 3;
+        blockstore.db.set_oldest_slot(oldest_slot);
+        blockstore.db.compact_range_cf::<cf::TransactionStatus>(
+            &cf::TransactionStatus::key(first_index),
+            &cf::TransactionStatus::key(last_index),
+        );
+
+        let status_entry_iterator = blockstore
+            .db
+            .iter::<cf::TransactionStatus>(IteratorMode::Start)
+            .unwrap();
+        let mut count = 0;
+        for ((_signature, slot), _value) in status_entry_iterator {
+            assert!(slot >= oldest_slot);
+            count += 1;
+        }
+        assert_eq!(count, max_slot - (oldest_slot - 1));
+
+        clear_and_repopulate_transaction_statuses_for_test(&blockstore, max_slot);
+        let first_index = {
+            let mut status_entry_iterator = blockstore
+                .db
+                .iter::<cf::TransactionStatus>(IteratorMode::Start)
+                .unwrap();
+            status_entry_iterator.next().unwrap().0
+        };
+        let last_index = {
+            let mut status_entry_iterator = blockstore
+                .db
+                .iter::<cf::TransactionStatus>(IteratorMode::End)
+                .unwrap();
+            status_entry_iterator.next().unwrap().0
+        };
+
+        let oldest_slot = 12;
+        blockstore.db.set_oldest_slot(oldest_slot);
+        blockstore.db.compact_range_cf::<cf::TransactionStatus>(
+            &cf::TransactionStatus::key(first_index),
+            &cf::TransactionStatus::key(last_index),
+        );
+
+        let status_entry_iterator = blockstore
+            .db
+            .iter::<cf::TransactionStatus>(IteratorMode::Start)
+            .unwrap();
+        let mut count = 0;
+        for ((_signature, slot), _value) in status_entry_iterator {
+            assert!(slot >= oldest_slot);
+            count += 1;
+        }
+        assert_eq!(count, max_slot - (oldest_slot - 1));
+    }
+
+    #[test]
+    fn test_purge_transaction_memos_compaction_filter() {
+        let ledger_path = get_tmp_ledger_path_auto_delete!();
+        let blockstore = Blockstore::open(ledger_path.path()).unwrap();
+        let oldest_slot = 5;
+
+        fn random_signature() -> Signature {
+            use rand::Rng;
+
+            let mut key = [0u8; 64];
+            rand::thread_rng().fill(&mut key[..]);
+            Signature::from(key)
+        }
+
+        // Insert some deprecated TransactionMemos
+        blockstore
+            .transaction_memos_cf
+            .put_deprecated(random_signature(), &"this is a memo".to_string())
+            .unwrap();
+        blockstore
+            .transaction_memos_cf
+            .put_deprecated(random_signature(), &"another memo".to_string())
+            .unwrap();
+        // Set clean_slot_0 to false, since we have deprecated memos
+        blockstore.db.set_clean_slot_0(false);
+
+        // Insert some current TransactionMemos
+        blockstore
+            .transaction_memos_cf
+            .put(
+                (random_signature(), oldest_slot - 1),
+                &"this is a new memo in slot 4".to_string(),
+            )
+            .unwrap();
+        blockstore
+            .transaction_memos_cf
+            .put(
+                (random_signature(), oldest_slot),
+                &"this is a memo in slot 5 ".to_string(),
+            )
+            .unwrap();
+
+        let first_index = {
+            let mut memos_iterator = blockstore
+                .transaction_memos_cf
+                .iterator_cf_raw_key(IteratorMode::Start);
+            memos_iterator.next().unwrap().unwrap().0
+        };
+        let last_index = {
+            let mut memos_iterator = blockstore
+                .transaction_memos_cf
+                .iterator_cf_raw_key(IteratorMode::End);
+            memos_iterator.next().unwrap().unwrap().0
+        };
+
+        // Purge at slot 0 should not affect any memos
+        blockstore.db.set_oldest_slot(0);
+        blockstore
+            .db
+            .compact_range_cf::<cf::TransactionMemos>(&first_index, &last_index);
+        let memos_iterator = blockstore
+            .transaction_memos_cf
+            .iterator_cf_raw_key(IteratorMode::Start);
+        let mut count = 0;
+        for item in memos_iterator {
+            let _item = item.unwrap();
+            count += 1;
+        }
+        assert_eq!(count, 4);
+
+        // Purge at oldest_slot without clean_slot_0 only purges the current memo at slot 4
+        blockstore.db.set_oldest_slot(oldest_slot);
+        blockstore
+            .db
+            .compact_range_cf::<cf::TransactionMemos>(&first_index, &last_index);
+        let memos_iterator = blockstore
+            .transaction_memos_cf
+            .iterator_cf_raw_key(IteratorMode::Start);
+        let mut count = 0;
+        for item in memos_iterator {
+            let (key, _value) = item.unwrap();
+            let slot = <cf::TransactionMemos as Column>::index(&key).1;
+            assert!(slot == 0 || slot >= oldest_slot);
+            count += 1;
+        }
+        assert_eq!(count, 3);
+
+        // Purge at oldest_slot with clean_slot_0 purges deprecated memos
+        blockstore.db.set_clean_slot_0(true);
+        blockstore
+            .db
+            .compact_range_cf::<cf::TransactionMemos>(&first_index, &last_index);
+        let memos_iterator = blockstore
+            .transaction_memos_cf
+            .iterator_cf_raw_key(IteratorMode::Start);
+        let mut count = 0;
+        for item in memos_iterator {
+            let (key, _value) = item.unwrap();
+            let slot = <cf::TransactionMemos as Column>::index(&key).1;
+            assert!(slot >= oldest_slot);
+            count += 1;
+        }
+        assert_eq!(count, 1);
+    }
+
+    #[test]
+    fn test_purge_slot_cleanup_chaining_missing_slot_meta() {
+        let ledger_path = get_tmp_ledger_path_auto_delete!();
+        let blockstore = Blockstore::open(ledger_path.path()).unwrap();
+
+        let (shreds, _) = make_many_slot_entries(0, 10, 5);
+        blockstore.insert_shreds(shreds, None, false).unwrap();
+
+        assert!(matches!(
+            blockstore.purge_slot_cleanup_chaining(11).unwrap_err(),
+            BlockstoreError::SlotUnavailable
+        ));
+    }
+
+    #[test]
+    fn test_purge_slot_cleanup_chaining() {
+        let ledger_path = get_tmp_ledger_path_auto_delete!();
+        let blockstore = Blockstore::open(ledger_path.path()).unwrap();
+
+        let (shreds, _) = make_many_slot_entries(0, 10, 5);
+        blockstore.insert_shreds(shreds, None, false).unwrap();
+        let (slot_11, _) = make_slot_entries(11, 4, 5, true);
+        blockstore.insert_shreds(slot_11, None, false).unwrap();
+        let (slot_12, _) = make_slot_entries(12, 5, 5, true);
+        blockstore.insert_shreds(slot_12, None, false).unwrap();
+
+        blockstore.purge_slot_cleanup_chaining(5).unwrap();
+
+        let slot_meta = blockstore.meta(5).unwrap().unwrap();
+        let expected_slot_meta = SlotMeta {
+            slot: 5,
+            // Only the next_slots should be preserved
+            next_slots: vec![6, 12],
+            ..SlotMeta::default()
+        };
+        assert_eq!(slot_meta, expected_slot_meta);
+
+        let parent_slot_meta = blockstore.meta(4).unwrap().unwrap();
+        assert_eq!(parent_slot_meta.next_slots, vec![11]);
+
+        let child_slot_meta = blockstore.meta(6).unwrap().unwrap();
+        assert_eq!(child_slot_meta.parent_slot.unwrap(), 5);
+
+        let child_slot_meta = blockstore.meta(12).unwrap().unwrap();
+        assert_eq!(child_slot_meta.parent_slot.unwrap(), 5);
+    }
+>>>>>>> cc4072bce8 (blockstore: atomize slot clearing, relax parent slot meta check (#35124))
 }
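The change above leans on `SlotMeta::clear_unconfirmed_slot()` (defined in blockstore_meta.rs, not part of this patch) to reset a slot's meta while keeping `next_slots`, which is what keeps the purged slot's children discoverable by bank forks. A rough sketch of the behavior the comments describe, assuming a `SlotMeta::new_orphan` constructor; the actual helper may differ:

    impl SlotMeta {
        /// Reset this meta to an orphaned default, carrying only `next_slots`
        /// over so the fork below this slot stays reachable after repair.
        pub fn clear_unconfirmed_slot(&mut self) {
            // `new_orphan` is assumed here: a default SlotMeta with an unknown parent.
            let mut orphan = SlotMeta::new_orphan(self.slot);
            std::mem::swap(&mut orphan.next_slots, &mut self.next_slots);
            *self = orphan;
        }
    }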