Skip to content
This repository has been archived by the owner on Jan 22, 2025. It is now read-only.

Commit

Permalink
Delete files older than the lowest_cleanup_slot in LedgerCleanupServi…
Browse files Browse the repository at this point in the history
…ce::cleanup_ledger (#26651)

#### Problem
LedgerCleanupService requires compactions to propagate & digest range-delete tombstones
to eventually reclaim disk space.

#### Summary of Changes
This PR makes LedgerCleanupService::cleanup_ledger delete any file whose slot-range is
older than the lowest_cleanup_slot.  This allows us to reclaim disk space more often with
fewer IOps.  Experimental results on mainnet validators show that the PR can effectively
reduce 33% to 40% ledger disk size.
  • Loading branch information
yhchiang-sol authored Aug 8, 2022
1 parent 9b54b15 commit 99ef218
Show file tree
Hide file tree
Showing 3 changed files with 126 additions and 23 deletions.
34 changes: 12 additions & 22 deletions core/src/ledger_cleanup_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -124,28 +124,25 @@ impl LedgerCleanupService {
/// A helper function to `cleanup_ledger` which returns a tuple of the
/// following four elements suggesting whether to clean up the ledger:
///
/// Return value (bool, Slot, Slot, u64):
/// Return value (bool, Slot, u64):
/// - `slots_to_clean` (bool): a boolean value indicating whether there
/// are any slots to clean. If true, then `cleanup_ledger` function
/// will then proceed with the ledger cleanup.
/// - `first_slot_to_purge` (Slot): the first slot to purge.
/// - `lowest_slot_to_puerge` (Slot): the lowest slot to purge. Together
/// with `first_slot_to_purge`, the two Slot values represent the
/// range of the clean up.
/// - `lowest_slot_to_purge` (Slot): the lowest slot to purge. Any
/// slot which is older or equal to `lowest_slot_to_purge` will be
/// cleaned up.
/// - `total_shreds` (u64): the total estimated number of shreds before the
/// `root`.
fn find_slots_to_clean(
blockstore: &Arc<Blockstore>,
root: Slot,
max_ledger_shreds: u64,
) -> (bool, Slot, Slot, u64) {
) -> (bool, Slot, u64) {
let mut total_slots = Vec::new();
let mut iterate_time = Measure::start("iterate_time");
let mut total_shreds = 0;
let mut first_slot = 0;
for (i, (slot, meta)) in blockstore.slot_meta_iterator(0).unwrap().enumerate() {
if i == 0 {
first_slot = slot;
debug!("purge: searching from slot: {}", slot);
}
// Not exact since non-full slots will have holes
Expand All @@ -157,15 +154,14 @@ impl LedgerCleanupService {
}
iterate_time.stop();
info!(
"first_slot={} total_slots={} total_shreds={} max_ledger_shreds={}, {}",
first_slot,
"total_slots={} total_shreds={} max_ledger_shreds={}, {}",
total_slots.len(),
total_shreds,
max_ledger_shreds,
iterate_time
);
if (total_shreds as u64) < max_ledger_shreds {
return (false, 0, 0, total_shreds);
return (false, 0, total_shreds);
}
let mut num_shreds_to_clean = 0;
let mut lowest_cleanup_slot = total_slots[0].0;
Expand All @@ -177,7 +173,7 @@ impl LedgerCleanupService {
}
}

(true, first_slot, lowest_cleanup_slot, total_shreds)
(true, lowest_cleanup_slot, total_shreds)
}

fn receive_new_roots(new_root_receiver: &Receiver<Slot>) -> Result<Slot, RecvTimeoutError> {
Expand Down Expand Up @@ -233,7 +229,7 @@ impl LedgerCleanupService {

*last_purge_slot = root;

let (slots_to_clean, purge_first_slot, lowest_cleanup_slot, total_shreds) =
let (slots_to_clean, lowest_cleanup_slot, total_shreds) =
Self::find_slots_to_clean(blockstore, root, max_ledger_shreds);

if slots_to_clean {
Expand All @@ -248,18 +244,12 @@ impl LedgerCleanupService {
*blockstore.lowest_cleanup_slot.write().unwrap() = lowest_cleanup_slot;
slot_update_time.stop();

info!(
"purging data from slots {} to {}",
purge_first_slot, lowest_cleanup_slot
);
info!("purging data older than {}", lowest_cleanup_slot);

let mut purge_time = Measure::start("purge_slots");

blockstore.purge_slots(
purge_first_slot,
lowest_cleanup_slot,
PurgeType::CompactionFilter,
);
// purge any slots older than lowest_cleanup_slot.
blockstore.purge_slots(0, lowest_cleanup_slot, PurgeType::CompactionFilter);
// Update only after purge operation.
// Safety: This value can be used by compaction_filters shared via Arc<AtomicU64>.
// Compactions are async and run as a multi-threaded background job. However, this
Expand Down
94 changes: 93 additions & 1 deletion ledger/src/blockstore/blockstore_purge.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ use {super::*, solana_sdk::message::AccountKeys, std::time::Instant};
pub struct PurgeStats {
delete_range: u64,
write_batch: u64,
delete_files_in_range: u64,
}

#[derive(Clone, Copy)]
Expand Down Expand Up @@ -46,7 +47,12 @@ impl Blockstore {
("from_slot", from_slot as i64, i64),
("to_slot", to_slot as i64, i64),
("delete_range_us", purge_stats.delete_range as i64, i64),
("write_batch_us", purge_stats.write_batch as i64, i64)
("write_batch_us", purge_stats.write_batch as i64, i64),
(
"delete_files_in_range_us",
purge_stats.write_batch as i64,
i64
)
);
if let Err(e) = purge_result {
error!(
Expand Down Expand Up @@ -141,6 +147,9 @@ impl Blockstore {

/// A helper function to `purge_slots` that executes the ledger clean up
/// from `from_slot` to `to_slot`.
///
/// When `from_slot` is 0, any sst-file with a key-range completely older
/// than `to_slot` will also be deleted.
pub(crate) fn run_purge_with_stats(
&self,
from_slot: Slot,
Expand Down Expand Up @@ -239,6 +248,7 @@ impl Blockstore {
}
}
delete_range_timer.stop();

let mut write_timer = Measure::start("write_batch");
if let Err(e) = self.db.write(write_batch) {
error!(
Expand All @@ -248,15 +258,97 @@ impl Blockstore {
return Err(e);
}
write_timer.stop();

let mut purge_files_in_range_timer = Measure::start("delete_file_in_range");
// purge_files_in_range delete any files whose slot range is within
// [from_slot, to_slot]. When from_slot is 0, it is safe to run
// purge_files_in_range because if purge_files_in_range deletes any
// sst file that contains any range-deletion tombstone, the deletion
// range of that tombstone will be completely covered by the new
// range-delete tombstone (0, to_slot) issued above.
//
// On the other hand, purge_files_in_range is more effective and
// efficient than the compaction filter (which runs key-by-key)
// because all the sst files that have key range below to_slot
// can be deleted immediately.
if columns_purged && from_slot == 0 {
self.purge_files_in_range(from_slot, to_slot);
}
purge_files_in_range_timer.stop();

purge_stats.delete_range += delete_range_timer.as_us();
purge_stats.write_batch += write_timer.as_us();
purge_stats.delete_files_in_range += purge_files_in_range_timer.as_us();

// only drop w_active_transaction_status_index after we do db.write(write_batch);
// otherwise, readers might be confused with inconsistent state between
// self.active_transaction_status_index and RockDb's TransactionStatusIndex contents
drop(w_active_transaction_status_index);
Ok(columns_purged)
}

fn purge_files_in_range(&self, from_slot: Slot, to_slot: Slot) -> bool {
self.db
.delete_file_in_range_cf::<cf::SlotMeta>(from_slot, to_slot)
.is_ok()
& self
.db
.delete_file_in_range_cf::<cf::BankHash>(from_slot, to_slot)
.is_ok()
& self
.db
.delete_file_in_range_cf::<cf::Root>(from_slot, to_slot)
.is_ok()
& self
.db
.delete_file_in_range_cf::<cf::ShredData>(from_slot, to_slot)
.is_ok()
& self
.db
.delete_file_in_range_cf::<cf::ShredCode>(from_slot, to_slot)
.is_ok()
& self
.db
.delete_file_in_range_cf::<cf::DeadSlots>(from_slot, to_slot)
.is_ok()
& self
.db
.delete_file_in_range_cf::<cf::DuplicateSlots>(from_slot, to_slot)
.is_ok()
& self
.db
.delete_file_in_range_cf::<cf::ErasureMeta>(from_slot, to_slot)
.is_ok()
& self
.db
.delete_file_in_range_cf::<cf::Orphans>(from_slot, to_slot)
.is_ok()
& self
.db
.delete_file_in_range_cf::<cf::Index>(from_slot, to_slot)
.is_ok()
& self
.db
.delete_file_in_range_cf::<cf::Rewards>(from_slot, to_slot)
.is_ok()
& self
.db
.delete_file_in_range_cf::<cf::Blocktime>(from_slot, to_slot)
.is_ok()
& self
.db
.delete_file_in_range_cf::<cf::PerfSamples>(from_slot, to_slot)
.is_ok()
& self
.db
.delete_file_in_range_cf::<cf::BlockHeight>(from_slot, to_slot)
.is_ok()
& self
.db
.delete_file_in_range_cf::<cf::OptimisticSlots>(from_slot, to_slot)
.is_ok()
}

pub fn compact_storage(&self, from_slot: Slot, to_slot: Slot) -> Result<bool> {
if self.no_compaction {
info!("compact_storage: compaction disabled");
Expand Down
21 changes: 21 additions & 0 deletions ledger/src/blockstore_db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -455,6 +455,16 @@ impl Rocks {
Ok(())
}

fn delete_file_in_range_cf(
&self,
cf: &ColumnFamily,
from_key: &[u8],
to_key: &[u8],
) -> Result<()> {
self.db.delete_file_in_range_cf(cf, from_key, to_key)?;
Ok(())
}

fn iterator_cf<C>(&self, cf: &ColumnFamily, iterator_mode: IteratorMode<C::Index>) -> DBIterator
where
C: Column,
Expand Down Expand Up @@ -1117,6 +1127,17 @@ impl Database {
batch.delete_range_cf::<C>(cf, from_index, to_index)
}

pub fn delete_file_in_range_cf<C>(&self, from: Slot, to: Slot) -> Result<()>
where
C: Column + ColumnName,
{
self.backend.delete_file_in_range_cf(
self.cf_handle::<C>(),
&C::key(C::as_index(from)),
&C::key(C::as_index(to)),
)
}

pub fn is_primary_access(&self) -> bool {
self.backend.is_primary_access()
}
Expand Down

0 comments on commit 99ef218

Please sign in to comment.