Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

blockstore: atomize slot clearing, relax parent slot meta check #35124

Merged
merged 8 commits into from
Mar 3, 2024
48 changes: 13 additions & 35 deletions ledger/src/blockstore.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1154,50 +1154,28 @@ impl Blockstore {
self.completed_slots_senders.lock().unwrap().clear();
}

/// Range-delete all entries which prefix matches the specified `slot`,
/// remove `slot` its' parents SlotMeta next_slots list, and
/// clear `slot`'s SlotMeta (except for next_slots).
/// Clear `slot` from the Blockstore, see `Blockstore::purge_slot_cleanup_chaining`
/// for more details.
///
/// This function currently requires `insert_shreds_lock`, as both
/// `clear_unconfirmed_slot()` and `insert_shreds_handle_duplicate()`
/// try to perform read-modify-write operation on [`cf::SlotMeta`] column
/// family.
pub fn clear_unconfirmed_slot(&self, slot: Slot) {
let _lock = self.insert_shreds_lock.lock().unwrap();
if let Some(mut slot_meta) = self
.meta(slot)
.expect("Couldn't fetch from SlotMeta column family")
{
// Clear all slot related information
self.run_purge(slot, slot, PurgeType::Exact)
.expect("Purge database operations failed");

// Clear this slot as a next slot from parent
if let Some(parent_slot) = slot_meta.parent_slot {
let mut parent_slot_meta = self
.meta(parent_slot)
.expect("Couldn't fetch from SlotMeta column family")
.expect("Unconfirmed slot should have had parent slot set");
// .retain() is a linear scan; however, next_slots should
// only contain several elements so this isn't so bad
parent_slot_meta
.next_slots
.retain(|&next_slot| next_slot != slot);
self.meta_cf
.put(parent_slot, &parent_slot_meta)
.expect("Couldn't insert into SlotMeta column family");
}
// Reinsert parts of `slot_meta` that are important to retain, like the `next_slots`
// field.
slot_meta.clear_unconfirmed_slot();
self.meta_cf
.put(slot, &slot_meta)
.expect("Couldn't insert into SlotMeta column family");
} else {
error!(
// Purge the slot and insert an empty `SlotMeta` with only the `next_slots` field preserved.
// Shreds inherently know their parent slot, and a parent's SlotMeta `next_slots` list
// will be updated when the child is inserted (see `Blockstore::handle_chaining()`).
// However, we are only purging and repairing the parent slot here. Since the child will not be
// reinserted the chaining will be lost. In order for bank forks discovery to ingest the child,
// we must retain the chain by preserving `next_slots`.
match self.purge_slot_cleanup_chaining(slot) {
Ok(_) => {}
Err(BlockstoreError::SlotUnavailable) => error!(
"clear_unconfirmed_slot() called on slot {} with no SlotMeta",
slot
);
),
Err(e) => panic!("Purge database operations failed {}", e),
}
}

Expand Down
214 changes: 160 additions & 54 deletions ledger/src/blockstore/blockstore_purge.rs
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,7 @@ impl Blockstore {
}
}

