Skip to content

Commit

Permalink
Remove dated Blockstore PurgeType::PrimaryIndex code (solana-labs#33631)
Browse files Browse the repository at this point in the history
* Update instances of PurgeType::PrimaryIndex to PurgeType::Exact
* Remove now unused functions
* Remove unused active_transaction_status_index field
  • Loading branch information
steviez authored Oct 10, 2023
1 parent 73a9a14 commit 33e1dd7
Show file tree
Hide file tree
Showing 2 changed files with 11 additions and 128 deletions.
89 changes: 10 additions & 79 deletions ledger/src/blockstore.rs
Original file line number Diff line number Diff line change
Expand Up @@ -206,7 +206,6 @@ pub struct Blockstore {
address_signatures_cf: LedgerColumn<cf::AddressSignatures>,
transaction_memos_cf: LedgerColumn<cf::TransactionMemos>,
transaction_status_index_cf: LedgerColumn<cf::TransactionStatusIndex>,
active_transaction_status_index: RwLock<u64>,
rewards_cf: LedgerColumn<cf::Rewards>,
blocktime_cf: LedgerColumn<cf::Blocktime>,
perf_samples_cf: LedgerColumn<cf::PerfSamples>,
Expand Down Expand Up @@ -326,21 +325,11 @@ impl Blockstore {
.unwrap_or(0);
let last_root = RwLock::new(max_root);

// Get active transaction-status index or 0
let active_transaction_status_index = db
// Initialize transaction status index if entries are not present
let initialize_transaction_status_index = db
.iter::<cf::TransactionStatusIndex>(IteratorMode::Start)?
.next();
let initialize_transaction_status_index = active_transaction_status_index.is_none();
let active_transaction_status_index = active_transaction_status_index
.and_then(|(_, data)| {
let index0: TransactionStatusIndexMeta = deserialize(&data).unwrap();
if index0.frozen {
Some(1)
} else {
None
}
})
.unwrap_or(0);
.next()
.is_none();

measure.stop();
info!("{:?} {}", blockstore_path, measure);
Expand All @@ -360,7 +349,6 @@ impl Blockstore {
address_signatures_cf,
transaction_memos_cf,
transaction_status_index_cf,
active_transaction_status_index: RwLock::new(active_transaction_status_index),
rewards_cf,
blocktime_cf,
perf_samples_cf,
Expand Down Expand Up @@ -1125,7 +1113,7 @@ impl Blockstore {
.expect("Couldn't fetch from SlotMeta column family")
{
// Clear all slot related information
self.run_purge(slot, slot, PurgeType::PrimaryIndex)
self.run_purge(slot, slot, PurgeType::Exact)
.expect("Purge database operations failed");

// Clear this slot as a next slot from parent
Expand Down Expand Up @@ -2155,61 +2143,6 @@ impl Blockstore {
Ok(())
}

/// Toggles the active primary index between `0` and `1`, and clears the
/// stored max-slot of the frozen index in preparation for pruning.
fn toggle_transaction_status_index(
&self,
batch: &mut WriteBatch,
w_active_transaction_status_index: &mut u64,
to_slot: Slot,
) -> Result<Option<u64>> {
let index0 = self.transaction_status_index_cf.get(0)?;
if index0.is_none() {
return Ok(None);
}
let mut index0 = index0.unwrap();
let mut index1 = self.transaction_status_index_cf.get(1)?.unwrap();

if !index0.frozen && !index1.frozen {
index0.frozen = true;
*w_active_transaction_status_index = 1;
batch.put::<cf::TransactionStatusIndex>(0, &index0)?;
Ok(None)
} else {
let purge_target_primary_index = if index0.frozen && to_slot > index0.max_slot {
info!(
"Pruning expired primary index 0 up to slot {} (max requested: {})",
index0.max_slot, to_slot
);
Some(0)
} else if index1.frozen && to_slot > index1.max_slot {
info!(
"Pruning expired primary index 1 up to slot {} (max requested: {})",
index1.max_slot, to_slot
);
Some(1)
} else {
None
};

if let Some(purge_target_primary_index) = purge_target_primary_index {
*w_active_transaction_status_index = purge_target_primary_index;
if index0.frozen {
index0.max_slot = 0
};
index0.frozen = !index0.frozen;
batch.put::<cf::TransactionStatusIndex>(0, &index0)?;
if index1.frozen {
index1.max_slot = 0
};
index1.frozen = !index1.frozen;
batch.put::<cf::TransactionStatusIndex>(1, &index1)?;
}

Ok(purge_target_primary_index)
}
}

fn read_deprecated_transaction_status(
&self,
index: (Signature, Slot),
Expand Down Expand Up @@ -4938,7 +4871,7 @@ pub mod tests {

let max_purge_slot = 1;
blockstore
.run_purge(0, max_purge_slot, PurgeType::PrimaryIndex)
.run_purge(0, max_purge_slot, PurgeType::Exact)
.unwrap();
*blockstore.lowest_cleanup_slot.write().unwrap() = max_purge_slot;

Expand Down Expand Up @@ -9012,7 +8945,7 @@ pub mod tests {
blockstore.insert_shreds(shreds, None, false).unwrap();
}
assert_eq!(blockstore.lowest_slot(), 1);
blockstore.run_purge(0, 5, PurgeType::PrimaryIndex).unwrap();
blockstore.run_purge(0, 5, PurgeType::Exact).unwrap();
assert_eq!(blockstore.lowest_slot(), 6);
}

Expand All @@ -9028,12 +8961,10 @@ pub mod tests {
blockstore.insert_shreds(shreds, None, false).unwrap();
assert_eq!(blockstore.highest_slot().unwrap(), Some(slot));
}
blockstore
.run_purge(5, 10, PurgeType::PrimaryIndex)
.unwrap();
blockstore.run_purge(5, 10, PurgeType::Exact).unwrap();
assert_eq!(blockstore.highest_slot().unwrap(), Some(4));

blockstore.run_purge(0, 4, PurgeType::PrimaryIndex).unwrap();
blockstore.run_purge(0, 4, PurgeType::Exact).unwrap();
assert_eq!(blockstore.highest_slot().unwrap(), None);
}

Expand Down Expand Up @@ -9768,7 +9699,7 @@ pub mod tests {

// Cleanup the slot
blockstore
.run_purge(slot, slot, PurgeType::PrimaryIndex)
.run_purge(slot, slot, PurgeType::Exact)
.expect("Purge database operations failed");
assert!(blockstore.meta(slot).unwrap().is_none());

Expand Down
50 changes: 1 addition & 49 deletions ledger/src/blockstore/blockstore_purge.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,6 @@ pub enum PurgeType {
/// A slower but more accurate way to purge slots by also ensuring higher
/// level of consistency between data during the clean up process.
Exact,
/// A faster approximation of `Exact` where the purge process only takes
/// care of the primary index and does not update the associated entries.
PrimaryIndex,
/// The fastest purge mode that relies on the slot-id based TTL
/// compaction filter to do the cleanup.
CompactionFilter,
Expand Down Expand Up @@ -158,7 +155,7 @@ impl Blockstore {
.batch()
.expect("Database Error: Failed to get write batch");
let mut delete_range_timer = Measure::start("delete_range");
let mut columns_purged = self
let columns_purged = self
.db
.delete_range_cf::<cf::SlotMeta>(&mut write_batch, from_slot, to_slot)
.is_ok()
Expand Down Expand Up @@ -218,20 +215,10 @@ impl Blockstore {
.db
.delete_range_cf::<cf::OptimisticSlots>(&mut write_batch, from_slot, to_slot)
.is_ok();
let mut w_active_transaction_status_index =
self.active_transaction_status_index.write().unwrap();
match purge_type {
PurgeType::Exact => {
self.purge_special_columns_exact(&mut write_batch, from_slot, to_slot)?;
}
PurgeType::PrimaryIndex => {
self.purge_special_columns_with_primary_index(
&mut write_batch,
&mut columns_purged,
&mut w_active_transaction_status_index,
to_slot,
)?;
}
PurgeType::CompactionFilter => {
// No explicit action is required here because this purge type completely and
// indefinitely relies on the proper working of compaction filter for those
Expand Down Expand Up @@ -273,10 +260,6 @@ impl Blockstore {
purge_stats.write_batch += write_timer.as_us();
purge_stats.delete_files_in_range += purge_files_in_range_timer.as_us();

// only drop w_active_transaction_status_index after we do db.write(write_batch);
// otherwise, readers might be confused with inconsistent state between
// self.active_transaction_status_index and RockDb's TransactionStatusIndex contents
drop(w_active_transaction_status_index);
Ok(columns_purged)
}

Expand Down Expand Up @@ -458,37 +441,6 @@ impl Blockstore {
}
Ok(())
}

/// Purges special columns (using a non-Slot primary-index) by range. Purge
/// occurs if frozen primary index has a max-slot less than the highest slot
/// being purged.
fn purge_special_columns_with_primary_index(
&self,
write_batch: &mut WriteBatch,
columns_purged: &mut bool,
w_active_transaction_status_index: &mut u64,
to_slot: Slot,
) -> Result<()> {
if let Some(purged_index) = self.toggle_transaction_status_index(
write_batch,
w_active_transaction_status_index,
to_slot + 1,
)? {
*columns_purged &= self
.db
.delete_range_cf::<cf::TransactionStatus>(write_batch, purged_index, purged_index)
.is_ok()
& self
.db
.delete_range_cf::<cf::AddressSignatures>(
write_batch,
purged_index,
purged_index,
)
.is_ok();
}
Ok(())
}
}

#[cfg(test)]
Expand Down

0 comments on commit 33e1dd7

Please sign in to comment.