Skip to content

Commit

Permalink
feat: introduce cached flat storage deltas (#8662)
Browse files Browse the repository at this point in the history
  • Loading branch information
Longarithm authored Mar 3, 2023
1 parent 725ddac commit 37793cf
Show file tree
Hide file tree
Showing 5 changed files with 60 additions and 31 deletions.
2 changes: 1 addition & 1 deletion chain/chain/src/flat_storage_creator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -368,7 +368,7 @@ impl FlatStorageShardCreator {
flat_head = chain_store.get_next_block_hash(&flat_head).unwrap();
let delta =
store_helper::get_delta(store, shard_id, flat_head).unwrap().unwrap();
merged_delta.merge(delta.as_ref());
merged_delta.merge(delta);
}

if (old_flat_head != &flat_head) || (flat_head == chain_final_head.last_block_hash)
Expand Down
48 changes: 36 additions & 12 deletions core/store/src/flat/delta.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use borsh::{BorshDeserialize, BorshSerialize};

use near_primitives::hash::hash;
use near_primitives::state::ValueRef;
use near_primitives::types::{RawStateChangesWithTrieKey, ShardId};
use std::collections::HashMap;
Expand All @@ -26,11 +27,6 @@ impl<const N: usize> From<[(Vec<u8>, Option<ValueRef>); N]> for FlatStateDelta {
}

impl FlatStateDelta {
/// Assumed number of bytes used to store an entry in the cache.
///
/// Based on 36 bytes for `ValueRef` + guessed overhead of 24 bytes for `Vec` and `HashMap`.
pub(crate) const PER_ENTRY_OVERHEAD: u64 = 60;

/// Returns `Some(Option<ValueRef>)` from delta for the given key. If key is not present, returns None.
pub fn get(&self, key: &[u8]) -> Option<Option<ValueRef>> {
self.0.get(key).cloned()
Expand All @@ -45,13 +41,9 @@ impl FlatStateDelta {
self.0.len()
}

pub fn total_size(&self) -> u64 {
self.0.keys().map(|key| key.len() as u64 + Self::PER_ENTRY_OVERHEAD).sum()
}

/// Merge two deltas. Values from `other` should override values from `self`.
pub fn merge(&mut self, other: &Self) {
self.0.extend(other.0.iter().map(|(k, v)| (k.clone(), v.clone())))
pub fn merge(&mut self, other: Self) {
self.0.extend(other.0.into_iter())
}

/// Creates delta using raw state changes for some block.
Expand Down Expand Up @@ -92,6 +84,38 @@ impl FlatStateDelta {
pub fn apply_to_flat_state(self, _store_update: &mut StoreUpdate) {}
}

/// `FlatStateDelta` which uses hash of raw `TrieKey`s instead of keys themselves.
/// Used to reduce memory used by deltas and serves read queries.
pub struct CachedFlatStateDelta(HashMap<CryptoHash, Option<ValueRef>>);

impl From<FlatStateDelta> for CachedFlatStateDelta {
fn from(delta: FlatStateDelta) -> Self {
Self(delta.0.into_iter().map(|(key, value)| (hash(&key), value)).collect())
}
}

impl CachedFlatStateDelta {
/// Size of cache entry in bytes.
const ENTRY_SIZE: usize =
std::mem::size_of::<CryptoHash>() + std::mem::size_of::<Option<ValueRef>>();

/// Returns `Some(Option<ValueRef>)` from delta for the given key. If key is not present, returns None.
#[allow(unused)]
pub(crate) fn get(&self, key: &[u8]) -> Option<Option<ValueRef>> {
self.0.get(&hash(key)).cloned()
}

/// Returns number of all entries.
pub(crate) fn len(&self) -> usize {
self.0.len()
}

/// Total size in bytes consumed by delta. May be changed if we implement inlining of `ValueRef`s.
pub(crate) fn total_size(&self) -> u64 {
(self.0.capacity() as u64) * (Self::ENTRY_SIZE as u64)
}
}

#[cfg(feature = "protocol_feature_flat_state")]
#[cfg(test)]
mod tests {
Expand Down Expand Up @@ -193,7 +217,7 @@ mod tests {
(vec![4], None),
(vec![5], Some(ValueRef::new(&[9]))),
]);
delta.merge(&delta_new);
delta.merge(delta_new);

assert_eq!(delta.get(&[1]), Some(Some(ValueRef::new(&[4]))));
assert_eq!(delta.get(&[2]), Some(Some(ValueRef::new(&[7]))));
Expand Down
3 changes: 1 addition & 2 deletions core/store/src/flat/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,6 @@ pub mod store_helper {
use crate::Store;
use near_primitives::hash::CryptoHash;
use near_primitives::types::ShardId;
use std::sync::Arc;

pub fn get_flat_head(_store: &Store, _shard_id: ShardId) -> Option<CryptoHash> {
None
Expand All @@ -72,7 +71,7 @@ pub mod store_helper {
_store: &Store,
_shard_id: ShardId,
_block_hash: CryptoHash,
) -> Result<Option<Arc<FlatStateDelta>>, FlatStorageError> {
) -> Result<Option<FlatStateDelta>, FlatStorageError> {
Err(FlatStorageError::StorageInternalError)
}

Expand Down
32 changes: 20 additions & 12 deletions core/store/src/flat/storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ use near_primitives::types::{BlockHeight, ShardId};
#[cfg(feature = "protocol_feature_flat_state")]
use tracing::info;

use crate::flat::delta::CachedFlatStateDelta;
use crate::{metrics, Store, StoreUpdate};

use super::delta::FlatStateDelta;
Expand Down Expand Up @@ -51,10 +52,10 @@ pub(crate) struct FlatStorageInner {
/// paths between the root block and a target block
#[allow(unused)]
blocks: HashMap<CryptoHash, BlockInfo>,
/// State deltas for all blocks supported by this flat storage.
/// All these deltas here are stored on disk too.
/// Cached deltas for all blocks supported by this flat storage.
/// Uncompressed deltas are stored on DB too, but are cached here for faster access.
#[allow(unused)]
deltas: HashMap<CryptoHash, Arc<FlatStateDelta>>,
deltas: HashMap<CryptoHash, Arc<CachedFlatStateDelta>>,
/// Cache for the mapping from trie storage keys to value refs for `flat_head`.
/// Must be equivalent to the mapping stored on disk only for `flat_head`. For
/// other blocks, deltas have to be applied as usual.
Expand Down Expand Up @@ -89,7 +90,10 @@ impl FlatStorageInner {
}

/// Gets delta for the given block and shard `self.shard_id`.
fn get_delta(&self, block_hash: &CryptoHash) -> Result<Arc<FlatStateDelta>, FlatStorageError> {
fn get_delta(
&self,
block_hash: &CryptoHash,
) -> Result<Arc<CachedFlatStateDelta>, FlatStorageError> {
// TODO (#7327): add limitation on cached deltas number to limit RAM usage
// and read single `ValueRef` from delta if it is not cached.
Ok(self
Expand Down Expand Up @@ -220,15 +224,16 @@ impl FlatStorage {
);
blocks.insert(hash, block_info);
metrics.cached_blocks.inc();
let delta = store_helper::get_delta(&store, shard_id, hash)
let delta: CachedFlatStateDelta = store_helper::get_delta(&store, shard_id, hash)
.expect("Borsh cannot fail")
.unwrap_or_else(|| {
panic!("Cannot find block delta for block {:?} shard {}", hash, shard_id)
});
})
.into();
metrics.cached_deltas.inc();
metrics.cached_deltas_num_items.add(delta.len() as i64);
metrics.cached_deltas_size.add(delta.total_size() as i64);
deltas.insert(hash, delta);
deltas.insert(hash, Arc::new(delta));
}
}

Expand Down Expand Up @@ -312,7 +317,9 @@ impl FlatStorage {
let blocks = guard.get_blocks_to_head(new_head)?;
for block in blocks.into_iter().rev() {
let mut store_update = StoreUpdate::new(guard.store.storage.clone());
let delta = guard.get_delta(&block)?.as_ref().clone();
// We unwrap here because flat storage is locked and we could retrieve path from old to new head, so delta
// must exist.
let delta = store_helper::get_delta(&guard.store, guard.shard_id, block)?.unwrap();
for (key, value) in delta.0.iter() {
guard.put_value_ref_to_cache(key.clone(), value.clone());
}
Expand Down Expand Up @@ -402,10 +409,11 @@ impl FlatStorage {
}
let mut store_update = StoreUpdate::new(guard.store.storage.clone());
store_helper::set_delta(&mut store_update, guard.shard_id, block_hash.clone(), &delta)?;
let cached_delta: CachedFlatStateDelta = delta.into();
guard.metrics.cached_deltas.inc();
guard.metrics.cached_deltas_num_items.add(delta.len() as i64);
guard.metrics.cached_deltas_size.add(delta.total_size() as i64);
guard.deltas.insert(*block_hash, Arc::new(delta));
guard.metrics.cached_deltas_num_items.add(cached_delta.len() as i64);
guard.metrics.cached_deltas_size.add(cached_delta.total_size() as i64);
guard.deltas.insert(*block_hash, Arc::new(cached_delta));
guard.blocks.insert(*block_hash, block);
guard.metrics.cached_blocks.inc();
Ok(store_update)
Expand Down Expand Up @@ -674,7 +682,7 @@ mod tests {
(vec![4], None),
(vec![5], Some(ValueRef::new(&[9]))),
]);
delta.merge(&delta_new);
delta.merge(delta_new);

assert_eq!(delta.get(&[1]), Some(Some(ValueRef::new(&[4]))));
assert_eq!(delta.get(&[2]), Some(Some(ValueRef::new(&[7]))));
Expand Down
6 changes: 2 additions & 4 deletions core/store/src/flat/store_helper.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ use near_primitives::shard_layout::{account_id_to_shard_id, ShardLayout};
use near_primitives::state::ValueRef;
use near_primitives::trie_key::trie_key_parsers::parse_account_id_from_raw_key;
use near_primitives::types::ShardId;
use std::sync::Arc;

/// Prefixes determining type of flat storage creation status stored in DB.
/// Note that non-existent status is treated as SavingDeltas if flat storage /// does not exist and Ready if it does.
Expand All @@ -27,12 +26,11 @@ pub fn get_delta(
store: &Store,
shard_id: ShardId,
block_hash: CryptoHash,
) -> Result<Option<Arc<FlatStateDelta>>, FlatStorageError> {
) -> Result<Option<FlatStateDelta>, FlatStorageError> {
let key = KeyForFlatStateDelta { shard_id, block_hash };
Ok(store
.get_ser::<FlatStateDelta>(crate::DBCol::FlatStateDeltas, &key.try_to_vec().unwrap())
.map_err(|_| FlatStorageError::StorageInternalError)?
.map(|delta| Arc::new(delta)))
.map_err(|_| FlatStorageError::StorageInternalError)?)
}

pub fn set_delta(
Expand Down

0 comments on commit 37793cf

Please sign in to comment.