Skip to content

Commit

Permalink
blockstore: write only dirty erasure meta and merkle root metas (solana-labs#34269)
Browse files Browse the repository at this point in the history

* blockstore: write only dirty erasure meta and merkle root metas

* pr feedback: use enum to distinguish clean/dirty

* pr feedback: comments, rename

* pr feedback: use AsRef
  • Loading branch information
AshwinSekar authored Dec 22, 2023
1 parent 368852d commit cc584a0
Showing 1 changed file with 84 additions and 21 deletions.
105 changes: 84 additions & 21 deletions ledger/src/blockstore.rs
Original file line number Diff line number Diff line change
Expand Up @@ -156,6 +156,26 @@ impl PossibleDuplicateShred {
}
}

/// Tracks whether an in-memory value differs from its persisted copy in the
/// blockstore, so flushing can skip entries that are already up to date.
enum WorkingEntry<T> {
    /// Value has been modified with respect to the blockstore column and
    /// therefore must be written back out.
    Dirty(T),
    /// Value matches what is currently in the blockstore column; rewriting
    /// it would be redundant.
    Clean(T),
}

impl<T> WorkingEntry<T> {
    /// Returns `true` when this entry must be persisted, i.e. it is `Dirty`.
    fn should_write(&self) -> bool {
        match self {
            Self::Dirty(_) => true,
            Self::Clean(_) => false,
        }
    }
}

impl<T> AsRef<T> for WorkingEntry<T> {
    /// Borrows the wrapped value regardless of its dirtiness.
    fn as_ref(&self) -> &T {
        // Both variants carry the value; an irrefutable or-pattern binds it.
        let (Self::Dirty(value) | Self::Clean(value)) = self;
        value
    }
}

