Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Make ledger cleanup service honor the replay progress. #27498

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
66 changes: 65 additions & 1 deletion core/src/ledger_cleanup_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,8 @@ impl LedgerCleanupService {
root: Slot,
max_ledger_shreds: u64,
) -> (bool, Slot, u64) {
let lowest_confirmed_slot = blockstore.get_lowest_confirmed_slot();

let mut total_slots = Vec::new();
let mut iterate_time = Measure::start("iterate_time");
let mut total_shreds = 0;
Expand All @@ -160,7 +162,7 @@ impl LedgerCleanupService {
max_ledger_shreds,
iterate_time
);
if (total_shreds as u64) < max_ledger_shreds {
if (total_shreds as u64) < max_ledger_shreds || lowest_confirmed_slot == 0 {
return (false, 0, total_shreds);
}
let mut num_shreds_to_clean = 0;
Expand All @@ -173,6 +175,17 @@ impl LedgerCleanupService {
}
}

// Only purge up to the lowest confirmed slot.
if lowest_cleanup_slot > lowest_confirmed_slot {
warn!(
"Unable to keep the ledger store within --limit_ledger_size \
limit due to the replay progress. Consider increasing \
--limit_ledger_size and the ledger disk size, or improving \
hardware performance to speed up the replay."
);
lowest_cleanup_slot = lowest_confirmed_slot;
}

(true, lowest_cleanup_slot, total_shreds)
}

