Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

blockstore: write only dirty erasure meta and merkle root metas #34269

Merged
merged 4 commits into from
Dec 22, 2023
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
105 changes: 84 additions & 21 deletions ledger/src/blockstore.rs
Original file line number Diff line number Diff line change
Expand Up @@ -156,6 +156,26 @@ impl PossibleDuplicateShred {
}
}

/// Wrapper tracking whether an in-memory metadata value still matches
/// the persisted blockstore column, so unchanged entries can skip the
/// write batch.
enum WorkingEntry<T> {
    /// Value has been modified with respect to the blockstore column.
    Dirty(T),
    /// Value matches what is currently in the blockstore column.
    Clean(T),
}

impl<T> WorkingEntry<T> {
    /// Returns true only for entries that must be flushed back to the
    /// blockstore column.
    fn should_write(&self) -> bool {
        match self {
            Self::Dirty(_) => true,
            Self::Clean(_) => false,
        }
    }
}

impl<T> AsRef<T> for WorkingEntry<T> {
    /// Borrows the wrapped value regardless of dirtiness.
    fn as_ref(&self) -> &T {
        match self {
            Self::Dirty(value) | Self::Clean(value) => value,
        }
    }
}

pub struct InsertResults {
completed_data_set_infos: Vec<CompletedDataSetInfo>,
duplicate_shreds: Vec<PossibleDuplicateShred>,
Expand Down Expand Up @@ -732,7 +752,7 @@ impl Blockstore {

fn try_shred_recovery(
&self,
erasure_metas: &HashMap<ErasureSetId, ErasureMeta>,
erasure_metas: &HashMap<ErasureSetId, WorkingEntry<ErasureMeta>>,
index_working_set: &mut HashMap<u64, IndexMetaWorkingSetEntry>,
prev_inserted_shreds: &HashMap<ShredId, Shred>,
reed_solomon_cache: &ReedSolomonCache,
Expand All @@ -743,7 +763,8 @@ impl Blockstore {
// 2. For new data shreds, check if an erasure set exists. If not, don't try recovery
// 3. Before trying recovery, check if enough number of shreds have been received
// 3a. Enough number of shreds = (#data + #coding shreds) > erasure.num_data
for (erasure_set, erasure_meta) in erasure_metas.iter() {
for (erasure_set, working_erasure_meta) in erasure_metas.iter() {
let erasure_meta = working_erasure_meta.as_ref();
let slot = erasure_set.slot();
let index_meta_entry = index_working_set.get_mut(&slot).expect("Index");
let index = &mut index_meta_entry.index;
Expand Down Expand Up @@ -1018,13 +1039,27 @@ impl Blockstore {
&mut write_batch,
)?;

for (erasure_set, erasure_meta) in erasure_metas {
for (erasure_set, working_erasure_meta) in erasure_metas {
if !working_erasure_meta.should_write() {
// No need to rewrite the column
continue;
}
let (slot, fec_set_index) = erasure_set.store_key();
write_batch.put::<cf::ErasureMeta>((slot, u64::from(fec_set_index)), &erasure_meta)?;
write_batch.put::<cf::ErasureMeta>(
(slot, u64::from(fec_set_index)),
working_erasure_meta.as_ref(),
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think you can just write `&working_erasure_meta`;
the `&` will perform the `.as_ref()` conversion for you.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

hmm, it doesn't seem to work — it only recognizes it as a `&WorkingEntry<ErasureMeta>`

)?;
}

for (erasure_set, merkle_root_meta) in merkle_root_metas {
write_batch.put::<cf::MerkleRootMeta>(erasure_set.store_key(), &merkle_root_meta)?;
for (erasure_set, working_merkle_root_meta) in merkle_root_metas {
if !working_merkle_root_meta.should_write() {
// No need to rewrite the column
continue;
}
write_batch.put::<cf::MerkleRootMeta>(
erasure_set.store_key(),
working_merkle_root_meta.as_ref(),
)?;
}

for (&slot, index_working_set_entry) in index_working_set.iter() {
Expand Down Expand Up @@ -1183,8 +1218,8 @@ impl Blockstore {
fn check_insert_coding_shred(
&self,
shred: Shred,
erasure_metas: &mut HashMap<ErasureSetId, ErasureMeta>,
merkle_root_metas: &mut HashMap<ErasureSetId, MerkleRootMeta>,
erasure_metas: &mut HashMap<ErasureSetId, WorkingEntry<ErasureMeta>>,
merkle_root_metas: &mut HashMap<ErasureSetId, WorkingEntry<MerkleRootMeta>>,
index_working_set: &mut HashMap<u64, IndexMetaWorkingSetEntry>,
write_batch: &mut WriteBatch,
just_received_shreds: &mut HashMap<ShredId, Shred>,
Expand All @@ -1205,7 +1240,7 @@ impl Blockstore {

if let HashMapEntry::Vacant(entry) = merkle_root_metas.entry(erasure_set) {
if let Some(meta) = self.merkle_root_meta(erasure_set).unwrap() {
entry.insert(meta);
entry.insert(WorkingEntry::Clean(meta));
}
}

Expand All @@ -1224,11 +1259,15 @@ impl Blockstore {
}
}

let erasure_meta = erasure_metas.entry(erasure_set).or_insert_with(|| {
let erasure_meta_entry = erasure_metas.entry(erasure_set).or_insert_with(|| {
self.erasure_meta(erasure_set)
.expect("Expect database get to succeed")
.unwrap_or_else(|| ErasureMeta::from_coding_shred(&shred).unwrap())
.map(WorkingEntry::Clean)
.unwrap_or_else(|| {
WorkingEntry::Dirty(ErasureMeta::from_coding_shred(&shred).unwrap())
})
});
let erasure_meta = erasure_meta_entry.as_ref();

if !erasure_meta.check_coding_shred(&shred) {
metrics.num_coding_shreds_invalid_erasure_config += 1;
Expand Down Expand Up @@ -1289,7 +1328,7 @@ impl Blockstore {

merkle_root_metas
.entry(erasure_set)
.or_insert(MerkleRootMeta::from_shred(&shred));
.or_insert(WorkingEntry::Dirty(MerkleRootMeta::from_shred(&shred)));
}

if let HashMapEntry::Vacant(entry) = just_received_shreds.entry(shred.id()) {
Expand Down Expand Up @@ -1372,8 +1411,8 @@ impl Blockstore {
fn check_insert_data_shred(
&self,
shred: Shred,
erasure_metas: &mut HashMap<ErasureSetId, ErasureMeta>,
merkle_root_metas: &mut HashMap<ErasureSetId, MerkleRootMeta>,
erasure_metas: &mut HashMap<ErasureSetId, WorkingEntry<ErasureMeta>>,
merkle_root_metas: &mut HashMap<ErasureSetId, WorkingEntry<MerkleRootMeta>>,
index_working_set: &mut HashMap<u64, IndexMetaWorkingSetEntry>,
slot_meta_working_set: &mut HashMap<u64, SlotMetaWorkingSetEntry>,
write_batch: &mut WriteBatch,
Expand Down Expand Up @@ -1402,7 +1441,7 @@ impl Blockstore {
let erasure_set = shred.erasure_set();
if let HashMapEntry::Vacant(entry) = merkle_root_metas.entry(erasure_set) {
if let Some(meta) = self.merkle_root_meta(erasure_set).unwrap() {
entry.insert(meta);
entry.insert(WorkingEntry::Clean(meta));
}
}

Expand Down Expand Up @@ -1448,13 +1487,13 @@ impl Blockstore {
)?;
merkle_root_metas
.entry(erasure_set)
.or_insert(MerkleRootMeta::from_shred(&shred));
.or_insert(WorkingEntry::Dirty(MerkleRootMeta::from_shred(&shred)));
just_inserted_shreds.insert(shred.id(), shred);
index_meta_working_set_entry.did_insert_occur = true;
slot_meta_entry.did_insert_occur = true;
if let HashMapEntry::Vacant(entry) = erasure_metas.entry(erasure_set) {
if let Some(meta) = self.erasure_meta(erasure_set).unwrap() {
entry.insert(meta);
entry.insert(WorkingEntry::Clean(meta));
}
}
Ok(newly_completed_data_sets)
Expand Down Expand Up @@ -6806,27 +6845,33 @@ pub mod tests {
merkle_root_metas
.get(&coding_shred.erasure_set())
.unwrap()
.as_ref()
.merkle_root(),
coding_shred.merkle_root().ok(),
);
assert_eq!(
merkle_root_metas
.get(&coding_shred.erasure_set())
.unwrap()
.as_ref()
.first_received_shred_index(),
index
);
assert_eq!(
merkle_root_metas
.get(&coding_shred.erasure_set())
.unwrap()
.as_ref()
.first_received_shred_type(),
ShredType::Code,
);

for (erasure_set, merkle_root_meta) in merkle_root_metas {
for (erasure_set, working_merkle_root_meta) in merkle_root_metas {
write_batch
.put::<cf::MerkleRootMeta>(erasure_set.store_key(), &merkle_root_meta)
.put::<cf::MerkleRootMeta>(
erasure_set.store_key(),
working_merkle_root_meta.as_ref(),
)
.unwrap();
}
blockstore.db.write(write_batch).unwrap();
Expand Down Expand Up @@ -6862,13 +6907,15 @@ pub mod tests {
merkle_root_metas
.get(&coding_shred.erasure_set())
.unwrap()
.as_ref()
.merkle_root(),
coding_shred.merkle_root().ok()
);
assert_eq!(
merkle_root_metas
.get(&coding_shred.erasure_set())
.unwrap()
.as_ref()
.first_received_shred_index(),
index
);
Expand Down Expand Up @@ -6918,27 +6965,31 @@ pub mod tests {
merkle_root_metas
.get(&coding_shred.erasure_set())
.unwrap()
.as_ref()
.merkle_root(),
coding_shred.merkle_root().ok()
);
assert_eq!(
merkle_root_metas
.get(&coding_shred.erasure_set())
.unwrap()
.as_ref()
.first_received_shred_index(),
index
);
assert_eq!(
merkle_root_metas
.get(&new_coding_shred.erasure_set())
.unwrap()
.as_ref()
.merkle_root(),
new_coding_shred.merkle_root().ok()
);
assert_eq!(
merkle_root_metas
.get(&new_coding_shred.erasure_set())
.unwrap()
.as_ref()
.first_received_shred_index(),
new_index
);
Expand Down Expand Up @@ -6986,27 +7037,33 @@ pub mod tests {
merkle_root_metas
.get(&data_shred.erasure_set())
.unwrap()
.as_ref()
.merkle_root(),
data_shred.merkle_root().ok()
);
assert_eq!(
merkle_root_metas
.get(&data_shred.erasure_set())
.unwrap()
.as_ref()
.first_received_shred_index(),
index
);
assert_eq!(
merkle_root_metas
.get(&data_shred.erasure_set())
.unwrap()
.as_ref()
.first_received_shred_type(),
ShredType::Data,
);

for (erasure_set, merkle_root_meta) in merkle_root_metas {
for (erasure_set, working_merkle_root_meta) in merkle_root_metas {
write_batch
.put::<cf::MerkleRootMeta>(erasure_set.store_key(), &merkle_root_meta)
.put::<cf::MerkleRootMeta>(
erasure_set.store_key(),
working_merkle_root_meta.as_ref(),
)
.unwrap();
}
blockstore.db.write(write_batch).unwrap();
Expand Down Expand Up @@ -7046,13 +7103,15 @@ pub mod tests {
merkle_root_metas
.get(&data_shred.erasure_set())
.unwrap()
.as_ref()
.merkle_root(),
data_shred.merkle_root().ok()
);
assert_eq!(
merkle_root_metas
.get(&data_shred.erasure_set())
.unwrap()
.as_ref()
.first_received_shred_index(),
index
);
Expand Down Expand Up @@ -7112,27 +7171,31 @@ pub mod tests {
merkle_root_metas
.get(&data_shred.erasure_set())
.unwrap()
.as_ref()
.merkle_root(),
data_shred.merkle_root().ok()
);
assert_eq!(
merkle_root_metas
.get(&data_shred.erasure_set())
.unwrap()
.as_ref()
.first_received_shred_index(),
index
);
assert_eq!(
merkle_root_metas
.get(&new_data_shred.erasure_set())
.unwrap()
.as_ref()
.merkle_root(),
new_data_shred.merkle_root().ok()
);
assert_eq!(
merkle_root_metas
.get(&new_data_shred.erasure_set())
.unwrap()
.as_ref()
.first_received_shred_index(),
new_index
);
Expand Down