diff --git a/core/src/ledger_cleanup_service.rs b/core/src/ledger_cleanup_service.rs index 9b111e2e44e7ad..607f9f82bd9dad 100644 --- a/core/src/ledger_cleanup_service.rs +++ b/core/src/ledger_cleanup_service.rs @@ -29,9 +29,6 @@ pub const DEFAULT_MIN_MAX_LEDGER_SHREDS: u64 = 50_000_000; // and starve other blockstore users. pub const DEFAULT_PURGE_SLOT_INTERVAL: u64 = 512; -// Delay between purges to cooperate with other blockstore users -pub const DEFAULT_DELAY_BETWEEN_PURGES: Duration = Duration::from_millis(500); - // Compacting at a slower interval than purging helps keep IOPS down. // Once a day should be ample const DEFAULT_COMPACTION_SLOT_INTERVAL: u64 = TICKS_PER_DAY / DEFAULT_TICKS_PER_SLOT; @@ -67,7 +64,6 @@ impl LedgerCleanupService { max_ledger_shreds, &mut last_purge_slot, DEFAULT_PURGE_SLOT_INTERVAL, - Some(DEFAULT_DELAY_BETWEEN_PURGES), &mut last_compaction_slot, DEFAULT_COMPACTION_SLOT_INTERVAL, ) { @@ -142,7 +138,6 @@ impl LedgerCleanupService { max_ledger_shreds: u64, last_purge_slot: &mut u64, purge_interval: u64, - delay_between_purges: Option, last_compaction_slot: &mut u64, compaction_interval: u64, ) -> Result<(), RecvTimeoutError> { @@ -156,6 +151,7 @@ impl LedgerCleanupService { "purge: last_root={}, last_purge_slot={}, purge_interval={}, last_compaction_slot={}, disk_utilization={:?}", root, last_purge_slot, purge_interval, last_compaction_slot, disk_utilization_pre ); + *last_purge_slot = root; let (slots_to_clean, purge_first_slot, lowest_cleanup_slot, total_shreds) = @@ -183,11 +179,10 @@ impl LedgerCleanupService { purge_first_slot, lowest_cleanup_slot ); - let mut purge_time = Measure::start("purge_slots_with_delay"); - blockstore.purge_slots_with_delay( + let mut purge_time = Measure::start("purge_slots"); + blockstore.purge_slots( purge_first_slot, lowest_cleanup_slot, - delay_between_purges, PurgeType::PrimaryIndex, ); purge_time.stop(); @@ -275,7 +270,6 @@ mod tests { 5, &mut last_purge_slot, 10, - None, &mut last_compaction_slot, 10, ) @@ -333,7 +327,6 @@ mod tests { initial_slots, &mut last_purge_slot, 10, - None, &mut last_compaction_slot, 10, ) diff --git a/core/src/validator.rs b/core/src/validator.rs index 74513eaf2c140a..76198ed358cb9c 100644 --- a/core/src/validator.rs +++ b/core/src/validator.rs @@ -684,7 +684,7 @@ fn backup_and_clear_blockstore(ledger_path: &Path, start_slot: Slot, shred_versi let end_slot = last_slot.unwrap(); info!("Purging slots {} to {}", start_slot, end_slot); - blockstore.purge_slots_with_delay(start_slot, end_slot, None, PurgeType::Exact); + blockstore.purge_slots(start_slot, end_slot, PurgeType::Exact); blockstore.purge_from_next_slots(start_slot, end_slot); info!("Purging done, compacting db.."); if let Err(e) = blockstore.compact_storage(start_slot, end_slot) { diff --git a/core/tests/ledger_cleanup.rs b/core/tests/ledger_cleanup.rs index ab0029b2fe067f..191d9488a21448 100644 --- a/core/tests/ledger_cleanup.rs +++ b/core/tests/ledger_cleanup.rs @@ -382,7 +382,6 @@ mod tests { max_ledger_shreds, &mut last_purge_slot, 10, - None, &mut last_compaction_slot, 10, ) diff --git a/ledger-tool/src/main.rs b/ledger-tool/src/main.rs index ed12cdb32bc055..842273e4fbc4f8 100644 --- a/ledger-tool/src/main.rs +++ b/ledger-tool/src/main.rs @@ -1432,7 +1432,7 @@ fn main() { let start_slot = value_t_or_exit!(arg_matches, "start_slot", Slot); let end_slot = value_t_or_exit!(arg_matches, "end_slot", Slot); let blockstore = open_blockstore(&ledger_path, AccessType::PrimaryOnly); - blockstore.purge_slots(start_slot, end_slot); + blockstore.purge_and_compact_slots(start_slot, end_slot); blockstore.purge_from_next_slots(start_slot, end_slot); } ("list-roots", Some(arg_matches)) => { diff --git a/ledger/src/blockstore.rs b/ledger/src/blockstore.rs index f7c195573ef88c..ec5ddcd4bd093c 100644 --- a/ledger/src/blockstore.rs +++ b/ledger/src/blockstore.rs @@ -6155,14 +6155,14 @@ pub mod tests { .insert_shreds(all_shreds, Some(&leader_schedule_cache), false) .unwrap(); verify_index_integrity(&blockstore, slot); - blockstore.purge_slots(0, slot); + blockstore.purge_and_compact_slots(0, slot); // Test inserting just the codes, enough for recovery blockstore .insert_shreds(coding_shreds.clone(), Some(&leader_schedule_cache), false) .unwrap(); verify_index_integrity(&blockstore, slot); - blockstore.purge_slots(0, slot); + blockstore.purge_and_compact_slots(0, slot); // Test inserting some codes, but not enough for recovery blockstore @@ -6173,7 +6173,7 @@ pub mod tests { ) .unwrap(); verify_index_integrity(&blockstore, slot); - blockstore.purge_slots(0, slot); + blockstore.purge_and_compact_slots(0, slot); // Test inserting just the codes, and some data, enough for recovery let shreds: Vec<_> = data_shreds[..data_shreds.len() - 1] @@ -6185,7 +6185,7 @@ pub mod tests { .insert_shreds(shreds, Some(&leader_schedule_cache), false) .unwrap(); verify_index_integrity(&blockstore, slot); - blockstore.purge_slots(0, slot); + blockstore.purge_and_compact_slots(0, slot); // Test inserting some codes, and some data, but enough for recovery let shreds: Vec<_> = data_shreds[..data_shreds.len() / 2 - 1] @@ -6197,7 +6197,7 @@ pub mod tests { .insert_shreds(shreds, Some(&leader_schedule_cache), false) .unwrap(); verify_index_integrity(&blockstore, slot); - blockstore.purge_slots(0, slot); + blockstore.purge_and_compact_slots(0, slot); // Test inserting all shreds in 2 rounds, make sure nothing is lost let shreds1: Vec<_> = data_shreds[..data_shreds.len() / 2 - 1] @@ -6217,7 +6217,7 @@ pub mod tests { .insert_shreds(shreds2, Some(&leader_schedule_cache), false) .unwrap(); verify_index_integrity(&blockstore, slot); - blockstore.purge_slots(0, slot); + blockstore.purge_and_compact_slots(0, slot); // Test not all, but enough data and coding shreds in 2 rounds to trigger recovery, // make sure nothing is lost @@ -6242,7 +6242,7 @@ pub mod tests { .insert_shreds(shreds2, Some(&leader_schedule_cache), false) .unwrap(); verify_index_integrity(&blockstore, slot); - blockstore.purge_slots(0, slot); + blockstore.purge_and_compact_slots(0, slot); // Test insert shreds in 2 rounds, but not enough to trigger // recovery, make sure nothing is lost @@ -6267,7 +6267,7 @@ pub mod tests { .insert_shreds(shreds2, Some(&leader_schedule_cache), false) .unwrap(); verify_index_integrity(&blockstore, slot); - blockstore.purge_slots(0, slot); + blockstore.purge_and_compact_slots(0, slot); } Blockstore::destroy(&blockstore_path).expect("Expected successful database destruction"); } diff --git a/ledger/src/blockstore/blockstore_purge.rs b/ledger/src/blockstore/blockstore_purge.rs index 9e9084f0b9538d..fdffb157b6cce0 100644 --- a/ledger/src/blockstore/blockstore_purge.rs +++ b/ledger/src/blockstore/blockstore_purge.rs @@ -1,5 +1,4 @@ use super::*; -use std::time::Instant; #[derive(Default)] pub struct PurgeStats { @@ -12,61 +11,28 @@ impl Blockstore { /// Dangerous; Use with care: /// Does not check for integrity and does not update slot metas that refer to deleted slots /// Modifies multiple column families simultaneously - pub fn purge_slots_with_delay( - &self, - from_slot: Slot, - to_slot: Slot, - delay_between_purges: Option, - purge_type: PurgeType, - ) { - // if there's no upper bound, split the purge request into batches of 1000 slots - const PURGE_BATCH_SIZE: u64 = 1000; - let mut batch_start = from_slot; + pub fn purge_slots(&self, from_slot: Slot, to_slot: Slot, purge_type: PurgeType) { let mut purge_stats = PurgeStats::default(); - let mut last_datapoint = Instant::now(); - let mut datapoint_start = batch_start; - while batch_start < to_slot { - let batch_end = (batch_start + PURGE_BATCH_SIZE).min(to_slot); - - let purge_result = - self.run_purge_with_stats(batch_start, batch_end, purge_type, &mut purge_stats); - - if last_datapoint.elapsed().as_millis() > 1000 { - datapoint_info!( - "blockstore-purge", - ("from_slot", datapoint_start as i64, i64), - ("to_slot", batch_end as i64, i64), - ("delete_range_us", purge_stats.delete_range as i64, i64), - ("write_batch_us", purge_stats.write_batch as i64, i64) - ); - last_datapoint = Instant::now(); - purge_stats = PurgeStats::default(); - datapoint_start = batch_end; - } - - match purge_result { - Ok(_all_columns_purged) => { - batch_start = batch_end; + let purge_result = + self.run_purge_with_stats(from_slot, to_slot, purge_type, &mut purge_stats); - if let Some(ref duration) = delay_between_purges { - // Cooperate with other blockstore users - std::thread::sleep(*duration); - } - } - Err(e) => { - error!( - "Error: {:?}; Purge failed in range {:?} to {:?}", - e, batch_start, batch_end - ); - break; - } - } + datapoint_info!( + "blockstore-purge", + ("from_slot", from_slot as i64, i64), + ("to_slot", to_slot as i64, i64), + ("delete_range_us", purge_stats.delete_range as i64, i64), + ("write_batch_us", purge_stats.write_batch as i64, i64) + ); + if let Err(e) = purge_result { + error!( + "Error: {:?}; Purge failed in range {:?} to {:?}", + e, from_slot, to_slot + ); } } - // TODO: rename purge_slots() to purge_and_compact_slots() - pub fn purge_slots(&self, from_slot: Slot, to_slot: Slot) { - self.purge_slots_with_delay(from_slot, to_slot, None, PurgeType::Exact); + pub fn purge_and_compact_slots(&self, from_slot: Slot, to_slot: Slot) { + self.purge_slots(from_slot, to_slot, PurgeType::Exact); if let Err(e) = self.compact_storage(from_slot, to_slot) { // This error is not fatal and indicates an internal error? error!( @@ -443,11 +409,11 @@ pub mod tests { let (shreds, _) = make_many_slot_entries(0, 50, 5); blockstore.insert_shreds(shreds, None, false).unwrap(); - blockstore.purge_slots(0, 5); + blockstore.purge_and_compact_slots(0, 5); test_all_empty_or_min(&blockstore, 6); - blockstore.purge_slots(0, 50); + blockstore.purge_and_compact_slots(0, 50); // min slot shouldn't matter, blockstore should be empty test_all_empty_or_min(&blockstore, 100); @@ -471,7 +437,7 @@ pub mod tests { let (shreds, _) = make_many_slot_entries(0, 5000, 10); blockstore.insert_shreds(shreds, None, false).unwrap(); - blockstore.purge_slots(0, 4999); + blockstore.purge_and_compact_slots(0, 4999); test_all_empty_or_min(&blockstore, 5000); diff --git a/ledger/tests/blockstore.rs b/ledger/tests/blockstore.rs index 4fa73002a6ed2e..50393832af0527 100644 --- a/ledger/tests/blockstore.rs +++ b/ledger/tests/blockstore.rs @@ -42,7 +42,7 @@ fn test_multiple_threads_insert_shred() { assert_eq!(meta0.next_slots, expected_next_slots); // Delete slots for next iteration - blockstore.purge_slots(0, num_threads + 1); + blockstore.purge_and_compact_slots(0, num_threads + 1); } // Cleanup