Expand Down Expand Up @@ -343,6 +356,7 @@ impl LedgerCleanupService {
}
#[cfg(test)]
mod tests {
const TEST_MAX_SLOT: u64 = u64::max_value();
use {
super::*,
crossbeam_channel::unbounded,
Expand All @@ -354,6 +368,7 @@ mod tests {
solana_logger::setup();
let blockstore_path = get_tmp_ledger_path!();
let blockstore = Blockstore::open(&blockstore_path).unwrap();
blockstore.set_lowest_confirmed_slot(TEST_MAX_SLOT);
let (shreds, _) = make_many_slot_entries(0, 50, 5);
blockstore.insert_shreds(shreds, None, false).unwrap();
let blockstore = Arc::new(blockstore);
Expand Down Expand Up @@ -397,6 +412,55 @@ mod tests {
Blockstore::destroy(&blockstore_path).expect("Expected successful database destruction");
}

// Exercises `cleanup_ledger` when the blockstore's lowest confirmed slot (30)
// is set before the cleanup runs, and expects the purge to be capped at that
// slot: `highest_compaction_slot` must end at 30 and every surviving slot
// must be greater than 30.
#[test]
fn test_cleanup_bound_to_confirmed_slot() {
solana_logger::setup();
let blockstore_path = get_tmp_ledger_path!();
let blockstore = Blockstore::open(&blockstore_path).unwrap();
// Pretend replay has only confirmed up to slot 30.
blockstore.set_lowest_confirmed_slot(30);
// Populate the blockstore starting at slot 0.
// NOTE(review): presumably 50 slots with 5 entries each; confirm against
// `make_many_slot_entries`.
let (shreds, _) = make_many_slot_entries(0, 50, 5);
blockstore.insert_shreds(shreds, None, false).unwrap();
let blockstore = Arc::new(blockstore);
let (sender, receiver) = unbounded();

// Send root slot 50 to trigger a cleanup with a limit of 5.
// NOTE(review): the `5` below is presumably `max_ledger_shreds`; on its own
// that limit would keep only the newest slots, but the purge is expected to
// stop at the lowest confirmed slot instead.
let mut last_purge_slot = 0;
let highest_compaction_slot = Arc::new(AtomicU64::new(0));
sender.send(50).unwrap();
LedgerCleanupService::cleanup_ledger(
&receiver,
&blockstore,
5,
&mut last_purge_slot,
10,
&highest_compaction_slot,
)
.unwrap();
// The received root is recorded as the last purge slot ...
assert_eq!(last_purge_slot, 50);
// ... but the purge itself only advanced to the lowest confirmed slot.
assert_eq!(highest_compaction_slot.load(Ordering::Relaxed), 30);

// Check that no slot at or below the lowest confirmed slot (30) remains.
blockstore
.slot_meta_iterator(0)
.unwrap()
.for_each(|(slot, _)| assert!(slot > 30));

// Run compaction afterwards; `jitter` is expected to remain 0.
let mut last_compaction_slot = 0;
let mut jitter = 0;
LedgerCleanupService::compact_ledger(
&blockstore,
&mut last_compaction_slot,
10,
&highest_compaction_slot,
&mut jitter,
None,
);
assert_eq!(jitter, 0);

// Drop our handle before destroying the on-disk database.
drop(blockstore);
Blockstore::destroy(&blockstore_path).expect("Expected successful database destruction");
}

#[test]
fn test_cleanup_speed() {
solana_logger::setup();
Expand Down
6 changes: 3 additions & 3 deletions core/src/replay_stage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1732,12 +1732,12 @@ impl ReplayStage {
bank,
&mut w_replay_stats,
&mut w_replay_progress,
false,
false, // skip_verification
transaction_status_sender,
Some(replay_vote_sender),
None,
None, // entry_callback
verify_recyclers,
false,
false, // allow_dead_slots
log_messages_bytes_limit,
prioritization_fee_cache,
)?;
Expand Down
6 changes: 6 additions & 0 deletions core/tests/ledger_cleanup.rs
Original file line number Diff line number Diff line change
Expand Up @@ -367,6 +367,9 @@ mod tests {
},
)
.unwrap();
// Set the lowest confirmed slot to max so that the ledger cleanup service
// skips the check for replay progress during the test.
blockstore.set_lowest_confirmed_slot(std::u64::MAX);
if config.no_compaction {
blockstore.set_no_compaction(true);
}
Expand Down Expand Up @@ -665,6 +668,9 @@ mod tests {
fn test_compaction() {
let blockstore_path = get_tmp_ledger_path!();
let blockstore = Arc::new(Blockstore::open(&blockstore_path).unwrap());
// Set the lowest confirmed slot to max so that the ledger cleanup service
// skips the check for replay progress during the test.
blockstore.set_lowest_confirmed_slot(std::u64::MAX);

let n = 10_000;
let batch_size_slots = 100;
Expand Down
10 changes: 10 additions & 0 deletions ledger/src/blockstore.rs
Original file line number Diff line number Diff line change
Expand Up @@ -176,6 +176,7 @@ pub struct Blockstore {
completed_slots_senders: Mutex<Vec<CompletedSlotsSender>>,
pub shred_timing_point_sender: Option<PohTimingSender>,
pub lowest_cleanup_slot: RwLock<Slot>,
lowest_confirmed_slot: RwLock<Slot>,
no_compaction: bool,
pub slots_stats: SlotsStats,
}
Expand Down Expand Up @@ -334,6 +335,7 @@ impl Blockstore {
insert_shreds_lock: Mutex::<()>::default(),
last_root,
lowest_cleanup_slot: RwLock::<Slot>::default(),
lowest_confirmed_slot: RwLock::<Slot>::default(),
no_compaction: false,
slots_stats: SlotsStats::default(),
};
Expand Down Expand Up @@ -2214,6 +2216,14 @@ impl Blockstore {
(lowest_cleanup_slot, lowest_available_slot)
}

/// Stores `slot` as the blockstore's lowest confirmed slot.
///
/// Takes the write lock on `lowest_confirmed_slot` for the duration of the
/// assignment.
///
/// # Panics
///
/// Panics if the `lowest_confirmed_slot` lock is poisoned.
pub fn set_lowest_confirmed_slot(&self, slot: Slot) {
    let mut lowest_confirmed_slot = self.lowest_confirmed_slot.write().unwrap();
    *lowest_confirmed_slot = slot;
}

/// Returns the value currently held in `lowest_confirmed_slot`, i.e. the
/// slot last stored through `set_lowest_confirmed_slot`.
///
/// Takes the read lock on `lowest_confirmed_slot` only long enough to copy
/// the slot out.
///
/// # Panics
///
/// Panics if the `lowest_confirmed_slot` lock is poisoned.
pub fn get_lowest_confirmed_slot(&self) -> Slot {
    let lowest_confirmed_slot = self.lowest_confirmed_slot.read().unwrap();
    *lowest_confirmed_slot
}

// Returns a transaction status, as well as a loop counter for unit testing
fn get_transaction_status_with_counter(
&self,
Expand Down
10 changes: 8 additions & 2 deletions ledger/src/blockstore_processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1120,7 +1120,7 @@ pub fn confirm_slot(
load_result
}?;

confirm_slot_entries(
let result = confirm_slot_entries(
bank,
slot_entries_load_result,
timing,
Expand All @@ -1132,7 +1132,13 @@ pub fn confirm_slot(
recyclers,
log_messages_bytes_limit,
prioritization_fee_cache,
)
);

if result.is_ok() {
blockstore.set_lowest_confirmed_slot(slot);
}

result
}

#[allow(clippy::too_many_arguments)]
Expand Down