v1.16: Re-enable periodic compaction on several columns (backport of #…
mergify[bot] authored Jul 21, 2023
1 parent 5de4cd2 commit f150ab7
Showing 1 changed file with 93 additions and 30 deletions.
123 changes: 93 additions & 30 deletions ledger/src/blockstore_db.rs
@@ -33,7 +33,7 @@ use {
},
solana_storage_proto::convert::generated,
std::{
collections::{HashMap, HashSet},
collections::HashMap,
ffi::{CStr, CString},
fs,
marker::PhantomData,
@@ -51,6 +51,14 @@ const BLOCKSTORE_METRICS_ERROR: i64 = -1;
const MAX_WRITE_BUFFER_SIZE: u64 = 256 * 1024 * 1024; // 256MB
const FIFO_WRITE_BUFFER_SIZE: u64 = 2 * MAX_WRITE_BUFFER_SIZE;

// SST files older than this value will be picked up for compaction. This value
// was chosen to be one day to strike a balance between storage getting
// reclaimed in a timely manner and the additional I/O that compaction incurs.
// For more details on this property, see
// https://github.com/facebook/rocksdb/blob/749b179c041347d150fa6721992ae8398b7d2b39/
// include/rocksdb/advanced_options.h#L908C30-L908C30
const PERIODIC_COMPACTION_SECONDS: u64 = 60 * 60 * 24;

// Column family for metadata about a leader slot
const META_CF: &str = "meta";
// Column family for slots that have been marked as dead
@@ -361,9 +369,6 @@ impl Rocks {
fs::create_dir_all(path)?;

// Use default database options
if should_disable_auto_compactions(&access_type) {
info!("Disabling rocksdb's automatic compactions...");
}
let mut db_options = get_db_options(&access_type);
if let Some(recovery_mode) = recovery_mode {
db_options.set_wal_recovery_mode(recovery_mode.into());
@@ -407,6 +412,7 @@ impl Rocks {
}
}
};
db.configure_compaction();

Ok(db)
}
@@ -470,6 +476,53 @@ impl Rocks {
]
}

// Configure compaction on a per-column basis
fn configure_compaction(&self) {
// If compactions are disabled altogether, no need to tune values
if should_disable_auto_compactions(&self.access_type) {
info!(
"Rocks's automatic compactions are disabled due to {:?} access",
self.access_type
);
return;
}

// Some columns make use of rocksdb's compaction to help in cleaning
// the database. See comments in should_enable_cf_compaction() for more
// details on why some columns need compaction and why others do not.
//
// More specifically, periodic (automatic) compaction is used as
// opposed to manual compaction requests on a range.
// - Periodic compaction operates on individual files once the file
// has reached a certain (configurable) age. See comments at
// PERIODIC_COMPACTION_SECONDS for more detail.
// - Manual compaction operates on a range and could end up propagating
// through several files and/or levels of the db.
//
// Given that data is inserted into the db at a somewhat steady rate,
// the age of the individual files will be fairly evenly distributed
// over time as well. Thus, the I/O to perform cleanup with periodic
// compaction is also evenly distributed over time. On the other hand,
// a manual compaction spanning a large number of files could cause
// a sudden burst in I/O. Such a burst could potentially cause a write
// stall in addition to negatively impacting other parts of the system.
// Thus, the choice to use periodic compactions is fairly easy.
for cf_name in Self::columns() {
if should_enable_cf_compaction(cf_name) {
let cf_handle = self.cf_handle(cf_name);
self.db
.set_options_cf(
&cf_handle,
&[(
"periodic_compaction_seconds",
&PERIODIC_COMPACTION_SECONDS.to_string(),
)],
)
.unwrap();
}
}
}
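
The comments in configure_compaction() above prefer periodic compaction over manual range compactions because the latter can rewrite many files at once. As a rough illustration of that contrast, here is a minimal sketch of a manual full-range compaction against the rust rocksdb crate; the function name and the idea of compacting an entire column family are illustrative only and are not part of this change.

// Hypothetical sketch (not in this commit): a manual compaction across an
// entire column family. Unlike periodic compaction, which rewrites individual
// SST files once they age past PERIODIC_COMPACTION_SECONDS, this single call
// may rewrite many files across several levels and can cause a burst of I/O.
fn force_compact_all(db: &rocksdb::DB, cf_name: &str) {
    if let Some(cf) = db.cf_handle(cf_name) {
        // Passing None for both bounds compacts the full key range.
        db.compact_range_cf(&cf, None::<&[u8]>, None::<&[u8]>);
    }
}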

fn destroy(path: &Path) -> Result<()> {
DB::destroy(&Options::default(), path)?;

@@ -1609,7 +1662,9 @@ impl<'a> WriteBatch<'a> {
}
}

/// A CompactionFilter implementation to remove keys older than a given slot.
struct PurgedSlotFilter<C: Column + ColumnName> {
/// The oldest slot to keep; any slot < oldest_slot will be removed
oldest_slot: Slot,
name: CString,
_phantom: PhantomData<C>,
@@ -1620,8 +1675,6 @@ impl<C: Column + ColumnName> CompactionFilter for PurgedSlotFilter<C> {
use rocksdb::CompactionDecision::*;

let slot_in_key = C::slot(C::index(key));
// Refer to a comment about periodic_compaction_seconds, especially regarding implicit
// periodic execution of compaction_filters
if slot_in_key >= self.oldest_slot {
Keep
} else {
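
For readers unfamiliar with rocksdb compaction filters, here is a small self-contained sketch of the same pattern, assuming the rust rocksdb crate's CompactionFilter trait and a hypothetical column whose keys begin with a big-endian u64 slot. OldSlotFilter is illustrative only and is not the PurgedSlotFilter used in this file.

use rocksdb::{compaction_filter::CompactionFilter, CompactionDecision};
use std::ffi::{CStr, CString};

// Illustrative only: drop any entry whose key encodes a slot older than
// `oldest_slot`. Assumes every key starts with a big-endian u64 slot.
struct OldSlotFilter {
    oldest_slot: u64,
    name: CString,
}

impl CompactionFilter for OldSlotFilter {
    fn filter(&mut self, _level: u32, key: &[u8], _value: &[u8]) -> CompactionDecision {
        let slot = u64::from_be_bytes(key[..8].try_into().unwrap());
        if slot >= self.oldest_slot {
            CompactionDecision::Keep
        } else {
            CompactionDecision::Remove
        }
    }

    fn name(&self) -> &CStr {
        &self.name
    }
}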
@@ -1692,7 +1745,7 @@ fn get_cf_options<C: 'static + Column + ColumnName>(
cf_options.set_disable_auto_compactions(true);
}

if !disable_auto_compactions && !should_exclude_from_compaction(C::NAME) {
if !disable_auto_compactions && should_enable_cf_compaction(C::NAME) {
cf_options.set_compaction_filter_factory(PurgedSlotFilterFactory::<C> {
oldest_slot: oldest_slot.clone(),
name: CString::new(format!("purged_slot_filter_factory({})", C::NAME)).unwrap(),
@@ -1836,25 +1889,36 @@ fn get_db_options(access_type: &AccessType) -> Options {
options
}

// Returns whether automatic compactions should be disabled based upon access type
// Returns whether automatic compactions should be disabled for the entire
// database based upon the given access type.
fn should_disable_auto_compactions(access_type: &AccessType) -> bool {
// Leave automatic compactions enabled (do not disable) in Primary mode;
// disable in all other modes to prevent accidental cleaning
!matches!(access_type, AccessType::Primary)
}

// Returns whether the supplied column (name) should be excluded from compaction
fn should_exclude_from_compaction(cf_name: &str) -> bool {
// List of column families to be excluded from compactions
let no_compaction_cfs: HashSet<&'static str> = vec![
columns::TransactionStatusIndex::NAME,
columns::ProgramCosts::NAME,
columns::TransactionMemos::NAME,
]
.into_iter()
.collect();

no_compaction_cfs.get(cf_name).is_some()
// Returns whether compactions should be enabled for the given column (name).
fn should_enable_cf_compaction(cf_name: &str) -> bool {
// In order to keep the ledger storage footprint within a desired size,
// LedgerCleanupService removes data in FIFO order by slot.
//
// Several columns do not contain slot in their key. These columns must
// be manually managed to avoid unbounded storage growth.
//
// Columns where slot is the primary index can be efficiently cleaned via
// Database::delete_range_cf() && Database::delete_file_in_range_cf().
//
// Columns where a slot is part of the key but not the primary index can
// not be range deleted like above. Instead, the individual key/value pairs
// must be iterated over and a decision to keep or discard that pair is
// made. The comparison logic is implemented in PurgedSlotFilter which is
// configured to run as part of rocksdb's automatic compactions. Storage
// space is reclaimed on this class of columns once compaction has
// completed on a given range or file.
matches!(
cf_name,
columns::TransactionStatus::NAME | columns::AddressSignatures::NAME
)
}
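
For the slot-keyed columns that should_enable_cf_compaction() leaves out, a range delete suffices because the slot leads the key. A minimal sketch under that assumption, using the rust rocksdb crate; the helper name and key encoding are illustrative and not the actual LedgerCleanupService code.

// Illustrative only: purge every entry with slot < `oldest_slot` from a
// column whose keys are big-endian slot numbers. Because the slot leads the
// key, one contiguous range covers everything that should be reclaimed.
fn purge_slots_below(
    db: &rocksdb::DB,
    cf_name: &str,
    oldest_slot: u64,
) -> Result<(), rocksdb::Error> {
    if let Some(cf) = db.cf_handle(cf_name) {
        let from = 0u64.to_be_bytes();
        let to = oldest_slot.to_be_bytes(); // exclusive upper bound
        // Drop whole SST files that sit entirely inside the range, then
        // write a range tombstone for whatever remains.
        db.delete_file_in_range_cf(&cf, from, to)?;
        db.delete_range_cf(&cf, from, to)?;
    }
    Ok(())
}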

// Returns true if the column family enables compression.
@@ -1937,15 +2001,14 @@ pub mod tests {
}

#[test]
fn test_should_exclude_from_compaction() {
// currently there are three CFs excluded from compaction:
assert!(should_exclude_from_compaction(
columns::TransactionStatusIndex::NAME
));
assert!(should_exclude_from_compaction(columns::ProgramCosts::NAME));
assert!(should_exclude_from_compaction(
columns::TransactionMemos::NAME
));
assert!(!should_exclude_from_compaction("something else"));
fn test_should_enable_cf_compaction() {
let columns_to_compact = vec![
columns::TransactionStatus::NAME,
columns::AddressSignatures::NAME,
];
columns_to_compact.iter().for_each(|cf_name| {
assert!(should_enable_cf_compaction(cf_name));
});
assert!(!should_enable_cf_compaction("something else"));
}
}