#[cfg(test)]
steviez marked this conversation as resolved.
Show resolved Hide resolved
pub(crate) fn run_purge(
&self,
from_slot: Slot,
Expand All @@ -144,90 +145,181 @@ impl Blockstore {
self.run_purge_with_stats(from_slot, to_slot, purge_type, &mut PurgeStats::default())
}

/// Purges all columns relating to `slot` while keeping fork chaining intact.
///
/// All mutations are staged in a single write batch and submitted together:
/// - every column entry for `slot` is range-deleted (`PurgeType::Exact`),
/// - `slot` is removed from its parent's `next_slots`, when the parent's
///   `SlotMeta` is present,
/// - an orphaned `SlotMeta` for `slot` is reinserted that preserves `slot`'s
///   `next_slots`. This ensures that `slot`'s fork is replayable upon repair
///   of `slot`.
///
/// Returns the `columns_purged` flag reported by `purge_range`.
///
/// # Errors
///
/// Returns `BlockstoreError::SlotUnavailable` if there is no `SlotMeta` for
/// `slot`. Any error from reading a `SlotMeta`, creating the write batch,
/// staging the updates, or submitting the batch is propagated to the caller.
pub(crate) fn purge_slot_cleanup_chaining(&self, slot: Slot) -> Result<bool> {
    let Some(mut slot_meta) = self.meta(slot)? else {
        return Err(BlockstoreError::SlotUnavailable);
    };
    let mut write_batch = self.db.batch()?;

    let columns_purged = self.purge_range(&mut write_batch, slot, slot, PurgeType::Exact)?;

    if let Some(parent_slot) = slot_meta.parent_slot {
        if let Some(mut parent_slot_meta) = self.meta(parent_slot)? {
            // .retain() is a linear scan; however, next_slots should
            // only contain several elements so this isn't so bad
            parent_slot_meta
                .next_slots
                .retain(|&next_slot| next_slot != slot);
            write_batch.put::<cf::SlotMeta>(parent_slot, &parent_slot_meta)?;
        } else {
            // A missing parent is tolerated rather than treated as fatal.
            // The `\` continuation keeps the message on a single log line.
            error!(
                "Parent slot meta {} for child {} is missing or cleaned up. \
                 Falling back to orphan repair to remedy the situation",
                parent_slot, slot
            );
        }
    }

    // Retain a SlotMeta for `slot` with the `next_slots` field retained
    slot_meta.clear_unconfirmed_slot();
    write_batch.put::<cf::SlotMeta>(slot, &slot_meta)?;

    self.db.write(write_batch).inspect_err(|e| {
        error!(
            "Error: {:?} while submitting write batch for slot {:?}",
            e, slot
        )
    })?;
    Ok(columns_purged)
}

/// A helper function to `purge_slots` that executes the ledger clean up.
/// The cleanup applies to \[`from_slot`, `to_slot`\].
///
/// When `from_slot` is 0, any sst-file with a key-range completely older
/// than `to_slot` will also be deleted.
///
/// Note: slots > `to_slot` that chained to a purged slot are not properly
/// cleaned up. This function is not intended to be used if such slots need
/// to be replayed.
pub(crate) fn run_purge_with_stats(
&self,
from_slot: Slot,
to_slot: Slot,
purge_type: PurgeType,
purge_stats: &mut PurgeStats,
) -> Result<bool> {
let mut write_batch = self
.db
.batch()
.expect("Database Error: Failed to get write batch");
let mut write_batch = self.db.batch()?;

steviez marked this conversation as resolved.
Show resolved Hide resolved
let mut delete_range_timer = Measure::start("delete_range");
let columns_purged = self.purge_range(&mut write_batch, from_slot, to_slot, purge_type)?;
delete_range_timer.stop();

let mut write_timer = Measure::start("write_batch");
self.db.write(write_batch).inspect(|e| {
error!(
"Error: {:?} while submitting write batch for purge from_slot {} to_slot {}",
e, from_slot, to_slot
)
})?;
write_timer.stop();

let mut purge_files_in_range_timer = Measure::start("delete_file_in_range");
// purge_files_in_range delete any files whose slot range is within
// [from_slot, to_slot]. When from_slot is 0, it is safe to run
// purge_files_in_range because if purge_files_in_range deletes any
// sst file that contains any range-deletion tombstone, the deletion
// range of that tombstone will be completely covered by the new
// range-delete tombstone (0, to_slot) issued above.
//
// On the other hand, purge_files_in_range is more effective and
// efficient than the compaction filter (which runs key-by-key)
// because all the sst files that have key range below to_slot
// can be deleted immediately.
if columns_purged && from_slot == 0 {
self.purge_files_in_range(from_slot, to_slot);
}
purge_files_in_range_timer.stop();

purge_stats.delete_range += delete_range_timer.as_us();
purge_stats.write_batch += write_timer.as_us();
purge_stats.delete_files_in_range += purge_files_in_range_timer.as_us();

Ok(columns_purged)
}

fn purge_range(
&self,
write_batch: &mut WriteBatch,
from_slot: Slot,
to_slot: Slot,
purge_type: PurgeType,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: I think it's cleaner encapsulation to pass a should_purge_special_columns boolean here, and then have a PurgeType::should_purge_special_columns() -> bool method on PurgeType. This way purge_slot_cleanup_chaining() doesn't have to know about PurgeType at all

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This way purge_slot_cleanup_chaining() doesn't have to know about PurgeType at all

I see the merit of adding the new function purge_slot_cleanup_chaining() as this has the unique functionality from run_purge_with_stats. However, I'm not sure I see the benefit of the boolean over the enum; can you elaborate?

The enum is part of the public API, so I think it is reasonable to expect someone to know about it. And the Exact value of the enum means "go purge the special columns for these slots right now", which is what the boolean would convey.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

  1. It seems like the PurgeType was designed specifically for the run_purge_with_stats workflow, so it seemed better to keep it isolated there rather than mixing it with this new clean function
  2. purge_slot_cleanup_chaining as a utility function seems like it should just decide which columns to clean. Seems like cramming the PurgeType in there was too high level.

) -> Result<bool> {
let columns_purged = self
.db
.delete_range_cf::<cf::SlotMeta>(&mut write_batch, from_slot, to_slot)
.delete_range_cf::<cf::SlotMeta>(write_batch, from_slot, to_slot)
.is_ok()
& self
.db
.delete_range_cf::<cf::BankHash>(&mut write_batch, from_slot, to_slot)
.delete_range_cf::<cf::BankHash>(write_batch, from_slot, to_slot)
.is_ok()
& self
.db
.delete_range_cf::<cf::Root>(&mut write_batch, from_slot, to_slot)
.delete_range_cf::<cf::Root>(write_batch, from_slot, to_slot)
.is_ok()
& self
.db
.delete_range_cf::<cf::ShredData>(&mut write_batch, from_slot, to_slot)
.delete_range_cf::<cf::ShredData>(write_batch, from_slot, to_slot)
.is_ok()
& self
.db
.delete_range_cf::<cf::ShredCode>(&mut write_batch, from_slot, to_slot)
.delete_range_cf::<cf::ShredCode>(write_batch, from_slot, to_slot)
.is_ok()
& self
.db
.delete_range_cf::<cf::DeadSlots>(&mut write_batch, from_slot, to_slot)
.delete_range_cf::<cf::DeadSlots>(write_batch, from_slot, to_slot)
.is_ok()
& self
.db
.delete_range_cf::<cf::DuplicateSlots>(&mut write_batch, from_slot, to_slot)
.delete_range_cf::<cf::DuplicateSlots>(write_batch, from_slot, to_slot)
.is_ok()
& self
.db
.delete_range_cf::<cf::ErasureMeta>(&mut write_batch, from_slot, to_slot)
.delete_range_cf::<cf::ErasureMeta>(write_batch, from_slot, to_slot)
.is_ok()
& self
.db
.delete_range_cf::<cf::Orphans>(&mut write_batch, from_slot, to_slot)
.delete_range_cf::<cf::Orphans>(write_batch, from_slot, to_slot)
.is_ok()
& self
.db
.delete_range_cf::<cf::Index>(&mut write_batch, from_slot, to_slot)
.delete_range_cf::<cf::Index>(write_batch, from_slot, to_slot)
.is_ok()
& self
.db
.delete_range_cf::<cf::Rewards>(&mut write_batch, from_slot, to_slot)
.delete_range_cf::<cf::Rewards>(write_batch, from_slot, to_slot)
.is_ok()
& self
.db
.delete_range_cf::<cf::Blocktime>(&mut write_batch, from_slot, to_slot)
.delete_range_cf::<cf::Blocktime>(write_batch, from_slot, to_slot)
.is_ok()
& self
.db
.delete_range_cf::<cf::PerfSamples>(&mut write_batch, from_slot, to_slot)
.delete_range_cf::<cf::PerfSamples>(write_batch, from_slot, to_slot)
.is_ok()
& self
.db
.delete_range_cf::<cf::BlockHeight>(&mut write_batch, from_slot, to_slot)
.delete_range_cf::<cf::BlockHeight>(write_batch, from_slot, to_slot)
.is_ok()
& self
.db
.delete_range_cf::<cf::OptimisticSlots>(&mut write_batch, from_slot, to_slot)
.delete_range_cf::<cf::OptimisticSlots>(write_batch, from_slot, to_slot)
.is_ok()
& self
.db
.delete_range_cf::<cf::MerkleRootMeta>(&mut write_batch, from_slot, to_slot)
.delete_range_cf::<cf::MerkleRootMeta>(write_batch, from_slot, to_slot)
.is_ok();

match purge_type {
PurgeType::Exact => {
self.purge_special_columns_exact(&mut write_batch, from_slot, to_slot)?;
self.purge_special_columns_exact(write_batch, from_slot, to_slot)?;
}
PurgeType::CompactionFilter => {
// No explicit action is required here because this purge type completely and
Expand All @@ -237,39 +329,6 @@ impl Blockstore {
// in no spiky periodic huge delete_range for them.
}
}
delete_range_timer.stop();

let mut write_timer = Measure::start("write_batch");
if let Err(e) = self.db.write(write_batch) {
error!(
"Error: {:?} while submitting write batch for slot {:?} retrying...",
e, from_slot
);
return Err(e);
}
write_timer.stop();

let mut purge_files_in_range_timer = Measure::start("delete_file_in_range");
// purge_files_in_range delete any files whose slot range is within
// [from_slot, to_slot]. When from_slot is 0, it is safe to run
// purge_files_in_range because if purge_files_in_range deletes any
// sst file that contains any range-deletion tombstone, the deletion
// range of that tombstone will be completely covered by the new
// range-delete tombstone (0, to_slot) issued above.
//
// On the other hand, purge_files_in_range is more effective and
// efficient than the compaction filter (which runs key-by-key)
// because all the sst files that have key range below to_slot
// can be deleted immediately.
if columns_purged && from_slot == 0 {
self.purge_files_in_range(from_slot, to_slot);
}
purge_files_in_range_timer.stop();

purge_stats.delete_range += delete_range_timer.as_us();
purge_stats.write_batch += write_timer.as_us();
purge_stats.delete_files_in_range += purge_files_in_range_timer.as_us();

Ok(columns_purged)
}

Expand Down Expand Up @@ -1103,4 +1162,51 @@ pub mod tests {
}
assert_eq!(count, 1);
}

#[test]
fn test_purge_slot_cleanup_chaining_missing_slot_meta() {
    let ledger_path = get_tmp_ledger_path_auto_delete!();
    let blockstore = Blockstore::open(ledger_path.path()).unwrap();

    // Populate the blockstore; slot 11 is never inserted.
    let (populated_shreds, _) = make_many_slot_entries(0, 10, 5);
    blockstore
        .insert_shreds(populated_shreds, None, false)
        .unwrap();

    // Purging a slot without a SlotMeta must report `SlotUnavailable`.
    let purge_error = blockstore.purge_slot_cleanup_chaining(11).unwrap_err();
    assert!(matches!(purge_error, BlockstoreError::SlotUnavailable));
}

#[test]
fn test_purge_slot_cleanup_chaining() {
    let ledger_path = get_tmp_ledger_path_auto_delete!();
    let blockstore = Blockstore::open(ledger_path.path()).unwrap();

    // Base set of slots, plus two extra slots: 11 built on slot 4 and
    // 12 built on slot 5 (the slot that gets purged below).
    let (base_shreds, _) = make_many_slot_entries(0, 10, 5);
    blockstore.insert_shreds(base_shreds, None, false).unwrap();
    for (fork_slot, fork_parent) in [(11, 4), (12, 5)] {
        let (fork_shreds, _) = make_slot_entries(fork_slot, fork_parent, 5, true);
        blockstore.insert_shreds(fork_shreds, None, false).unwrap();
    }

    blockstore.purge_slot_cleanup_chaining(5).unwrap();

    // The purged slot keeps a SlotMeta in which only `next_slots` survives.
    let purged_meta = blockstore.meta(5).unwrap().unwrap();
    assert_eq!(
        purged_meta,
        SlotMeta {
            slot: 5,
            // Only the next_slots should be preserved
            next_slots: vec![6, 12],
            ..SlotMeta::default()
        }
    );

    // The parent no longer lists the purged slot as a child.
    let parent_meta = blockstore.meta(4).unwrap().unwrap();
    assert_eq!(parent_meta.next_slots, vec![11]);

    // The children of the purged slot still point back at it.
    for child_slot in [6, 12] {
        let child_meta = blockstore.meta(child_slot).unwrap().unwrap();
        assert_eq!(child_meta.parent_slot, Some(5));
    }
}
}
Loading