diff --git a/core/src/ledger_cleanup_service.rs b/core/src/ledger_cleanup_service.rs index 160b8721f45b6e..82d2a4685a16b3 100644 --- a/core/src/ledger_cleanup_service.rs +++ b/core/src/ledger_cleanup_service.rs @@ -138,6 +138,8 @@ impl LedgerCleanupService { root: Slot, max_ledger_shreds: u64, ) -> (bool, Slot, u64) { + let lowest_confirmed_slot = blockstore.get_lowest_confirmed_slot(); + let mut total_slots = Vec::new(); let mut iterate_time = Measure::start("iterate_time"); let mut total_shreds = 0; @@ -160,7 +162,7 @@ impl LedgerCleanupService { max_ledger_shreds, iterate_time ); - if (total_shreds as u64) < max_ledger_shreds { + if (total_shreds as u64) < max_ledger_shreds || lowest_confirmed_slot == 0 { return (false, 0, total_shreds); } let mut num_shreds_to_clean = 0; @@ -173,6 +175,17 @@ impl LedgerCleanupService { } } + // Only purge up to the lowest confirmed slot. + if lowest_cleanup_slot > lowest_confirmed_slot { + warn!( + "Unable to keep the ledger store within --limit_ledger_size \ + limit due to the replay progress. Consider increasing \ + --limit_ledger_size and the ledger disk size, or improving \ + hardware performance to speed up the replay." + ); + lowest_cleanup_slot = lowest_confirmed_slot; + } + (true, lowest_cleanup_slot, total_shreds) } @@ -343,6 +356,7 @@ impl LedgerCleanupService { } #[cfg(test)] mod tests { + const TEST_MAX_SLOT: u64 = u64::max_value(); use { super::*, crossbeam_channel::unbounded, @@ -354,6 +368,7 @@ mod tests { solana_logger::setup(); let blockstore_path = get_tmp_ledger_path!(); let blockstore = Blockstore::open(&blockstore_path).unwrap(); + blockstore.set_lowest_confirmed_slot(TEST_MAX_SLOT); let (shreds, _) = make_many_slot_entries(0, 50, 5); blockstore.insert_shreds(shreds, None, false).unwrap(); let blockstore = Arc::new(blockstore); @@ -397,6 +412,55 @@ mod tests { Blockstore::destroy(&blockstore_path).expect("Expected successful database destruction"); } + #[test] + fn test_cleanup_bound_to_confirmed_slot() { + solana_logger::setup(); + let blockstore_path = get_tmp_ledger_path!(); + let blockstore = Blockstore::open(&blockstore_path).unwrap(); + blockstore.set_lowest_confirmed_slot(30); + let (shreds, _) = make_many_slot_entries(0, 50, 5); + blockstore.insert_shreds(shreds, None, false).unwrap(); + let blockstore = Arc::new(blockstore); + let (sender, receiver) = unbounded(); + + //send a signal to kill all but 5 shreds, which will be in the newest slots + let mut last_purge_slot = 0; + let highest_compaction_slot = Arc::new(AtomicU64::new(0)); + sender.send(50).unwrap(); + LedgerCleanupService::cleanup_ledger( + &receiver, + &blockstore, + 5, + &mut last_purge_slot, + 10, + &highest_compaction_slot, + ) + .unwrap(); + assert_eq!(last_purge_slot, 50); + assert_eq!(highest_compaction_slot.load(Ordering::Relaxed), 30); + + //check that 0-40 don't exist + blockstore + .slot_meta_iterator(0) + .unwrap() + .for_each(|(slot, _)| assert!(slot > 30)); + + let mut last_compaction_slot = 0; + let mut jitter = 0; + LedgerCleanupService::compact_ledger( + &blockstore, + &mut last_compaction_slot, + 10, + &highest_compaction_slot, + &mut jitter, + None, + ); + assert_eq!(jitter, 0); + + drop(blockstore); + Blockstore::destroy(&blockstore_path).expect("Expected successful database destruction"); + } + #[test] fn test_cleanup_speed() { solana_logger::setup(); diff --git a/core/src/replay_stage.rs b/core/src/replay_stage.rs index 44f2ef5c727d52..f8e5162a31a0d0 100644 --- a/core/src/replay_stage.rs +++ b/core/src/replay_stage.rs @@ -1732,12 +1732,12 @@ impl ReplayStage { bank, &mut w_replay_stats, &mut w_replay_progress, - false, + false, // skip_verification transaction_status_sender, Some(replay_vote_sender), - None, + None, // entry_callback verify_recyclers, - false, + false, // allow_dead_slots log_messages_bytes_limit, prioritization_fee_cache, )?; diff --git a/core/tests/ledger_cleanup.rs b/core/tests/ledger_cleanup.rs index 809f35972d8ca6..f3425ee7d6b587 100644 --- a/core/tests/ledger_cleanup.rs +++ b/core/tests/ledger_cleanup.rs @@ -367,6 +367,9 @@ mod tests { }, ) .unwrap(); + // Set the lowest confirmed slot to max to make ledger cleanup service + // skips the check for replay progress during the test. + blockstore.set_lowest_confirmed_slot(std::u64::MAX); if config.no_compaction { blockstore.set_no_compaction(true); } @@ -665,6 +668,9 @@ mod tests { fn test_compaction() { let blockstore_path = get_tmp_ledger_path!(); let blockstore = Arc::new(Blockstore::open(&blockstore_path).unwrap()); + // Set the lowest confirmed slot to max to make ledger cleanup service + // skips the check for replay progress during the test. + blockstore.set_lowest_confirmed_slot(std::u64::MAX); let n = 10_000; let batch_size_slots = 100; diff --git a/ledger/src/blockstore.rs b/ledger/src/blockstore.rs index 5dbc072f29d5bf..aa47d9912af4aa 100644 --- a/ledger/src/blockstore.rs +++ b/ledger/src/blockstore.rs @@ -176,6 +176,7 @@ pub struct Blockstore { completed_slots_senders: Mutex>, pub shred_timing_point_sender: Option, pub lowest_cleanup_slot: RwLock, + lowest_confirmed_slot: RwLock, no_compaction: bool, pub slots_stats: SlotsStats, } @@ -334,6 +335,7 @@ impl Blockstore { insert_shreds_lock: Mutex::<()>::default(), last_root, lowest_cleanup_slot: RwLock::::default(), + lowest_confirmed_slot: RwLock::::default(), no_compaction: false, slots_stats: SlotsStats::default(), }; @@ -2214,6 +2216,14 @@ impl Blockstore { (lowest_cleanup_slot, lowest_available_slot) } + pub fn set_lowest_confirmed_slot(&self, slot: Slot) { + *self.lowest_confirmed_slot.write().unwrap() = slot; + } + + pub fn get_lowest_confirmed_slot(&self) -> Slot { + *self.lowest_confirmed_slot.read().unwrap() + } + // Returns a transaction status, as well as a loop counter for unit testing fn get_transaction_status_with_counter( &self, diff --git a/ledger/src/blockstore_processor.rs b/ledger/src/blockstore_processor.rs index 29660d2fae6601..120c60523766b0 100644 --- a/ledger/src/blockstore_processor.rs +++ b/ledger/src/blockstore_processor.rs @@ -1120,7 +1120,7 @@ pub fn confirm_slot( load_result }?; - confirm_slot_entries( + let result = confirm_slot_entries( bank, slot_entries_load_result, timing, @@ -1132,7 +1132,13 @@ pub fn confirm_slot( recyclers, log_messages_bytes_limit, prioritization_fee_cache, - ) + ); + + if result.is_ok() { + blockstore.set_lowest_confirmed_slot(slot); + } + + result } #[allow(clippy::too_many_arguments)]