
Remove ledger purge batching #10830

Merged · 1 commit · Jun 29, 2020
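
Summary of the API change shown in the diff below (not an official PR description): the batching and sleep logic is removed from ledger purging. Blockstore::purge_slots_with_delay is deleted; purge_slots now takes a PurgeType and purges the whole slot range in one pass, and the old purge_slots (purge with PurgeType::Exact followed by compaction) is renamed to purge_and_compact_slots. A minimal caller-side sketch of the new surface follows; the import path and the Blockstore::open call are assumptions for illustration, while the two purge methods and PurgeType::Exact come from the diff itself.

use std::path::Path;
use solana_ledger::blockstore::{Blockstore, PurgeType};

fn purge_example(ledger_path: &Path) {
    // Assumed for the sketch: open the blockstore at the given ledger path.
    let blockstore = Blockstore::open(ledger_path).expect("open blockstore");

    // New signature: the caller picks the purge type, and the whole slot range
    // is purged in a single pass (no 1000-slot batches, no sleep between them).
    blockstore.purge_slots(0, 100, PurgeType::Exact);

    // Renamed convenience wrapper (formerly `purge_slots`): purges the range
    // with PurgeType::Exact and then compacts the affected storage.
    blockstore.purge_and_compact_slots(0, 100);
}
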
13 changes: 3 additions & 10 deletions core/src/ledger_cleanup_service.rs
@@ -29,9 +29,6 @@ pub const DEFAULT_MIN_MAX_LEDGER_SHREDS: u64 = 50_000_000;
// and starve other blockstore users.
pub const DEFAULT_PURGE_SLOT_INTERVAL: u64 = 512;

// Delay between purges to cooperate with other blockstore users
pub const DEFAULT_DELAY_BETWEEN_PURGES: Duration = Duration::from_millis(500);

// Compacting at a slower interval than purging helps keep IOPS down.
// Once a day should be ample
const DEFAULT_COMPACTION_SLOT_INTERVAL: u64 = TICKS_PER_DAY / DEFAULT_TICKS_PER_SLOT;
@@ -67,7 +64,6 @@ impl LedgerCleanupService {
max_ledger_shreds,
&mut last_purge_slot,
DEFAULT_PURGE_SLOT_INTERVAL,
Some(DEFAULT_DELAY_BETWEEN_PURGES),
&mut last_compaction_slot,
DEFAULT_COMPACTION_SLOT_INTERVAL,
) {
@@ -142,7 +138,6 @@ impl LedgerCleanupService {
max_ledger_shreds: u64,
last_purge_slot: &mut u64,
purge_interval: u64,
delay_between_purges: Option<Duration>,
last_compaction_slot: &mut u64,
compaction_interval: u64,
) -> Result<(), RecvTimeoutError> {
@@ -156,6 +151,7 @@
"purge: last_root={}, last_purge_slot={}, purge_interval={}, last_compaction_slot={}, disk_utilization={:?}",
root, last_purge_slot, purge_interval, last_compaction_slot, disk_utilization_pre
);

*last_purge_slot = root;

let (slots_to_clean, purge_first_slot, lowest_cleanup_slot, total_shreds) =
@@ -183,11 +179,10 @@
purge_first_slot, lowest_cleanup_slot
);

let mut purge_time = Measure::start("purge_slots_with_delay");
blockstore.purge_slots_with_delay(
let mut purge_time = Measure::start("purge_slots");
blockstore.purge_slots(
purge_first_slot,
lowest_cleanup_slot,
delay_between_purges,
PurgeType::PrimaryIndex,
);
purge_time.stop();
@@ -275,7 +270,6 @@ mod tests {
5,
&mut last_purge_slot,
10,
None,
&mut last_compaction_slot,
10,
)
@@ -333,7 +327,6 @@
initial_slots,
&mut last_purge_slot,
10,
None,
&mut last_compaction_slot,
10,
)
2 changes: 1 addition & 1 deletion core/src/validator.rs
@@ -684,7 +684,7 @@ fn backup_and_clear_blockstore(ledger_path: &Path, start_slot: Slot, shred_versi

let end_slot = last_slot.unwrap();
info!("Purging slots {} to {}", start_slot, end_slot);
blockstore.purge_slots_with_delay(start_slot, end_slot, None, PurgeType::Exact);
blockstore.purge_slots(start_slot, end_slot, PurgeType::Exact);
blockstore.purge_from_next_slots(start_slot, end_slot);
info!("Purging done, compacting db..");
if let Err(e) = blockstore.compact_storage(start_slot, end_slot) {
1 change: 0 additions & 1 deletion core/tests/ledger_cleanup.rs
@@ -382,7 +382,6 @@ mod tests {
max_ledger_shreds,
&mut last_purge_slot,
10,
None,
&mut last_compaction_slot,
10,
)
2 changes: 1 addition & 1 deletion ledger-tool/src/main.rs
@@ -1432,7 +1432,7 @@ fn main() {
let start_slot = value_t_or_exit!(arg_matches, "start_slot", Slot);
let end_slot = value_t_or_exit!(arg_matches, "end_slot", Slot);
let blockstore = open_blockstore(&ledger_path, AccessType::PrimaryOnly);
blockstore.purge_slots(start_slot, end_slot);
blockstore.purge_and_compact_slots(start_slot, end_slot);
blockstore.purge_from_next_slots(start_slot, end_slot);
}
("list-roots", Some(arg_matches)) => {
16 changes: 8 additions & 8 deletions ledger/src/blockstore.rs
@@ -6155,14 +6155,14 @@ pub mod tests {
.insert_shreds(all_shreds, Some(&leader_schedule_cache), false)
.unwrap();
verify_index_integrity(&blockstore, slot);
blockstore.purge_slots(0, slot);
blockstore.purge_and_compact_slots(0, slot);

// Test inserting just the codes, enough for recovery
blockstore
.insert_shreds(coding_shreds.clone(), Some(&leader_schedule_cache), false)
.unwrap();
verify_index_integrity(&blockstore, slot);
blockstore.purge_slots(0, slot);
blockstore.purge_and_compact_slots(0, slot);

// Test inserting some codes, but not enough for recovery
blockstore
@@ -6173,7 +6173,7 @@
)
.unwrap();
verify_index_integrity(&blockstore, slot);
blockstore.purge_slots(0, slot);
blockstore.purge_and_compact_slots(0, slot);

// Test inserting just the codes, and some data, enough for recovery
let shreds: Vec<_> = data_shreds[..data_shreds.len() - 1]
@@ -6185,7 +6185,7 @@
.insert_shreds(shreds, Some(&leader_schedule_cache), false)
.unwrap();
verify_index_integrity(&blockstore, slot);
blockstore.purge_slots(0, slot);
blockstore.purge_and_compact_slots(0, slot);

// Test inserting some codes, and some data, but enough for recovery
let shreds: Vec<_> = data_shreds[..data_shreds.len() / 2 - 1]
Expand All @@ -6197,7 +6197,7 @@ pub mod tests {
.insert_shreds(shreds, Some(&leader_schedule_cache), false)
.unwrap();
verify_index_integrity(&blockstore, slot);
blockstore.purge_slots(0, slot);
blockstore.purge_and_compact_slots(0, slot);

// Test inserting all shreds in 2 rounds, make sure nothing is lost
let shreds1: Vec<_> = data_shreds[..data_shreds.len() / 2 - 1]
@@ -6217,7 +6217,7 @@
.insert_shreds(shreds2, Some(&leader_schedule_cache), false)
.unwrap();
verify_index_integrity(&blockstore, slot);
blockstore.purge_slots(0, slot);
blockstore.purge_and_compact_slots(0, slot);

// Test not all, but enough data and coding shreds in 2 rounds to trigger recovery,
// make sure nothing is lost
@@ -6242,7 +6242,7 @@
.insert_shreds(shreds2, Some(&leader_schedule_cache), false)
.unwrap();
verify_index_integrity(&blockstore, slot);
blockstore.purge_slots(0, slot);
blockstore.purge_and_compact_slots(0, slot);

// Test insert shreds in 2 rounds, but not enough to trigger
// recovery, make sure nothing is lost
@@ -6267,7 +6267,7 @@
.insert_shreds(shreds2, Some(&leader_schedule_cache), false)
.unwrap();
verify_index_integrity(&blockstore, slot);
blockstore.purge_slots(0, slot);
blockstore.purge_and_compact_slots(0, slot);
}
Blockstore::destroy(&blockstore_path).expect("Expected successful database destruction");
}
74 changes: 20 additions & 54 deletions ledger/src/blockstore/blockstore_purge.rs
@@ -1,5 +1,4 @@
use super::*;
use std::time::Instant;

#[derive(Default)]
pub struct PurgeStats {
@@ -12,61 +11,28 @@ impl Blockstore {
/// Dangerous; Use with care:
/// Does not check for integrity and does not update slot metas that refer to deleted slots
/// Modifies multiple column families simultaneously
pub fn purge_slots_with_delay(
&self,
from_slot: Slot,
to_slot: Slot,
delay_between_purges: Option<Duration>,
purge_type: PurgeType,
) {
// if there's no upper bound, split the purge request into batches of 1000 slots
const PURGE_BATCH_SIZE: u64 = 1000;
let mut batch_start = from_slot;
pub fn purge_slots(&self, from_slot: Slot, to_slot: Slot, purge_type: PurgeType) {
let mut purge_stats = PurgeStats::default();
let mut last_datapoint = Instant::now();
let mut datapoint_start = batch_start;
while batch_start < to_slot {
let batch_end = (batch_start + PURGE_BATCH_SIZE).min(to_slot);

let purge_result =
self.run_purge_with_stats(batch_start, batch_end, purge_type, &mut purge_stats);

if last_datapoint.elapsed().as_millis() > 1000 {
datapoint_info!(
"blockstore-purge",
("from_slot", datapoint_start as i64, i64),
("to_slot", batch_end as i64, i64),
("delete_range_us", purge_stats.delete_range as i64, i64),
("write_batch_us", purge_stats.write_batch as i64, i64)
);
last_datapoint = Instant::now();
purge_stats = PurgeStats::default();
datapoint_start = batch_end;
}

match purge_result {
Ok(_all_columns_purged) => {
batch_start = batch_end;
let purge_result =
self.run_purge_with_stats(from_slot, to_slot, purge_type, &mut purge_stats);

if let Some(ref duration) = delay_between_purges {
// Cooperate with other blockstore users
std::thread::sleep(*duration);
}
}
Err(e) => {
error!(
"Error: {:?}; Purge failed in range {:?} to {:?}",
e, batch_start, batch_end
);
break;
}
}
datapoint_info!(
"blockstore-purge",
("from_slot", from_slot as i64, i64),
("to_slot", to_slot as i64, i64),
("delete_range_us", purge_stats.delete_range as i64, i64),
("write_batch_us", purge_stats.write_batch as i64, i64)
);
if let Err(e) = purge_result {
error!(
"Error: {:?}; Purge failed in range {:?} to {:?}",
e, from_slot, to_slot
);
}
}

// TODO: rename purge_slots() to purge_and_compact_slots()
pub fn purge_slots(&self, from_slot: Slot, to_slot: Slot) {
self.purge_slots_with_delay(from_slot, to_slot, None, PurgeType::Exact);
pub fn purge_and_compact_slots(&self, from_slot: Slot, to_slot: Slot) {
self.purge_slots(from_slot, to_slot, PurgeType::Exact);
if let Err(e) = self.compact_storage(from_slot, to_slot) {
// This error is not fatal and indicates an internal error?
error!(
@@ -443,11 +409,11 @@ pub mod tests {
let (shreds, _) = make_many_slot_entries(0, 50, 5);
blockstore.insert_shreds(shreds, None, false).unwrap();

blockstore.purge_slots(0, 5);
blockstore.purge_and_compact_slots(0, 5);

test_all_empty_or_min(&blockstore, 6);

blockstore.purge_slots(0, 50);
blockstore.purge_and_compact_slots(0, 50);

// min slot shouldn't matter, blockstore should be empty
test_all_empty_or_min(&blockstore, 100);
@@ -471,7 +437,7 @@
let (shreds, _) = make_many_slot_entries(0, 5000, 10);
blockstore.insert_shreds(shreds, None, false).unwrap();

blockstore.purge_slots(0, 4999);
blockstore.purge_and_compact_slots(0, 4999);

test_all_empty_or_min(&blockstore, 5000);

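
Because the removed and replacement bodies are interleaved in the hunk above without +/- markers, here is a minimal standalone sketch of the batching pattern this PR deletes. The constant and variable names follow the deleted code; the `purge` closure stands in for run_purge_with_stats and is an illustration, not the real signature. After this change, purge_slots makes a single run_purge_with_stats call over the full range and emits one blockstore-purge datapoint per invocation instead of at most one per second across batches.

use std::time::Duration;

// Sketch of the deleted behavior: purge [from_slot, to_slot) in 1000-slot
// windows, optionally sleeping between windows to cooperate with other
// blockstore users.
fn purge_in_batches<F: Fn(u64, u64)>(
    from_slot: u64,
    to_slot: u64,
    delay_between_purges: Option<Duration>,
    purge: F,
) {
    const PURGE_BATCH_SIZE: u64 = 1000;
    let mut batch_start = from_slot;
    while batch_start < to_slot {
        let batch_end = (batch_start + PURGE_BATCH_SIZE).min(to_slot);
        purge(batch_start, batch_end); // one purge pass per window
        batch_start = batch_end;
        if let Some(delay) = delay_between_purges {
            std::thread::sleep(delay);
        }
    }
}
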
2 changes: 1 addition & 1 deletion ledger/tests/blockstore.rs
@@ -42,7 +42,7 @@ fn test_multiple_threads_insert_shred() {
assert_eq!(meta0.next_slots, expected_next_slots);

// Delete slots for next iteration
blockstore.purge_slots(0, num_threads + 1);
blockstore.purge_and_compact_slots(0, num_threads + 1);
}

// Cleanup