From fad42ae05c24b90e9f8c397e1d9fcca0f041c085 Mon Sep 17 00:00:00 2001 From: Ben Date: Wed, 1 Feb 2023 14:08:01 +0100 Subject: [PATCH] ledger-db restructuring (#3441) --- .../src/speculative_ledger.rs | 9 +- massa-ledger-exports/src/controller.rs | 10 - massa-ledger-worker/src/ledger.rs | 19 +- massa-ledger-worker/src/ledger_db.rs | 389 +++++++++--------- 4 files changed, 206 insertions(+), 221 deletions(-) diff --git a/massa-execution-worker/src/speculative_ledger.rs b/massa-execution-worker/src/speculative_ledger.rs index 4baf0d46f18..a251f3c050d 100644 --- a/massa-execution-worker/src/speculative_ledger.rs +++ b/massa-execution-worker/src/speculative_ledger.rs @@ -460,9 +460,12 @@ impl SpeculativeLedger { .fetch_active_history_data_entry(addr, key) { HistorySearchResult::Present(_entry) => true, - HistorySearchResult::NoInfo => { - self.final_state.read().ledger.has_data_entry(addr, key) - } + HistorySearchResult::NoInfo => self + .final_state + .read() + .ledger + .get_data_entry(addr, key) + .is_some(), HistorySearchResult::Absent => false, } }) diff --git a/massa-ledger-exports/src/controller.rs b/massa-ledger-exports/src/controller.rs index 016b08f07ed..e25f43e27bb 100644 --- a/massa-ledger-exports/src/controller.rs +++ b/massa-ledger-exports/src/controller.rs @@ -42,16 +42,6 @@ pub trait LedgerController: Send + Sync + Debug { /// A copy of the datastore value, or `None` if the ledger entry or datastore entry was not found fn get_data_entry(&self, addr: &Address, key: &[u8]) -> Option>; - /// Checks for the existence of a datastore entry for a given address. - /// - /// # Arguments - /// * `addr`: target address - /// * `key`: datastore key - /// - /// # Returns - /// true if the datastore entry was found, or false if the ledger entry or datastore entry was not found - fn has_data_entry(&self, addr: &Address, key: &[u8]) -> bool; - /// Get every key of the datastore for a given address. /// /// # Returns diff --git a/massa-ledger-worker/src/ledger.rs b/massa-ledger-worker/src/ledger.rs index fc168b31e1a..b21bb1413c6 100644 --- a/massa-ledger-worker/src/ledger.rs +++ b/massa-ledger-worker/src/ledger.rs @@ -134,29 +134,12 @@ impl LedgerController for FinalLedger { .get_sub_entry(addr, LedgerSubEntry::Datastore(key.to_owned())) } - /// Checks for the existence of a datastore entry for a given address. - /// - /// # Arguments - /// * `addr`: target address - /// * `key`: datastore key - /// - /// # Returns - /// true if the datastore entry was found, or false if the ledger entry or datastore entry was not found - fn has_data_entry(&self, addr: &Address, key: &[u8]) -> bool { - self.sorted_ledger - .get_sub_entry(addr, LedgerSubEntry::Datastore(key.to_owned())) - .is_some() - } - /// Get every key of the datastore for a given address. /// /// # Returns /// A `BTreeSet` of the datastore keys fn get_datastore_keys(&self, addr: &Address) -> Option>> { - match self.entry_exists(addr) { - true => Some(self.sorted_ledger.get_datastore_keys(addr)), - false => None, - } + self.sorted_ledger.get_datastore_keys(addr) } /// Get the current disk ledger hash diff --git a/massa-ledger-worker/src/ledger_db.rs b/massa-ledger-worker/src/ledger_db.rs index 4f01b8f4a61..4493c2c61e2 100644 --- a/massa-ledger-worker/src/ledger_db.rs +++ b/massa-ledger-worker/src/ledger_db.rs @@ -52,6 +52,16 @@ pub enum LedgerSubEntry { Datastore(Vec), } +impl LedgerSubEntry { + fn derive_key(&self, addr: &Address) -> Vec { + match self { + LedgerSubEntry::Balance => balance_key!(addr), + LedgerSubEntry::Bytecode => bytecode_key!(addr), + LedgerSubEntry::Datastore(hash) => data_key!(addr, hash), + } + } +} + /// Disk ledger DB module /// /// Contains a `RocksDB` DB instance @@ -73,29 +83,6 @@ impl Debug for LedgerDB { } } -/// For a given start prefix (inclusive), returns the correct end prefix (non-inclusive). -/// This assumes the key bytes are ordered in lexicographical order. -/// Since key length is not limited, for some case we return `None` because there is -/// no bounded limit (every keys in the series `[]`, `[255]`, `[255, 255]` ...). -fn end_prefix(prefix: &[u8]) -> Option> { - let mut end_range = prefix.to_vec(); - while let Some(0xff) = end_range.last() { - end_range.pop(); - } - if let Some(byte) = end_range.last_mut() { - *byte += 1; - Some(end_range) - } else { - None - } -} - -#[test] -fn test_end_prefix() { - assert_eq!(end_prefix(&[5, 6, 7]), Some(vec![5, 6, 8])); - assert_eq!(end_prefix(&[5, 6, 255]), Some(vec![5, 7])); -} - /// Batch containing write operations to perform on disk and cache for the ledger hash computing pub struct LedgerBatch { // Rocksdb write batch @@ -210,6 +197,164 @@ impl LedgerDB { self.write_batch(batch); } + /// Get the current disk ledger hash + pub fn get_ledger_hash(&self) -> Hash { + let handle = self.db.cf_handle(METADATA_CF).expect(CF_ERROR); + if let Some(ledger_hash_bytes) = self + .db + .get_pinned_cf(handle, LEDGER_HASH_KEY) + .expect(CRUD_ERROR) + .as_deref() + { + Hash::from_bytes(ledger_hash_bytes.try_into().expect(LEDGER_HASH_ERROR)) + } else { + // initial ledger_hash value to avoid matching an option in every XOR operation + // because of a one time case being an empty ledger + // also note that the if you XOR a hash with itself result is LEDGER_HASH_INITIAL_BYTES + Hash::from_bytes(LEDGER_HASH_INITIAL_BYTES) + } + } + + /// Get the given sub-entry of a given address. + /// + /// # Arguments + /// * `addr`: associated address + /// * `ty`: type of the queried sub-entry + /// + /// # Returns + /// An Option of the sub-entry value as bytes + pub fn get_sub_entry(&self, addr: &Address, ty: LedgerSubEntry) -> Option> { + let handle = self.db.cf_handle(LEDGER_CF).expect(CF_ERROR); + self.db + .get_cf(handle, ty.derive_key(addr)) + .expect(CRUD_ERROR) + } + + /// Get every key of the datastore for a given address. + /// + /// # Returns + /// A `BTreeSet` of the datastore keys + pub fn get_datastore_keys(&self, addr: &Address) -> Option>> { + let handle = self.db.cf_handle(LEDGER_CF).expect(CF_ERROR); + + let mut opt = ReadOptions::default(); + opt.set_iterate_range(data_prefix!(addr).clone()..end_prefix(data_prefix!(addr)).unwrap()); + + let mut iter = self + .db + .iterator_cf_opt(handle, opt, IteratorMode::Start) + .flatten() + .map(|(key, _)| key.split_at(ADDRESS_SIZE_BYTES + 1).1.to_vec()) + .peekable(); + + // Return None if empty + // TODO: function should return None if complete entry does not exist + // and Some([]) if it does but datastore is empty + iter.peek()?; + Some(iter.collect()) + } + + /// Get a part of the disk Ledger. + /// Mainly used in the bootstrap process. + /// + /// # Arguments + /// * `last_key`: key where the part retrieving must start + /// + /// # Returns + /// A tuple containing: + /// * The ledger part as bytes + /// * The last taken key (this is an optimization to easily keep a reference to the last key) + pub fn get_ledger_part( + &self, + cursor: StreamingStep>, + ) -> Result<(Vec, StreamingStep>), ModelsError> { + let handle = self.db.cf_handle(LEDGER_CF).expect(CF_ERROR); + let opt = ReadOptions::default(); + let ser = VecU8Serializer::new(); + let key_serializer = KeySerializer::new(); + let mut ledger_part = Vec::new(); + + // Creates an iterator from the next element after the last if defined, otherwise initialize it at the first key of the ledger. + let (db_iterator, mut new_cursor) = match cursor { + StreamingStep::Started => ( + self.db.iterator_cf_opt(handle, opt, IteratorMode::Start), + StreamingStep::Started, + ), + StreamingStep::Ongoing(last_key) => { + let mut iter = self.db.iterator_cf_opt( + handle, + opt, + IteratorMode::From(&last_key, Direction::Forward), + ); + iter.next(); + (iter, StreamingStep::Finished(None)) + } + StreamingStep::Finished(_) => return Ok((ledger_part, cursor)), + }; + + // Iterates over the whole database + for (key, entry) in db_iterator.flatten() { + if (ledger_part.len() as u64) < (self.ledger_part_size_message_bytes) { + key_serializer.serialize(&key.to_vec(), &mut ledger_part)?; + ser.serialize(&entry.to_vec(), &mut ledger_part)?; + new_cursor = StreamingStep::Ongoing(key.to_vec()); + } else { + break; + } + } + Ok((ledger_part, new_cursor)) + } + + /// Set a part of the ledger in the database. + /// We deserialize in this function because we insert in the ledger while deserializing. + /// Used for bootstrap. + /// + /// # Arguments + /// * data: must be the serialized version provided by `get_ledger_part` + /// + /// # Returns + /// The last key of the inserted entry (this is an optimization to easily keep a reference to the last key) + pub fn set_ledger_part<'a>( + &self, + data: &'a [u8], + ) -> Result>, ModelsError> { + let handle = self.db.cf_handle(LEDGER_CF).expect(CF_ERROR); + let vec_u8_deserializer = + VecU8Deserializer::new(Bound::Included(0), Bound::Excluded(u64::MAX)); + let key_deserializer = KeyDeserializer::new(self.max_datastore_key_length); + let mut last_key = Rc::new(Vec::new()); + let mut batch = LedgerBatch::new(self.get_ledger_hash()); + + // Since this data is coming from the network, deser to address and ser back to bytes for a security check. + let (rest, _) = many0(|input: &'a [u8]| { + let (rest, (key, value)) = tuple(( + |input| key_deserializer.deserialize(input), + |input| vec_u8_deserializer.deserialize(input), + ))(input)?; + *Rc::get_mut(&mut last_key).ok_or_else(|| { + nom::Err::Error(nom::error::Error::new(input, nom::error::ErrorKind::Fail)) + })? = key.clone(); + self.put_entry_value(handle, &mut batch, &key, &value); + Ok((rest, ())) + })(data) + .map_err(|_| ModelsError::SerializeError("Error in deserialization".to_string()))?; + + // Every byte should have been read + if last_key.is_empty() { + Ok(StreamingStep::Finished(None)) + } else if rest.is_empty() { + self.write_batch(batch); + Ok(StreamingStep::Ongoing((*last_key).clone())) + } else { + Err(ModelsError::SerializeError( + "rest is not empty.".to_string(), + )) + } + } +} + +// Private helpers +impl LedgerDB { /// Apply the given operation batch to the disk ledger fn write_batch(&self, mut batch: LedgerBatch) { let handle = self.db.cf_handle(METADATA_CF).expect(CF_ERROR); @@ -235,26 +380,12 @@ impl LedgerDB { .write_batch .put_cf(handle, SLOT_KEY, slot_bytes.clone()); // XOR previous slot and new one - if let Some(prev_bytes) = self.db.get_cf(handle, SLOT_KEY).expect(CRUD_ERROR) { + if let Some(prev_bytes) = self.db.get_pinned_cf(handle, SLOT_KEY).expect(CRUD_ERROR) { batch.ledger_hash ^= Hash::compute_from(&prev_bytes); } batch.ledger_hash ^= Hash::compute_from(&slot_bytes); } - /// Get the current disk ledger hash - pub fn get_ledger_hash(&self) -> Hash { - let handle = self.db.cf_handle(METADATA_CF).expect(CF_ERROR); - if let Some(ledger_hash_bytes) = self.db.get_cf(handle, LEDGER_HASH_KEY).expect(CRUD_ERROR) - { - Hash::from_bytes(&ledger_hash_bytes.try_into().expect(LEDGER_HASH_ERROR)) - } else { - // initial ledger_hash value to avoid matching an option in every XOR operation - // because of a one time case being an empty ledger - // also note that the if you XOR a hash with itself result is LEDGER_HASH_INITIAL_BYTES - Hash::from_bytes(LEDGER_HASH_INITIAL_BYTES) - } - } - /// Internal function to put a key & value and perform the ledger hash XORs fn put_entry_value( &self, @@ -299,54 +430,6 @@ impl LedgerDB { } } - /// Get the given sub-entry of a given address. - /// - /// # Arguments - /// * `addr`: associated address - /// * `ty`: type of the queried sub-entry - /// - /// # Returns - /// An Option of the sub-entry value as bytes - pub fn get_sub_entry(&self, addr: &Address, ty: LedgerSubEntry) -> Option> { - let handle = self.db.cf_handle(LEDGER_CF).expect(CF_ERROR); - - match ty { - LedgerSubEntry::Balance => self - .db - .get_cf(handle, balance_key!(addr)) - .expect(CRUD_ERROR), - LedgerSubEntry::Bytecode => self - .db - .get_cf(handle, bytecode_key!(addr)) - .expect(CRUD_ERROR), - LedgerSubEntry::Datastore(hash) => self - .db - .get_cf(handle, data_key!(addr, hash)) - .expect(CRUD_ERROR), - } - } - - /// Get every key of the datastore for a given address. - /// - /// # Returns - /// A `BTreeSet` of the datastore keys - pub fn get_datastore_keys(&self, addr: &Address) -> BTreeSet> { - let handle = self.db.cf_handle(LEDGER_CF).expect(CF_ERROR); - - let mut opt = ReadOptions::default(); - opt.set_iterate_upper_bound(end_prefix(data_prefix!(addr)).unwrap()); - - self.db - .iterator_cf_opt( - handle, - opt, - IteratorMode::From(data_prefix!(addr), Direction::Forward), - ) - .flatten() - .map(|(key, _)| key.split_at(ADDRESS_SIZE_BYTES + 1).1.to_vec()) - .collect() - } - /// Internal function to update a key & value and perform the ledger hash XORs fn update_key_value( &self, @@ -361,7 +444,7 @@ impl LedgerDB { .expect(KEY_LEN_SER_ERROR); if let Some(added_hash) = batch.aeh_list.get(key) { batch.ledger_hash ^= *added_hash; - } else if let Some(prev_bytes) = self.db.get_cf(handle, key).expect(CRUD_ERROR) { + } else if let Some(prev_bytes) = self.db.get_pinned_cf(handle, key).expect(CRUD_ERROR) { batch.ledger_hash ^= Hash::compute_from(&[&len_bytes, key, &prev_bytes].concat()); } let hash = Hash::compute_from(&[&len_bytes, key, value].concat()); @@ -413,7 +496,7 @@ impl LedgerDB { fn delete_key(&self, handle: &ColumnFamily, batch: &mut LedgerBatch, key: &[u8]) { if let Some(added_hash) = batch.aeh_list.get(key) { batch.ledger_hash ^= *added_hash; - } else if let Some(prev_bytes) = self.db.get_cf(handle, key).expect(CRUD_ERROR) { + } else if let Some(prev_bytes) = self.db.get_pinned_cf(handle, key).expect(CRUD_ERROR) { let mut len_bytes = Vec::new(); self.len_serializer .serialize(&(key.len() as u64), &mut len_bytes) @@ -451,112 +534,16 @@ impl LedgerDB { self.delete_key(handle, batch, &key); } } - - /// Get a part of the disk Ledger. - /// Mainly used in the bootstrap process. - /// - /// # Arguments - /// * `last_key`: key where the part retrieving must start - /// - /// # Returns - /// A tuple containing: - /// * The ledger part as bytes - /// * The last taken key (this is an optimization to easily keep a reference to the last key) - pub fn get_ledger_part( - &self, - cursor: StreamingStep>, - ) -> Result<(Vec, StreamingStep>), ModelsError> { - let handle = self.db.cf_handle(LEDGER_CF).expect(CF_ERROR); - let opt = ReadOptions::default(); - let ser = VecU8Serializer::new(); - let key_serializer = KeySerializer::new(); - let mut ledger_part = Vec::new(); - - // Creates an iterator from the next element after the last if defined, otherwise initialize it at the first key of the ledger. - let (db_iterator, mut new_cursor) = match cursor { - StreamingStep::Started => ( - self.db.iterator_cf_opt(handle, opt, IteratorMode::Start), - StreamingStep::Started, - ), - StreamingStep::Ongoing(last_key) => { - let mut iter = self.db.iterator_cf_opt( - handle, - opt, - IteratorMode::From(&last_key, Direction::Forward), - ); - iter.next(); - (iter, StreamingStep::Finished(None)) - } - StreamingStep::Finished(_) => return Ok((ledger_part, cursor)), - }; - - // Iterates over the whole database - for (key, entry) in db_iterator.flatten() { - if (ledger_part.len() as u64) < (self.ledger_part_size_message_bytes) { - key_serializer.serialize(&key.to_vec(), &mut ledger_part)?; - ser.serialize(&entry.to_vec(), &mut ledger_part)?; - new_cursor = StreamingStep::Ongoing(key.to_vec()); - } else { - break; - } - } - Ok((ledger_part, new_cursor)) - } - - /// Set a part of the ledger in the database. - /// We deserialize in this function because we insert in the ledger while deserializing. - /// Used for bootstrap. - /// - /// # Arguments - /// * data: must be the serialized version provided by `get_ledger_part` - /// - /// # Returns - /// The last key of the inserted entry (this is an optimization to easily keep a reference to the last key) - pub fn set_ledger_part<'a>( - &self, - data: &'a [u8], - ) -> Result>, ModelsError> { - let handle = self.db.cf_handle(LEDGER_CF).expect(CF_ERROR); - let vec_u8_deserializer = - VecU8Deserializer::new(Bound::Included(0), Bound::Excluded(u64::MAX)); - let key_deserializer = KeyDeserializer::new(self.max_datastore_key_length); - let mut last_key = Rc::new(Vec::new()); - let mut batch = LedgerBatch::new(self.get_ledger_hash()); - - // Since this data is coming from the network, deser to address and ser back to bytes for a security check. - let (rest, _) = many0(|input: &'a [u8]| { - let (rest, (key, value)) = tuple(( - |input| key_deserializer.deserialize(input), - |input| vec_u8_deserializer.deserialize(input), - ))(input)?; - *Rc::get_mut(&mut last_key).ok_or_else(|| { - nom::Err::Error(nom::error::Error::new(input, nom::error::ErrorKind::Fail)) - })? = key.clone(); - self.put_entry_value(handle, &mut batch, &key, &value); - Ok((rest, ())) - })(data) - .map_err(|_| ModelsError::SerializeError("Error in deserialization".to_string()))?; - - // Every byte should have been read - if last_key.is_empty() { - Ok(StreamingStep::Finished(None)) - } else if rest.is_empty() { - self.write_batch(batch); - Ok(StreamingStep::Ongoing((*last_key).clone())) - } else { - Err(ModelsError::SerializeError( - "rest is not empty.".to_string(), - )) - } - } - +} +// test helpers +impl LedgerDB { /// Get every address and their corresponding balance. /// /// IMPORTANT: This should only be used for debug purposes. /// /// # Returns /// A `BTreeMap` with the address as key and the balance as value - #[cfg(feature = "testing")] + #[cfg(any(feature = "testing"))] pub fn get_every_address( &self, ) -> std::collections::BTreeMap { @@ -620,10 +607,26 @@ impl LedgerDB { } } +/// For a given start prefix (inclusive), returns the correct end prefix (non-inclusive). +/// This assumes the key bytes are ordered in lexicographical order. +/// Since key length is not limited, for some case we return `None` because there is +/// no bounded limit (every keys in the series `[]`, `[255]`, `[255, 255]` ...). +fn end_prefix(prefix: &[u8]) -> Option> { + let mut end_range = prefix.to_vec(); + while let Some(0xff) = end_range.last() { + end_range.pop(); + } + if let Some(byte) = end_range.last_mut() { + *byte += 1; + Some(end_range) + } else { + None + } +} + #[cfg(test)] mod tests { - use super::LedgerDB; - use crate::ledger_db::{LedgerBatch, LedgerSubEntry, LEDGER_HASH_INITIAL_BYTES}; + use super::*; use massa_hash::Hash; use massa_ledger_exports::{LedgerEntry, LedgerEntryUpdate, SetOrKeep}; use massa_models::{ @@ -716,4 +719,10 @@ mod tests { let res = db.get_ledger_part(StreamingStep::Started).unwrap(); db.set_ledger_part(&res.0[..]).unwrap(); } + + #[test] + fn test_end_prefix() { + assert_eq!(end_prefix(&[5, 6, 7]), Some(vec![5, 6, 8])); + assert_eq!(end_prefix(&[5, 6, 255]), Some(vec![5, 7])); + } }