blockstore: write only dirty erasure meta and merkle root metas
AshwinSekar committed Nov 29, 2023
1 parent e1165aa commit 4f95a50
Showing 1 changed file with 51 additions and 18 deletions.
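The change threads an `is_new` flag through the insertion working sets (`HashMap<ErasureSetId, (ErasureMeta, bool)>` and the merkle-root equivalent): entries loaded from the database are marked clean, entries created during this insert pass are marked dirty, and only the dirty ones are written back in the write batch. Below is a minimal, self-contained sketch of that dirty-tracking pattern; the `WorkingSet`, `get_or_create`, and `flush` names and the stripped-down `ErasureMeta` are illustrative stand-ins, not the actual blockstore API.

```rust
use std::collections::HashMap;

// Illustrative stand-ins for ErasureSetId / ErasureMeta; the real types live
// in the blockstore meta module and carry much more state.
type ErasureSetId = (u64 /* slot */, u32 /* fec_set_index */);

#[derive(Clone, Debug, PartialEq)]
struct ErasureMeta {
    num_data: usize,
    num_coding: usize,
}

/// Working set used during a single insert pass. The bool records whether the
/// entry was created in this pass (`true`, dirty) or loaded from the backing
/// store (`false`, clean).
struct WorkingSet {
    erasure_metas: HashMap<ErasureSetId, (ErasureMeta, bool)>,
}

impl WorkingSet {
    /// Mirror of the `or_insert_with` pattern in `check_insert_coding_shred`:
    /// load an existing meta (clean) or build a fresh one (dirty).
    fn get_or_create(
        &mut self,
        key: ErasureSetId,
        load: impl FnOnce() -> Option<ErasureMeta>,
        create: impl FnOnce() -> ErasureMeta,
    ) -> &mut ErasureMeta {
        let (meta, _is_new) = self.erasure_metas.entry(key).or_insert_with(|| {
            load()
                .map(|meta| (meta, false /* is_new */))
                .unwrap_or_else(|| (create(), true /* is_new */))
        });
        meta
    }

    /// Mirror of the write-batch loop in the diff below: clean entries already
    /// match what is on disk, so only dirty ones are pushed to the batch.
    fn flush(&self, batch: &mut Vec<(ErasureSetId, ErasureMeta)>) {
        for (key, (meta, is_new)) in &self.erasure_metas {
            if !*is_new {
                // No need to rewrite the column.
                continue;
            }
            batch.push((*key, meta.clone()));
        }
    }
}

fn main() {
    let mut working_set = WorkingSet {
        erasure_metas: HashMap::new(),
    };
    let existing = ErasureMeta { num_data: 32, num_coding: 32 };
    // (5, 0) already exists "on disk"; (5, 32) is seen for the first time.
    working_set.get_or_create((5, 0), || Some(existing.clone()), || unreachable!());
    working_set.get_or_create((5, 32), || None, || ErasureMeta { num_data: 32, num_coding: 32 });

    let mut batch = Vec::new();
    working_set.flush(&mut batch);
    // Only the newly created entry is flushed to the write batch.
    assert_eq!(batch, vec![((5, 32), ErasureMeta { num_data: 32, num_coding: 32 })]);
}
```

The diff applies the same treatment to `merkle_root_metas`, whose working map likewise becomes `HashMap<ErasureSetId, (MerkleRootMeta, bool)>`; as a consequence, the test assertions below read the meta out of the tuple via `.0`.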
69 changes: 51 additions & 18 deletions ledger/src/blockstore.rs
@@ -731,7 +731,7 @@ impl Blockstore {

fn try_shred_recovery(
&self,
erasure_metas: &HashMap<ErasureSetId, ErasureMeta>,
erasure_metas: &HashMap<ErasureSetId, (ErasureMeta, bool)>,
index_working_set: &mut HashMap<u64, IndexMetaWorkingSetEntry>,
prev_inserted_shreds: &HashMap<ShredId, Shred>,
reed_solomon_cache: &ReedSolomonCache,
@@ -742,7 +742,7 @@ impl Blockstore {
// 2. For new data shreds, check if an erasure set exists. If not, don't try recovery
// 3. Before trying recovery, check if enough number of shreds have been received
// 3a. Enough number of shreds = (#data + #coding shreds) > erasure.num_data
for (erasure_set, erasure_meta) in erasure_metas.iter() {
for (erasure_set, (erasure_meta, _)) in erasure_metas.iter() {
let slot = erasure_set.slot();
let index_meta_entry = index_working_set.get_mut(&slot).expect("Index");
let index = &mut index_meta_entry.index;
@@ -1017,11 +1017,20 @@ impl Blockstore {
&mut write_batch,
)?;

for (erasure_set, erasure_meta) in erasure_metas {
write_batch.put::<cf::ErasureMeta>(erasure_set.store_key(), &erasure_meta)?;
for (erasure_set, (erasure_meta, is_new)) in erasure_metas {
if !is_new {
// No need to rewrite the column
continue;
}
let (slot, fec_set_index) = erasure_set.store_key();
write_batch.put::<cf::ErasureMeta>((slot, fec_set_index), &erasure_meta)?;
}

for (erasure_set, merkle_root_meta) in merkle_root_metas {
for (erasure_set, (merkle_root_meta, is_new)) in merkle_root_metas {
if !is_new {
// No need to rewrite the column
continue;
}
write_batch.put::<cf::MerkleRootMeta>(erasure_set.key(), &merkle_root_meta)?;
}

@@ -1181,8 +1190,8 @@ impl Blockstore {
fn check_insert_coding_shred(
&self,
shred: Shred,
erasure_metas: &mut HashMap<ErasureSetId, ErasureMeta>,
merkle_root_metas: &mut HashMap<ErasureSetId, MerkleRootMeta>,
erasure_metas: &mut HashMap<ErasureSetId, (ErasureMeta, bool)>,
merkle_root_metas: &mut HashMap<ErasureSetId, (MerkleRootMeta, bool)>,
index_working_set: &mut HashMap<u64, IndexMetaWorkingSetEntry>,
write_batch: &mut WriteBatch,
just_received_shreds: &mut HashMap<ShredId, Shred>,
@@ -1203,7 +1212,7 @@ impl Blockstore {

if let HashMapEntry::Vacant(entry) = merkle_root_metas.entry(erasure_set) {
if let Some(meta) = self.merkle_root_meta(erasure_set).unwrap() {
entry.insert(meta);
entry.insert((meta, /* is_new */ false));
}
}

@@ -1222,10 +1231,16 @@ impl Blockstore {
}
}

let erasure_meta = erasure_metas.entry(erasure_set).or_insert_with(|| {
let (erasure_meta, _) = erasure_metas.entry(erasure_set).or_insert_with(|| {
self.erasure_meta(erasure_set)
.expect("Expect database get to succeed")
.unwrap_or_else(|| ErasureMeta::from_coding_shred(&shred).unwrap())
.map(|erasure_meta| (erasure_meta, false /* is_new */))
.unwrap_or_else(|| {
(
ErasureMeta::from_coding_shred(&shred).unwrap(),
true, /* is_new */
)
})
});

if !erasure_meta.check_coding_shred(&shred) {
@@ -1287,7 +1302,7 @@ impl Blockstore {

merkle_root_metas
.entry(erasure_set)
.or_insert(MerkleRootMeta::from_shred(&shred));
.or_insert((MerkleRootMeta::from_shred(&shred), true /* is_new */));
}

if let HashMapEntry::Vacant(entry) = just_received_shreds.entry(shred.id()) {
@@ -1370,8 +1385,8 @@ impl Blockstore {
fn check_insert_data_shred(
&self,
shred: Shred,
erasure_metas: &mut HashMap<ErasureSetId, ErasureMeta>,
merkle_root_metas: &mut HashMap<ErasureSetId, MerkleRootMeta>,
erasure_metas: &mut HashMap<ErasureSetId, (ErasureMeta, bool)>,
merkle_root_metas: &mut HashMap<ErasureSetId, (MerkleRootMeta, bool)>,
index_working_set: &mut HashMap<u64, IndexMetaWorkingSetEntry>,
slot_meta_working_set: &mut HashMap<u64, SlotMetaWorkingSetEntry>,
write_batch: &mut WriteBatch,
@@ -1400,7 +1415,7 @@ impl Blockstore {
let erasure_set = shred.erasure_set();
if let HashMapEntry::Vacant(entry) = merkle_root_metas.entry(erasure_set) {
if let Some(meta) = self.merkle_root_meta(erasure_set).unwrap() {
entry.insert(meta);
entry.insert((meta, false /* is_new */));
}
}

Expand Down Expand Up @@ -1446,13 +1461,13 @@ impl Blockstore {
)?;
merkle_root_metas
.entry(erasure_set)
.or_insert(MerkleRootMeta::from_shred(&shred));
.or_insert((MerkleRootMeta::from_shred(&shred), true /* is_new */));
just_inserted_shreds.insert(shred.id(), shred);
index_meta_working_set_entry.did_insert_occur = true;
slot_meta_entry.did_insert_occur = true;
if let HashMapEntry::Vacant(entry) = erasure_metas.entry(erasure_set) {
if let Some(meta) = self.erasure_meta(erasure_set).unwrap() {
entry.insert(meta);
entry.insert((meta, false /* is_new */));
}
}
Ok(newly_completed_data_sets)
@@ -6808,25 +6823,28 @@ pub mod tests {
merkle_root_metas
.get(&coding_shred.erasure_set())
.unwrap()
.0
.merkle_root(),
coding_shred.merkle_root().ok(),
);
assert_eq!(
merkle_root_metas
.get(&coding_shred.erasure_set())
.unwrap()
.0
.first_received_shred_index(),
index
);
assert_eq!(
merkle_root_metas
.get(&coding_shred.erasure_set())
.unwrap()
.0
.first_received_shred_type(),
ShredType::Code,
);

for (erasure_set, merkle_root_meta) in merkle_root_metas {
for (erasure_set, (merkle_root_meta, _)) in merkle_root_metas {
write_batch
.put::<cf::MerkleRootMeta>(erasure_set.key(), &merkle_root_meta)
.unwrap();
@@ -6864,13 +6882,15 @@ pub mod tests {
merkle_root_metas
.get(&coding_shred.erasure_set())
.unwrap()
.0
.merkle_root(),
coding_shred.merkle_root().ok()
);
assert_eq!(
merkle_root_metas
.get(&coding_shred.erasure_set())
.unwrap()
.0
.first_received_shred_index(),
index
);
@@ -6920,27 +6940,31 @@ pub mod tests {
merkle_root_metas
.get(&coding_shred.erasure_set())
.unwrap()
.0
.merkle_root(),
coding_shred.merkle_root().ok()
);
assert_eq!(
merkle_root_metas
.get(&coding_shred.erasure_set())
.unwrap()
.0
.first_received_shred_index(),
index
);
assert_eq!(
merkle_root_metas
.get(&new_coding_shred.erasure_set())
.unwrap()
.0
.merkle_root(),
new_coding_shred.merkle_root().ok()
);
assert_eq!(
merkle_root_metas
.get(&new_coding_shred.erasure_set())
.unwrap()
.0
.first_received_shred_index(),
new_index
);
@@ -6988,25 +7012,28 @@ pub mod tests {
merkle_root_metas
.get(&data_shred.erasure_set())
.unwrap()
.0
.merkle_root(),
data_shred.merkle_root().ok()
);
assert_eq!(
merkle_root_metas
.get(&data_shred.erasure_set())
.unwrap()
.0
.first_received_shred_index(),
index
);
assert_eq!(
merkle_root_metas
.get(&data_shred.erasure_set())
.unwrap()
.0
.first_received_shred_type(),
ShredType::Data,
);

for (erasure_set, merkle_root_meta) in merkle_root_metas {
for (erasure_set, (merkle_root_meta, _)) in merkle_root_metas {
write_batch
.put::<cf::MerkleRootMeta>(erasure_set.key(), &merkle_root_meta)
.unwrap();
@@ -7048,13 +7075,15 @@ pub mod tests {
merkle_root_metas
.get(&data_shred.erasure_set())
.unwrap()
.0
.merkle_root(),
data_shred.merkle_root().ok()
);
assert_eq!(
merkle_root_metas
.get(&data_shred.erasure_set())
.unwrap()
.0
.first_received_shred_index(),
index
);
@@ -7114,27 +7143,31 @@ pub mod tests {
merkle_root_metas
.get(&data_shred.erasure_set())
.unwrap()
.0
.merkle_root(),
data_shred.merkle_root().ok()
);
assert_eq!(
merkle_root_metas
.get(&data_shred.erasure_set())
.unwrap()
.0
.first_received_shred_index(),
index
);
assert_eq!(
merkle_root_metas
.get(&new_data_shred.erasure_set())
.unwrap()
.0
.merkle_root(),
new_data_shred.merkle_root().ok()
);
assert_eq!(
merkle_root_metas
.get(&new_data_shred.erasure_set())
.unwrap()
.0
.first_received_shred_index(),
new_index
);