Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: introduce cached flat storage deltas #8662

Merged
merged 11 commits into from
Mar 3, 2023
Merged
Show file tree
Hide file tree
Changes from 8 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion chain/chain/src/flat_storage_creator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -369,7 +369,7 @@ impl FlatStorageShardCreator {
flat_head = chain_store.get_next_block_hash(&flat_head).unwrap();
let delta =
store_helper::get_delta(store, shard_id, flat_head).unwrap().unwrap();
merged_delta.merge(delta.as_ref());
merged_delta.merge(delta);
}

if (old_flat_head != &flat_head) || (flat_head == chain_final_head.last_block_hash)
Expand Down
85 changes: 56 additions & 29 deletions core/store/src/flat_state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -339,11 +339,6 @@ impl<const N: usize> From<[(Vec<u8>, Option<ValueRef>); N]> for FlatStateDelta {
}

impl FlatStateDelta {
/// Assumed number of bytes used to store an entry in the cache.
///
/// Based on 36 bytes for `ValueRef` + guessed overhead of 24 bytes for `Vec` and `HashMap`.
pub(crate) const PER_ENTRY_OVERHEAD: u64 = 60;

/// Returns `Some(Option<ValueRef>)` from delta for the given key. If key is not present, returns None.
pub fn get(&self, key: &[u8]) -> Option<Option<ValueRef>> {
self.0.get(key).cloned()
Expand All @@ -358,13 +353,9 @@ impl FlatStateDelta {
self.0.len()
}

fn total_size(&self) -> u64 {
self.0.keys().map(|key| key.len() as u64 + Self::PER_ENTRY_OVERHEAD).sum()
}

/// Merge two deltas. Values from `other` should override values from `self`.
pub fn merge(&mut self, other: &Self) {
self.0.extend(other.0.iter().map(|(k, v)| (k.clone(), v.clone())))
pub fn merge(&mut self, other: Self) {
self.0.extend(other.0.into_iter())
}

/// Creates delta using raw state changes for some block.
Expand Down Expand Up @@ -405,9 +396,41 @@ impl FlatStateDelta {
pub fn apply_to_flat_state(self, _store_update: &mut StoreUpdate) {}
}

/// `FlatStateDelta` which uses hashes of raw `TrieKey`s instead of the keys themselves.
///
/// Used to reduce the memory consumed by deltas while still serving read queries.
/// Each entry maps the hash of a key to an optional `ValueRef`.
pub struct CachedFlatStateDelta(HashMap<CryptoHash, Option<ValueRef>>);

impl From<FlatStateDelta> for CachedFlatStateDelta {
    /// Converts a delta keyed by raw keys into a delta keyed by the hashes of those keys.
    ///
    /// `delta` is taken by value, so its entries are consumed: each key is hashed and then
    /// dropped, and each value is moved into the new map instead of being cloned.
    fn from(delta: FlatStateDelta) -> Self {
        Self(delta.0.into_iter().map(|(key, value)| (hash(&key), value)).collect())
    }
}

impl CachedFlatStateDelta {
    /// Size of a single cache entry in bytes: the hashed key plus the optional value reference.
    const ENTRY_SIZE: usize =
        std::mem::size_of::<CryptoHash>() + std::mem::size_of::<Option<ValueRef>>();

    /// Looks up `key` in the delta by its hash.
    ///
    /// Returns `Some(Option<ValueRef>)` if the delta has an entry for the key, and `None` if the
    /// key is not present.
    pub fn get(&self, key: &[u8]) -> Option<Option<ValueRef>> {
        let key_hash = hash(key);
        self.0.get(&key_hash).cloned()
    }

    /// Returns the number of entries stored in the delta.
    pub fn len(&self) -> usize {
        self.0.len()
    }

    /// Total size in bytes consumed by the delta.
    ///
    /// Computed from the capacity of the underlying map rather than its length, so slots that
    /// are allocated but unused are counted as well. May be changed if we implement inlining of
    /// `ValueRef`s.
    fn total_size(&self) -> u64 {
        let allocated_entries = self.0.capacity() as u64;
        let bytes_per_entry = Self::ENTRY_SIZE as u64;
        allocated_entries * bytes_per_entry
    }
}

use lru::LruCache;
use near_o11y::metrics::IntGauge;
use near_primitives::errors::StorageError;
use near_primitives::hash::hash;
use near_primitives::shard_layout::ShardLayout;
use std::sync::{Arc, RwLock};
#[cfg(feature = "protocol_feature_flat_state")]
Expand Down Expand Up @@ -461,10 +484,10 @@ struct FlatStorageStateInner {
/// paths between the root block and a target block
#[allow(unused)]
blocks: HashMap<CryptoHash, BlockInfo>,
/// State deltas for all blocks supported by this flat storage.
/// All these deltas here are stored on disk too.
/// Cached deltas for all blocks supported by this flat storage.
/// Uncompressed deltas are stored on DB too, but are cached here for faster access.
#[allow(unused)]
deltas: HashMap<CryptoHash, Arc<FlatStateDelta>>,
deltas: HashMap<CryptoHash, Arc<CachedFlatStateDelta>>,
/// Cache for the mapping from trie storage keys to value refs for `flat_head`.
/// Must be equivalent to the mapping stored on disk only for `flat_head`. For
/// other blocks, deltas have to be applied as usual.
Expand Down Expand Up @@ -570,7 +593,6 @@ pub mod store_helper {
use near_primitives::state::ValueRef;
use near_primitives::trie_key::trie_key_parsers::parse_account_id_from_raw_key;
use near_primitives::types::ShardId;
use std::sync::Arc;

/// Prefixes determining type of flat storage creation status stored in DB.
/// Note that non-existent status is treated as SavingDeltas if flat storage
Expand All @@ -586,12 +608,11 @@ pub mod store_helper {
store: &Store,
shard_id: ShardId,
block_hash: CryptoHash,
) -> Result<Option<Arc<FlatStateDelta>>, FlatStorageError> {
) -> Result<Option<FlatStateDelta>, FlatStorageError> {
let key = KeyForFlatStateDelta { shard_id, block_hash };
Ok(store
.get_ser::<FlatStateDelta>(crate::DBCol::FlatStateDeltas, &key.try_to_vec().unwrap())
.map_err(|_| FlatStorageError::StorageInternalError)?
.map(|delta| Arc::new(delta)))
.map_err(|_| FlatStorageError::StorageInternalError)?)
}

pub fn set_delta(
Expand Down Expand Up @@ -785,7 +806,6 @@ pub mod store_helper {
use crate::Store;
use near_primitives::hash::CryptoHash;
use near_primitives::types::ShardId;
use std::sync::Arc;

pub fn get_flat_head(_store: &Store, _shard_id: ShardId) -> Option<CryptoHash> {
None
Expand All @@ -795,7 +815,7 @@ pub mod store_helper {
_store: &Store,
_shard_id: ShardId,
_block_hash: CryptoHash,
) -> Result<Option<Arc<FlatStateDelta>>, FlatStorageError> {
) -> Result<Option<FlatStateDelta>, FlatStorageError> {
Err(FlatStorageError::StorageInternalError)
}

Expand Down Expand Up @@ -823,7 +843,10 @@ impl FlatStorageStateInner {
}

/// Gets delta for the given block and shard `self.shard_id`.
fn get_delta(&self, block_hash: &CryptoHash) -> Result<Arc<FlatStateDelta>, FlatStorageError> {
fn get_delta(
&self,
block_hash: &CryptoHash,
) -> Result<Arc<CachedFlatStateDelta>, FlatStorageError> {
// TODO (#7327): add limitation on cached deltas number to limit RAM usage
// and read single `ValueRef` from delta if it is not cached.
Ok(self
Expand Down Expand Up @@ -954,15 +977,16 @@ impl FlatStorageState {
);
blocks.insert(hash, block_info);
metrics.cached_blocks.inc();
let delta = store_helper::get_delta(&store, shard_id, hash)
let delta: CachedFlatStateDelta = store_helper::get_delta(&store, shard_id, hash)
.expect(BORSH_ERR)
.unwrap_or_else(|| {
panic!("Cannot find block delta for block {:?} shard {}", hash, shard_id)
});
})
.into();
metrics.cached_deltas.inc();
metrics.cached_deltas_num_items.add(delta.len() as i64);
metrics.cached_deltas_size.add(delta.total_size() as i64);
deltas.insert(hash, delta);
deltas.insert(hash, Arc::new(delta));
}
}

Expand Down Expand Up @@ -1046,7 +1070,9 @@ impl FlatStorageState {
let blocks = guard.get_blocks_to_head(new_head)?;
for block in blocks.into_iter().rev() {
let mut store_update = StoreUpdate::new(guard.store.storage.clone());
let delta = guard.get_delta(&block)?.as_ref().clone();
// We unwrap here because flat storage is locked and we were able to retrieve the path from the old
// head to the new head, so the delta must exist.
let delta = store_helper::get_delta(&guard.store, guard.shard_id, block)?.unwrap();
for (key, value) in delta.0.iter() {
guard.put_value_ref_to_cache(key.clone(), value.clone());
}
Expand Down Expand Up @@ -1136,10 +1162,11 @@ impl FlatStorageState {
}
let mut store_update = StoreUpdate::new(guard.store.storage.clone());
store_helper::set_delta(&mut store_update, guard.shard_id, block_hash.clone(), &delta)?;
let cached_delta: CachedFlatStateDelta = delta.into();
guard.metrics.cached_deltas.inc();
guard.metrics.cached_deltas_num_items.add(delta.len() as i64);
guard.metrics.cached_deltas_size.add(delta.total_size() as i64);
guard.deltas.insert(*block_hash, Arc::new(delta));
guard.metrics.cached_deltas_num_items.add(cached_delta.len() as i64);
guard.metrics.cached_deltas_size.add(cached_delta.total_size() as i64);
guard.deltas.insert(*block_hash, Arc::new(cached_delta));
guard.blocks.insert(*block_hash, block);
guard.metrics.cached_blocks.inc();
Ok(store_update)
Expand Down Expand Up @@ -1408,7 +1435,7 @@ mod tests {
(vec![4], None),
(vec![5], Some(ValueRef::new(&[9]))),
]);
delta.merge(&delta_new);
delta.merge(delta_new);

assert_eq!(delta.get(&[1]), Some(Some(ValueRef::new(&[4]))));
assert_eq!(delta.get(&[2]), Some(Some(ValueRef::new(&[7]))));
Expand Down