Skip to content

Commit

Permalink
add merkle root meta column to blockstore (#33979)
Browse files Browse the repository at this point in the history
* add merkle root meta column to blockstore

* pr feedback: remove write/reads to column

* pr feedback: u64 -> u32 + revert

* pr feedback: fec_set_index u32, use Self::Index

* pr feedback: key size 16 -> 12

(cherry picked from commit e457c02)
  • Loading branch information
AshwinSekar committed Dec 1, 2023
1 parent fca44b7 commit a3c02e5
Show file tree
Hide file tree
Showing 4 changed files with 73 additions and 0 deletions.
4 changes: 4 additions & 0 deletions ledger/src/blockstore.rs
Original file line number Diff line number Diff line change
Expand Up @@ -211,6 +211,7 @@ pub struct Blockstore {
program_costs_cf: LedgerColumn<cf::ProgramCosts>,
bank_hash_cf: LedgerColumn<cf::BankHash>,
optimistic_slots_cf: LedgerColumn<cf::OptimisticSlots>,
merkle_root_meta_cf: LedgerColumn<cf::MerkleRootMeta>,
last_root: RwLock<Slot>,
insert_shreds_lock: Mutex<()>,
new_shreds_signals: Mutex<Vec<Sender<bool>>>,
Expand Down Expand Up @@ -312,6 +313,7 @@ impl Blockstore {
let program_costs_cf = db.column();
let bank_hash_cf = db.column();
let optimistic_slots_cf = db.column();
let merkle_root_meta_cf = db.column();

let db = Arc::new(db);

Expand Down Expand Up @@ -365,6 +367,7 @@ impl Blockstore {
program_costs_cf,
bank_hash_cf,
optimistic_slots_cf,
merkle_root_meta_cf,
new_shreds_signals: Mutex::default(),
completed_slots_senders: Mutex::default(),
shred_timing_point_sender: None,
Expand Down Expand Up @@ -734,6 +737,7 @@ impl Blockstore {
self.program_costs_cf.submit_rocksdb_cf_metrics();
self.bank_hash_cf.submit_rocksdb_cf_metrics();
self.optimistic_slots_cf.submit_rocksdb_cf_metrics();
self.merkle_root_meta_cf.submit_rocksdb_cf_metrics();
}

fn try_shred_recovery(
Expand Down
8 changes: 8 additions & 0 deletions ledger/src/blockstore/blockstore_purge.rs
Original file line number Diff line number Diff line change
Expand Up @@ -214,6 +214,10 @@ impl Blockstore {
& self
.db
.delete_range_cf::<cf::OptimisticSlots>(&mut write_batch, from_slot, to_slot)
.is_ok()
& self
.db
.delete_range_cf::<cf::MerkleRootMeta>(&mut write_batch, from_slot, to_slot)
.is_ok();
let mut w_active_transaction_status_index =
self.active_transaction_status_index.write().unwrap();
Expand Down Expand Up @@ -337,6 +341,10 @@ impl Blockstore {
.db
.delete_file_in_range_cf::<cf::OptimisticSlots>(from_slot, to_slot)
.is_ok()
& self
.db
.delete_file_in_range_cf::<cf::MerkleRootMeta>(from_slot, to_slot)
.is_ok()
}

/// Purges special columns (using a non-Slot primary-index) exactly, by
Expand Down
51 changes: 51 additions & 0 deletions ledger/src/blockstore_db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ pub use rocksdb::Direction as IteratorDirection;
use {
crate::{
blockstore_meta,
blockstore_meta::MerkleRootMeta,
blockstore_metrics::{
maybe_enable_rocksdb_perf, report_rocksdb_read_perf, report_rocksdb_write_perf,
BlockstoreRocksDbColumnFamilyMetrics, PerfSamplingStatus, PERF_METRIC_OP_NAME_GET,
Expand Down Expand Up @@ -103,6 +104,8 @@ const BLOCK_HEIGHT_CF: &str = "block_height";
const PROGRAM_COSTS_CF: &str = "program_costs";
/// Column family for optimistic slots
const OPTIMISTIC_SLOTS_CF: &str = "optimistic_slots";
/// Column family for merkle roots
const MERKLE_ROOT_META_CF: &str = "merkle_root_meta";

#[derive(Error, Debug)]
pub enum BlockstoreError {
Expand Down Expand Up @@ -323,6 +326,19 @@ pub mod columns {
/// * value type: [`blockstore_meta::OptimisticSlotMetaVersioned`]
pub struct OptimisticSlots;

#[derive(Debug)]
/// The merkle root meta column
///
/// Each merkle shred is part of a merkle tree for
/// its FEC set. This column stores that merkle root and associated
/// meta information about the first shred received.
///
/// Its index type is (Slot, fec_set_index).
///
/// * index type: `crate::shred::ErasureSetId` `(Slot, fec_set_index: u32)`
/// * value type: [`blockstore_meta::MerkleRootMeta`]`
pub struct MerkleRootMeta;

// When adding a new column ...
// - Add struct below and implement `Column` and `ColumnName` traits
// - Add descriptor in Rocks::cf_descriptors() and name in Rocks::columns()
Expand Down Expand Up @@ -454,6 +470,7 @@ impl Rocks {
new_cf_descriptor::<BlockHeight>(options, oldest_slot),
new_cf_descriptor::<ProgramCosts>(options, oldest_slot),
new_cf_descriptor::<OptimisticSlots>(options, oldest_slot),
new_cf_descriptor::<MerkleRootMeta>(options, oldest_slot),
];

// If the access type is Secondary, we don't need to open all of the
Expand Down Expand Up @@ -526,6 +543,7 @@ impl Rocks {
BlockHeight::NAME,
ProgramCosts::NAME,
OptimisticSlots::NAME,
MerkleRootMeta::NAME,
]
}

Expand Down Expand Up @@ -1157,6 +1175,39 @@ impl TypedColumn for columns::OptimisticSlots {
type Type = blockstore_meta::OptimisticSlotMetaVersioned;
}

impl Column for columns::MerkleRootMeta {
type Index = (Slot, /*fec_set_index:*/ u32);

fn index(key: &[u8]) -> Self::Index {
let slot = BigEndian::read_u64(&key[..8]);
let fec_set_index = BigEndian::read_u32(&key[8..]);

(slot, fec_set_index)
}

fn key((slot, fec_set_index): Self::Index) -> Vec<u8> {
let mut key = vec![0; 12];
BigEndian::write_u64(&mut key[..8], slot);
BigEndian::write_u32(&mut key[8..], fec_set_index);
key
}

fn primary_index((slot, _fec_set_index): Self::Index) -> Slot {
slot
}

fn as_index(slot: Slot) -> Self::Index {
(slot, 0)
}
}

impl ColumnName for columns::MerkleRootMeta {
const NAME: &'static str = MERKLE_ROOT_META_CF;
}
impl TypedColumn for columns::MerkleRootMeta {
type Type = MerkleRootMeta;
}

#[derive(Debug)]
pub struct Database {
backend: Arc<Rocks>,
Expand Down
10 changes: 10 additions & 0 deletions ledger/src/blockstore_meta.rs
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,16 @@ pub(crate) struct ErasureConfig {
num_coding: usize,
}

#[derive(Clone, Copy, Debug, Eq, PartialEq, Serialize, Deserialize)]
pub struct MerkleRootMeta {
/// The merkle root
merkle_root: Hash,
/// The first received shred index
first_received_shred_index: u32,
/// The shred type of the first received shred
first_received_shred_type: ShredType,
}

#[derive(Deserialize, Serialize)]
pub struct DuplicateSlotProof {
#[serde(with = "serde_bytes")]
Expand Down

0 comments on commit a3c02e5

Please sign in to comment.