pub struct InsertResults {
completed_data_set_infos: Vec<CompletedDataSetInfo>,
duplicate_shreds: Vec<PossibleDuplicateShred>,
Expand Down Expand Up @@ -732,7 +752,7 @@ impl Blockstore {

fn try_shred_recovery(
&self,
erasure_metas: &HashMap<ErasureSetId, ErasureMeta>,
erasure_metas: &HashMap<ErasureSetId, WorkingEntry<ErasureMeta>>,
index_working_set: &mut HashMap<u64, IndexMetaWorkingSetEntry>,
prev_inserted_shreds: &HashMap<ShredId, Shred>,
reed_solomon_cache: &ReedSolomonCache,
Expand All @@ -743,7 +763,8 @@ impl Blockstore {
// 2. For new data shreds, check if an erasure set exists. If not, don't try recovery
// 3. Before trying recovery, check if enough number of shreds have been received
// 3a. Enough number of shreds = (#data + #coding shreds) > erasure.num_data
for (erasure_set, erasure_meta) in erasure_metas.iter() {
for (erasure_set, working_erasure_meta) in erasure_metas.iter() {
let erasure_meta = working_erasure_meta.as_ref();
let slot = erasure_set.slot();
let index_meta_entry = index_working_set.get_mut(&slot).expect("Index");
let index = &mut index_meta_entry.index;
Expand Down Expand Up @@ -1018,13 +1039,27 @@ impl Blockstore {
&mut write_batch,
)?;

for (erasure_set, erasure_meta) in erasure_metas {
for (erasure_set, working_erasure_meta) in erasure_metas {
if !working_erasure_meta.should_write() {
// No need to rewrite the column
continue;
}
let (slot, fec_set_index) = erasure_set.store_key();
write_batch.put::<cf::ErasureMeta>((slot, u64::from(fec_set_index)), &erasure_meta)?;
write_batch.put::<cf::ErasureMeta>(
(slot, u64::from(fec_set_index)),
working_erasure_meta.as_ref(),
)?;
}

for (erasure_set, merkle_root_meta) in merkle_root_metas {
write_batch.put::<cf::MerkleRootMeta>(erasure_set.store_key(), &merkle_root_meta)?;
for (erasure_set, working_merkle_root_meta) in merkle_root_metas {
if !working_merkle_root_meta.should_write() {
// No need to rewrite the column
continue;
}
write_batch.put::<cf::MerkleRootMeta>(
erasure_set.store_key(),
working_merkle_root_meta.as_ref(),
)?;
}

for (&slot, index_working_set_entry) in index_working_set.iter() {
Expand Down Expand Up @@ -1183,8 +1218,8 @@ impl Blockstore {
fn check_insert_coding_shred(
&self,
shred: Shred,
erasure_metas: &mut HashMap<ErasureSetId, ErasureMeta>,
merkle_root_metas: &mut HashMap<ErasureSetId, MerkleRootMeta>,
erasure_metas: &mut HashMap<ErasureSetId, WorkingEntry<ErasureMeta>>,
merkle_root_metas: &mut HashMap<ErasureSetId, WorkingEntry<MerkleRootMeta>>,
index_working_set: &mut HashMap<u64, IndexMetaWorkingSetEntry>,
write_batch: &mut WriteBatch,
just_received_shreds: &mut HashMap<ShredId, Shred>,
Expand All @@ -1205,7 +1240,7 @@ impl Blockstore {

if let HashMapEntry::Vacant(entry) = merkle_root_metas.entry(erasure_set) {
if let Some(meta) = self.merkle_root_meta(erasure_set).unwrap() {
entry.insert(meta);
entry.insert(WorkingEntry::Clean(meta));
}
}

Expand All @@ -1224,11 +1259,15 @@ impl Blockstore {
}
}

let erasure_meta = erasure_metas.entry(erasure_set).or_insert_with(|| {
let erasure_meta_entry = erasure_metas.entry(erasure_set).or_insert_with(|| {
self.erasure_meta(erasure_set)
.expect("Expect database get to succeed")
.unwrap_or_else(|| ErasureMeta::from_coding_shred(&shred).unwrap())
.map(WorkingEntry::Clean)
.unwrap_or_else(|| {
WorkingEntry::Dirty(ErasureMeta::from_coding_shred(&shred).unwrap())
})
});
let erasure_meta = erasure_meta_entry.as_ref();

if !erasure_meta.check_coding_shred(&shred) {
metrics.num_coding_shreds_invalid_erasure_config += 1;
Expand Down Expand Up @@ -1289,7 +1328,7 @@ impl Blockstore {

merkle_root_metas
.entry(erasure_set)
.or_insert(MerkleRootMeta::from_shred(&shred));
.or_insert(WorkingEntry::Dirty(MerkleRootMeta::from_shred(&shred)));
}

if let HashMapEntry::Vacant(entry) = just_received_shreds.entry(shred.id()) {
Expand Down Expand Up @@ -1372,8 +1411,8 @@ impl Blockstore {
fn check_insert_data_shred(
&self,
shred: Shred,
erasure_metas: &mut HashMap<ErasureSetId, ErasureMeta>,
merkle_root_metas: &mut HashMap<ErasureSetId, MerkleRootMeta>,
erasure_metas: &mut HashMap<ErasureSetId, WorkingEntry<ErasureMeta>>,
merkle_root_metas: &mut HashMap<ErasureSetId, WorkingEntry<MerkleRootMeta>>,
index_working_set: &mut HashMap<u64, IndexMetaWorkingSetEntry>,
slot_meta_working_set: &mut HashMap<u64, SlotMetaWorkingSetEntry>,
write_batch: &mut WriteBatch,
Expand Down Expand Up @@ -1402,7 +1441,7 @@ impl Blockstore {
let erasure_set = shred.erasure_set();
if let HashMapEntry::Vacant(entry) = merkle_root_metas.entry(erasure_set) {
if let Some(meta) = self.merkle_root_meta(erasure_set).unwrap() {
entry.insert(meta);
entry.insert(WorkingEntry::Clean(meta));
}
}

Expand Down Expand Up @@ -1448,13 +1487,13 @@ impl Blockstore {
)?;
merkle_root_metas
.entry(erasure_set)
.or_insert(MerkleRootMeta::from_shred(&shred));
.or_insert(WorkingEntry::Dirty(MerkleRootMeta::from_shred(&shred)));
just_inserted_shreds.insert(shred.id(), shred);
index_meta_working_set_entry.did_insert_occur = true;
slot_meta_entry.did_insert_occur = true;
if let HashMapEntry::Vacant(entry) = erasure_metas.entry(erasure_set) {
if let Some(meta) = self.erasure_meta(erasure_set).unwrap() {
entry.insert(meta);
entry.insert(WorkingEntry::Clean(meta));
}
}
Ok(newly_completed_data_sets)
Expand Down Expand Up @@ -6806,27 +6845,33 @@ pub mod tests {
merkle_root_metas
.get(&coding_shred.erasure_set())
.unwrap()
.as_ref()
.merkle_root(),
coding_shred.merkle_root().ok(),
);
assert_eq!(
merkle_root_metas
.get(&coding_shred.erasure_set())
.unwrap()
.as_ref()
.first_received_shred_index(),
index
);
assert_eq!(
merkle_root_metas
.get(&coding_shred.erasure_set())
.unwrap()
.as_ref()
.first_received_shred_type(),
ShredType::Code,
);

for (erasure_set, merkle_root_meta) in merkle_root_metas {
for (erasure_set, working_merkle_root_meta) in merkle_root_metas {
write_batch
.put::<cf::MerkleRootMeta>(erasure_set.store_key(), &merkle_root_meta)
.put::<cf::MerkleRootMeta>(
erasure_set.store_key(),
working_merkle_root_meta.as_ref(),
)
.unwrap();
}
blockstore.db.write(write_batch).unwrap();
Expand Down Expand Up @@ -6862,13 +6907,15 @@ pub mod tests {
merkle_root_metas
.get(&coding_shred.erasure_set())
.unwrap()
.as_ref()
.merkle_root(),
coding_shred.merkle_root().ok()
);
assert_eq!(
merkle_root_metas
.get(&coding_shred.erasure_set())
.unwrap()
.as_ref()
.first_received_shred_index(),
index
);
Expand Down Expand Up @@ -6918,27 +6965,31 @@ pub mod tests {
merkle_root_metas
.get(&coding_shred.erasure_set())
.unwrap()
.as_ref()
.merkle_root(),
coding_shred.merkle_root().ok()
);
assert_eq!(
merkle_root_metas
.get(&coding_shred.erasure_set())
.unwrap()
.as_ref()
.first_received_shred_index(),
index
);
assert_eq!(
merkle_root_metas
.get(&new_coding_shred.erasure_set())
.unwrap()
.as_ref()
.merkle_root(),
new_coding_shred.merkle_root().ok()
);
assert_eq!(
merkle_root_metas
.get(&new_coding_shred.erasure_set())
.unwrap()
.as_ref()
.first_received_shred_index(),
new_index
);
Expand Down Expand Up @@ -6986,27 +7037,33 @@ pub mod tests {
merkle_root_metas
.get(&data_shred.erasure_set())
.unwrap()
.as_ref()
.merkle_root(),
data_shred.merkle_root().ok()
);
assert_eq!(
merkle_root_metas
.get(&data_shred.erasure_set())
.unwrap()
.as_ref()
.first_received_shred_index(),
index
);
assert_eq!(
merkle_root_metas
.get(&data_shred.erasure_set())
.unwrap()
.as_ref()
.first_received_shred_type(),
ShredType::Data,
);

for (erasure_set, merkle_root_meta) in merkle_root_metas {
for (erasure_set, working_merkle_root_meta) in merkle_root_metas {
write_batch
.put::<cf::MerkleRootMeta>(erasure_set.store_key(), &merkle_root_meta)
.put::<cf::MerkleRootMeta>(
erasure_set.store_key(),
working_merkle_root_meta.as_ref(),
)
.unwrap();
}
blockstore.db.write(write_batch).unwrap();
Expand Down Expand Up @@ -7046,13 +7103,15 @@ pub mod tests {
merkle_root_metas
.get(&data_shred.erasure_set())
.unwrap()
.as_ref()
.merkle_root(),
data_shred.merkle_root().ok()
);
assert_eq!(
merkle_root_metas
.get(&data_shred.erasure_set())
.unwrap()
.as_ref()
.first_received_shred_index(),
index
);
Expand Down Expand Up @@ -7112,27 +7171,31 @@ pub mod tests {
merkle_root_metas
.get(&data_shred.erasure_set())
.unwrap()
.as_ref()
.merkle_root(),
data_shred.merkle_root().ok()
);
assert_eq!(
merkle_root_metas
.get(&data_shred.erasure_set())
.unwrap()
.as_ref()
.first_received_shred_index(),
index
);
assert_eq!(
merkle_root_metas
.get(&new_data_shred.erasure_set())
.unwrap()
.as_ref()
.merkle_root(),
new_data_shred.merkle_root().ok()
);
assert_eq!(
merkle_root_metas
.get(&new_data_shred.erasure_set())
.unwrap()
.as_ref()
.first_received_shred_index(),
new_index
);
Expand Down

0 comments on commit cc584a0

Please sign in to comment.