From 255b45774b6f797f6a3db4132d5218ef829b39e5 Mon Sep 17 00:00:00 2001
From: steviez
Date: Thu, 20 Jul 2023 16:34:42 -0500
Subject: [PATCH] Re-enable periodic compaction on several columns (#32548)

Periodic compaction was previously disabled on all columns in #27571 in
favor of the delete_file_in_range() approach that #26651 introduced.
However, several columns still rely on periodic compaction to reclaim
storage; namely, the TransactionStatus and AddressSignatures columns, as
these columns contain a slot in their key, but as a non-primary index.

The result of periodic compaction not running on these columns is that
no storage space is being reclaimed from them. This is obviously bad and
would lead to a node eventually running out of storage space and
crashing.

This PR reintroduces periodic compaction, but only for the columns that
need it.

(cherry picked from commit d73fa1b590b3c074770e127fadbfad89a0db7ffc)
---
 ledger/src/blockstore_db.rs | 123 +++++++++++++++++++++++++++---------
 1 file changed, 93 insertions(+), 30 deletions(-)

diff --git a/ledger/src/blockstore_db.rs b/ledger/src/blockstore_db.rs
index f8fa9faaddcd28..14be3095b212d7 100644
--- a/ledger/src/blockstore_db.rs
+++ b/ledger/src/blockstore_db.rs
@@ -33,7 +33,7 @@ use {
     },
     solana_storage_proto::convert::generated,
     std::{
-        collections::{HashMap, HashSet},
+        collections::HashMap,
         ffi::{CStr, CString},
         fs,
         marker::PhantomData,
@@ -51,6 +51,14 @@ const BLOCKSTORE_METRICS_ERROR: i64 = -1;
 const MAX_WRITE_BUFFER_SIZE: u64 = 256 * 1024 * 1024; // 256MB
 const FIFO_WRITE_BUFFER_SIZE: u64 = 2 * MAX_WRITE_BUFFER_SIZE;
 
+// SST files older than this value will be picked up for compaction. This value
+// was chosen to be one day to strike a balance between storage getting
+// reclaimed in a timely manner and the additional I/O that compaction incurs.
+// For more details on this property, see
+// https://github.com/facebook/rocksdb/blob/749b179c041347d150fa6721992ae8398b7d2b39/
+// include/rocksdb/advanced_options.h#L908C30-L908C30
+const PERIODIC_COMPACTION_SECONDS: u64 = 60 * 60 * 24;
+
 // Column family for metadata about a leader slot
 const META_CF: &str = "meta";
 // Column family for slots that have been marked as dead
@@ -361,9 +369,6 @@ impl Rocks {
         fs::create_dir_all(path)?;
 
         // Use default database options
-        if should_disable_auto_compactions(&access_type) {
-            info!("Disabling rocksdb's automatic compactions...");
-        }
         let mut db_options = get_db_options(&access_type);
         if let Some(recovery_mode) = recovery_mode {
             db_options.set_wal_recovery_mode(recovery_mode.into());
@@ -407,6 +412,7 @@
                 }
             }
         };
+        db.configure_compaction();
 
         Ok(db)
     }
@@ -470,6 +476,53 @@
         ]
     }
 
+    // Configure compaction on a per-column basis
+    fn configure_compaction(&self) {
+        // If compactions are disabled altogether, no need to tune values
+        if should_disable_auto_compactions(&self.access_type) {
+            info!(
+                "Rocks's automatic compactions are disabled due to {:?} access",
+                self.access_type
+            );
+            return;
+        }
+
+        // Some columns make use of rocksdb's compaction to help in cleaning
+        // the database. See comments in should_enable_cf_compaction() for more
+        // details on why some columns need compaction and why others do not.
+        //
+        // More specifically, periodic (automatic) compaction is used as
+        // opposed to manual compaction requests on a range.
+        // - Periodic compaction operates on individual files once the file
+        //   has reached a certain (configurable) age. See comments at
+        //   PERIODIC_COMPACTION_SECONDS for some more detail.
+        // - Manual compaction operates on a range and could end up propagating
+        //   through several files and/or levels of the db.
+        //
+        // Given that data is inserted into the db at a somewhat steady rate,
+        // the age of the individual files will be fairly evenly distributed
+        // over time as well. Thus, the I/O to perform cleanup with periodic
+        // compaction is also evenly distributed over time. On the other hand,
+        // a manual compaction spanning a large number of files could cause
+        // a sudden burst in I/O. Such a burst could potentially cause a write
+        // stall in addition to negatively impacting other parts of the system.
+        // Thus, the choice to use periodic compactions is fairly easy.
+        for cf_name in Self::columns() {
+            if should_enable_cf_compaction(cf_name) {
+                let cf_handle = self.cf_handle(cf_name);
+                self.db
+                    .set_options_cf(
+                        &cf_handle,
+                        &[(
+                            "periodic_compaction_seconds",
+                            &PERIODIC_COMPACTION_SECONDS.to_string(),
+                        )],
+                    )
+                    .unwrap();
+            }
+        }
+    }
+
     fn destroy(path: &Path) -> Result<()> {
         DB::destroy(&Options::default(), path)?;
 
@@ -1609,7 +1662,9 @@ impl<'a> WriteBatch<'a> {
     }
 }
 
+/// A CompactionFilter implementation to remove keys older than a given slot.
 struct PurgedSlotFilter<C: Column + ColumnName> {
+    /// The oldest slot to keep; any slot < oldest_slot will be removed
     oldest_slot: Slot,
     name: CString,
     _phantom: PhantomData<C>,
@@ -1620,8 +1675,6 @@ impl<C: Column + ColumnName> CompactionFilter for PurgedSlotFilter<C> {
         use rocksdb::CompactionDecision::*;
         let slot_in_key = C::slot(C::index(key));
 
-        // Refer to a comment about periodic_compaction_seconds, especially regarding implicit
-        // periodic execution of compaction_filters
         if slot_in_key >= self.oldest_slot {
             Keep
         } else {
@@ -1692,7 +1745,7 @@ fn get_cf_options<C: 'static + Column + ColumnName>(
         cf_options.set_disable_auto_compactions(true);
     }
 
-    if !disable_auto_compactions && !should_exclude_from_compaction(C::NAME) {
+    if !disable_auto_compactions && should_enable_cf_compaction(C::NAME) {
         cf_options.set_compaction_filter_factory(PurgedSlotFilterFactory::<C> {
             oldest_slot: oldest_slot.clone(),
             name: CString::new(format!("purged_slot_filter_factory({})", C::NAME)).unwrap(),
@@ -1836,25 +1889,36 @@ fn get_db_options(access_type: &AccessType) -> Options {
     options
 }
 
-// Returns whether automatic compactions should be disabled based upon access type
+// Returns whether automatic compactions should be disabled for the entire
+// database based upon the given access type.
 fn should_disable_auto_compactions(access_type: &AccessType) -> bool {
     // Leave automatic compactions enabled (do not disable) in Primary mode;
     // disable in all other modes to prevent accidental cleaning
     !matches!(access_type, AccessType::Primary)
 }
 
-// Returns whether the supplied column (name) should be excluded from compaction
-fn should_exclude_from_compaction(cf_name: &str) -> bool {
-    // List of column families to be excluded from compactions
-    let no_compaction_cfs: HashSet<&'static str> = vec![
-        columns::TransactionStatusIndex::NAME,
-        columns::ProgramCosts::NAME,
-        columns::TransactionMemos::NAME,
-    ]
-    .into_iter()
-    .collect();
-
-    no_compaction_cfs.get(cf_name).is_some()
+// Returns whether compactions should be enabled for the given column (name).
+fn should_enable_cf_compaction(cf_name: &str) -> bool {
+    // In order to keep the ledger storage footprint within a desired size,
+    // LedgerCleanupService removes data in FIFO order by slot.
+    //
+    // Several columns do not contain slot in their key. These columns must
+    // be manually managed to avoid unbounded storage growth.
+    //
+    // Columns where slot is the primary index can be efficiently cleaned via
+    // Database::delete_range_cf() && Database::delete_file_in_range_cf().
+    //
+    // Columns where a slot is part of the key but not the primary index can
+    // not be range deleted like above. Instead, the individual key/value pairs
+    // must be iterated over and a decision to keep or discard that pair is
+    // made. The comparison logic is implemented in PurgedSlotFilter which is
+    // configured to run as part of rocksdb's automatic compactions. Storage
+    // space is reclaimed on this class of columns once compaction has
+    // completed on a given range or file.
+    matches!(
+        cf_name,
+        columns::TransactionStatus::NAME | columns::AddressSignatures::NAME
+    )
 }
 
 // Returns true if the column family enables compression.
@@ -1937,15 +2001,14 @@ pub mod tests {
     }
 
     #[test]
-    fn test_should_exclude_from_compaction() {
-        // currently there are three CFs excluded from compaction:
-        assert!(should_exclude_from_compaction(
-            columns::TransactionStatusIndex::NAME
-        ));
-        assert!(should_exclude_from_compaction(columns::ProgramCosts::NAME));
-        assert!(should_exclude_from_compaction(
-            columns::TransactionMemos::NAME
-        ));
-        assert!(!should_exclude_from_compaction("something else"));
+    fn test_should_enable_cf_compaction() {
+        let columns_to_compact = vec![
+            columns::TransactionStatus::NAME,
+            columns::AddressSignatures::NAME,
+        ];
+        columns_to_compact.iter().for_each(|cf_name| {
+            assert!(should_enable_cf_compaction(cf_name));
+        });
+        assert!(!should_enable_cf_compaction("something else"));
     }
 }
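
For readers who want to try the underlying RocksDB mechanism outside of the
validator, below is a minimal standalone sketch, not part of the patch. It
uses the same rust-rocksdb call the patch relies on, set_options_cf() with
the "periodic_compaction_seconds" key. The database path and the column
family names "needs_compaction" and "no_compaction" are illustrative
placeholders rather than names from the Solana codebase.

use rocksdb::{ColumnFamilyDescriptor, Options, DB};

fn main() -> Result<(), rocksdb::Error> {
    // One day, mirroring PERIODIC_COMPACTION_SECONDS in the patch above.
    const PERIODIC_COMPACTION_SECONDS: u64 = 60 * 60 * 24;

    let mut db_opts = Options::default();
    db_opts.create_if_missing(true);
    db_opts.create_missing_column_families(true);

    // Two placeholder column families; only one of them gets the tuning.
    let cfs = vec![
        ColumnFamilyDescriptor::new("needs_compaction", Options::default()),
        ColumnFamilyDescriptor::new("no_compaction", Options::default()),
    ];
    let db = DB::open_cf_descriptors(&db_opts, "/tmp/periodic-compaction-demo", cfs)?;

    // Same mechanism as Rocks::configure_compaction() in the patch: set
    // periodic_compaction_seconds as a runtime option on just the column
    // family that relies on compaction to reclaim space.
    let cf = db.cf_handle("needs_compaction").expect("cf was just created");
    db.set_options_cf(
        cf,
        &[(
            "periodic_compaction_seconds",
            &PERIODIC_COMPACTION_SECONDS.to_string(),
        )],
    )?;

    Ok(())
}

In the blockstore itself, this option is applied only to TransactionStatus
and AddressSignatures, where the PurgedSlotFilter installed via
get_cf_options() drops entries older than oldest_slot as those periodic
compactions run.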