diff --git a/core/primitives/src/version.rs b/core/primitives/src/version.rs index cdd7ed09068..c27e78f1262 100644 --- a/core/primitives/src/version.rs +++ b/core/primitives/src/version.rs @@ -12,7 +12,7 @@ pub struct Version { pub type DbVersion = u32; /// Current version of the database. -pub const DB_VERSION: DbVersion = 5; +pub const DB_VERSION: DbVersion = 6; /// Protocol version type. pub type ProtocolVersion = u32; diff --git a/core/store/src/db.rs b/core/store/src/db.rs index 4ce8baebcf0..da76e1c6ed2 100644 --- a/core/store/src/db.rs +++ b/core/store/src/db.rs @@ -11,11 +11,13 @@ use borsh::{BorshDeserialize, BorshSerialize}; use rocksdb::Env; use rocksdb::{ BlockBasedOptions, Cache, ColumnFamily, ColumnFamilyDescriptor, Direction, IteratorMode, - Options, ReadOptions, WriteBatch, DB, + MergeOperands, Options, ReadOptions, WriteBatch, DB, }; use strum_macros::EnumIter; +use crate::trie::merge_refcounted_records; use near_primitives::version::DbVersion; +use rocksdb::compaction_filter::Decision; use std::marker::PhantomPinned; #[derive(Debug, Clone, PartialEq)] @@ -215,6 +217,7 @@ pub struct DBTransaction { pub enum DBOp { Insert { col: DBCol, key: Vec, value: Vec }, + UpdateRefcount { col: DBCol, key: Vec, value: Vec }, Delete { col: DBCol, key: Vec }, } @@ -227,6 +230,19 @@ impl DBTransaction { }); } + pub fn update_refcount, V: AsRef<[u8]>>( + &mut self, + col: DBCol, + key: K, + value: V, + ) { + self.ops.push(DBOp::UpdateRefcount { + col, + key: key.as_ref().to_owned(), + value: value.as_ref().to_owned(), + }); + } + pub fn delete>(&mut self, col: DBCol, key: K) { self.ops.push(DBOp::Delete { col, key: key.as_ref().to_owned() }); } @@ -264,7 +280,8 @@ pub trait Database: Sync + Send { impl Database for RocksDB { fn get(&self, col: DBCol, key: &[u8]) -> Result>, DBError> { let read_options = rocksdb_read_options(); - unsafe { Ok(self.db.get_cf_opt(&*self.cfs[col as usize], key, &read_options)?) } + let result = self.db.get_cf_opt(unsafe { &*self.cfs[col as usize] }, key, &read_options)?; + Ok(RocksDB::empty_value_filtering_get(col, result)) } fn iter<'a>(&'a self, col: DBCol) -> Box, Box<[u8]>)> + 'a> { @@ -272,7 +289,7 @@ impl Database for RocksDB { unsafe { let cf_handle = &*self.cfs[col as usize]; let iterator = self.db.iterator_cf_opt(cf_handle, read_options, IteratorMode::Start); - Box::new(iterator) + RocksDB::empty_value_filtering_iter(col, iterator) } } @@ -297,7 +314,7 @@ impl Database for RocksDB { IteratorMode::From(key_prefix, Direction::Forward), ) .take_while(move |(key, _value)| key.starts_with(key_prefix)); - Box::new(iterator) + RocksDB::empty_value_filtering_iter(col, iterator) } } @@ -308,6 +325,9 @@ impl Database for RocksDB { DBOp::Insert { col, key, value } => unsafe { batch.put_cf(&*self.cfs[col as usize], key, value); }, + DBOp::UpdateRefcount { col, key, value } => unsafe { + batch.merge_cf(&*self.cfs[col as usize], key, value); + }, DBOp::Delete { col, key } => unsafe { batch.delete_cf(&*self.cfs[col as usize], key); }, @@ -343,6 +363,15 @@ impl Database for TestDB { for op in transaction.ops { match op { DBOp::Insert { col, key, value } => db[col as usize].insert(key, value), + DBOp::UpdateRefcount { col, key, value } => { + let mut val = db[col as usize].get(&key).cloned().unwrap_or_default(); + merge_refcounted_records(&mut val, &value).unwrap(); + if val.len() != 0 { + db[col as usize].insert(key, val) + } else { + db[col as usize].remove(&key) + } + } DBOp::Delete { col, key } => db[col as usize].remove(&key), }; } @@ -398,13 +427,17 @@ fn rocksdb_block_based_options() -> BlockBasedOptions { block_opts } -fn rocksdb_column_options() -> Options { +fn rocksdb_column_options(col: DBCol) -> Options { let mut opts = Options::default(); opts.set_level_compaction_dynamic_level_bytes(true); opts.set_block_based_table_factory(&rocksdb_block_based_options()); opts.optimize_level_style_compaction(1024 * 1024 * 128); opts.set_target_file_size_base(1024 * 1024 * 64); opts.set_compression_per_level(&[]); + if col == DBCol::ColState { + opts.set_merge_operator("refcount merge", RocksDB::refcount_merge, None); + opts.set_compaction_filter("empty value filter", RocksDB::empty_value_compaction_filter); + } opts } @@ -431,11 +464,12 @@ impl RocksDB { } pub fn new>(path: P) -> Result { + use strum::IntoEnumIterator; let options = rocksdb_options(); - let cf_names: Vec<_> = (0..NUM_COLS).map(|col| format!("col{}", col)).collect(); - let cf_descriptors = cf_names - .iter() - .map(|cf_name| ColumnFamilyDescriptor::new(cf_name, rocksdb_column_options())); + let cf_names: Vec<_> = DBCol::iter().map(|col| format!("col{}", col as usize)).collect(); + let cf_descriptors = DBCol::iter().map(|col| { + ColumnFamilyDescriptor::new(format!("col{}", col as usize), rocksdb_column_options(col)) + }); let db = DB::open_cf_descriptors(&options, path, cf_descriptors)?; #[cfg(feature = "single_thread_rocksdb")] { @@ -469,3 +503,135 @@ impl TestDB { Self { db: RwLock::new(db) } } } + +impl RocksDB { + /// ColState has refcounted values. + /// Merge adds refcounts, zero refcount becomes empty value. + /// Empty values get filtered by get methods, and removed by compaction. + fn refcount_merge( + _new_key: &[u8], + existing_val: Option<&[u8]>, + operands: &mut MergeOperands, + ) -> Option> { + let mut result = vec![]; + if let Some(val) = existing_val { + // Error is only possible if decoding refcount fails (=value is between 1 and 3 bytes) + merge_refcounted_records(&mut result, val) + .expect("Not a refcounted record in ColState"); + } + for val in operands { + // Error is only possible if decoding refcount fails (=value is between 1 and 3 bytes) + merge_refcounted_records(&mut result, val) + .expect("Not a refcounted record in ColState"); + } + Some(result) + } + + /// Compaction filter for ColState + fn empty_value_compaction_filter(_level: u32, _key: &[u8], value: &[u8]) -> Decision { + if value.is_empty() { + Decision::Remove + } else { + Decision::Keep + } + } + + /// ColState get() treats empty value as no value + fn empty_value_filtering_get(column: DBCol, value: Option>) -> Option> { + if column == DBCol::ColState && Some(vec![]) == value { + None + } else { + value + } + } + + /// ColState iterator treats empty value as no value + fn empty_value_filtering_iter<'a, I>( + column: DBCol, + iterator: I, + ) -> Box, Box<[u8]>)> + 'a> + where + I: Iterator, Box<[u8]>)> + 'a, + { + if column == DBCol::ColState { + Box::new(iterator.filter(|(_k, v)| !v.is_empty())) + } else { + Box::new(iterator) + } + } +} + +#[cfg(test)] +mod tests { + use crate::db::DBCol::ColState; + use crate::db::{rocksdb_read_options, DBError, Database, RocksDB}; + use crate::{create_store, DBCol}; + + impl RocksDB { + #[cfg(not(feature = "single_thread_rocksdb"))] + fn compact(&self, col: DBCol) { + self.db.compact_range_cf::<&[u8], &[u8]>( + unsafe { &*self.cfs[col as usize] }, + None, + None, + ); + } + + fn get_no_empty_filtering( + &self, + col: DBCol, + key: &[u8], + ) -> Result>, DBError> { + let read_options = rocksdb_read_options(); + let result = + self.db.get_cf_opt(unsafe { &*self.cfs[col as usize] }, key, &read_options)?; + Ok(result) + } + } + + #[test] + fn rocksdb_merge_sanity() { + let tmp_dir = tempfile::Builder::new().prefix("_test_snapshot_sanity").tempdir().unwrap(); + let store = create_store(tmp_dir.path().to_str().unwrap()); + assert_eq!(store.get(ColState, &[1]).unwrap(), None); + { + let mut store_update = store.store_update(); + store_update.update_refcount(ColState, &[1], &[1, 1, 0, 0, 0]); + store_update.commit().unwrap(); + } + { + let mut store_update = store.store_update(); + store_update.update_refcount(ColState, &[1], &[1, 1, 0, 0, 0]); + store_update.commit().unwrap(); + } + assert_eq!(store.get(ColState, &[1]).unwrap(), Some(vec![1, 2, 0, 0, 0])); + { + let mut store_update = store.store_update(); + store_update.update_refcount(ColState, &[1], &[1, 255, 255, 255, 255]); + store_update.commit().unwrap(); + } + assert_eq!(store.get(ColState, &[1]).unwrap(), Some(vec![1, 1, 0, 0, 0])); + { + let mut store_update = store.store_update(); + store_update.update_refcount(ColState, &[1], &[1, 255, 255, 255, 255]); + store_update.commit().unwrap(); + } + // Refcount goes to 0 -> get() returns None + assert_eq!(store.get(ColState, &[1]).unwrap(), None); + let ptr = (&*store.storage) as *const (dyn Database + 'static); + let rocksdb = unsafe { &*(ptr as *const RocksDB) }; + // Internally there is an empty value + assert_eq!(rocksdb.get_no_empty_filtering(ColState, &[1]).unwrap(), Some(vec![])); + + #[cfg(not(feature = "single_thread_rocksdb"))] + { + // single_thread_rocksdb makes compact hang forever + rocksdb.compact(ColState); + rocksdb.compact(ColState); + + // After compaction the empty value disappears + assert_eq!(rocksdb.get_no_empty_filtering(ColState, &[1]).unwrap(), None); + assert_eq!(store.get(ColState, &[1]).unwrap(), None); + } + } +} diff --git a/core/store/src/lib.rs b/core/store/src/lib.rs index 8d4c373cf1f..8d2e351b12f 100644 --- a/core/store/src/lib.rs +++ b/core/store/src/lib.rs @@ -154,6 +154,10 @@ impl StoreUpdate { StoreUpdate { storage, transaction, tries: Some(tries) } } + pub fn update_refcount(&mut self, column: DBCol, key: &[u8], value: &[u8]) { + self.transaction.update_refcount(column, key, value) + } + pub fn set(&mut self, column: DBCol, key: &[u8], value: &[u8]) { self.transaction.put(column, key, value) } @@ -195,6 +199,9 @@ impl StoreUpdate { match op { DBOp::Insert { col, key, value } => self.transaction.put(col, &key, &value), DBOp::Delete { col, key } => self.transaction.delete(col, &key), + DBOp::UpdateRefcount { col, key, value } => { + self.transaction.update_refcount(col, &key, &value) + } } } } @@ -209,6 +216,7 @@ impl StoreUpdate { .map(|op| match op { DBOp::Insert { col, key, .. } => (*col as u8, key), DBOp::Delete { col, key } => (*col as u8, key), + DBOp::UpdateRefcount { col, key, .. } => (*col as u8, key), }) .collect::>() .len(), @@ -232,6 +240,9 @@ impl fmt::Debug for StoreUpdate { for op in self.transaction.ops.iter() { match op { DBOp::Insert { col, key, .. } => writeln!(f, " + {:?} {}", col, to_base(key))?, + DBOp::UpdateRefcount { col, key, .. } => { + writeln!(f, " +- {:?} {}", col, to_base(key))? + } DBOp::Delete { col, key } => writeln!(f, " - {:?} {}", col, to_base(key))?, } } diff --git a/core/store/src/trie/mod.rs b/core/store/src/trie/mod.rs index f4b7a1fb981..6865bf4c471 100644 --- a/core/store/src/trie/mod.rs +++ b/core/store/src/trie/mod.rs @@ -17,6 +17,7 @@ use crate::trie::insert_delete::NodesStorage; use crate::trie::iterator::TrieIterator; use crate::trie::nibble_slice::NibbleSlice; pub use crate::trie::shard_tries::{KeyForStateChanges, ShardTries, WrappedTrieChanges}; +pub(crate) use crate::trie::trie_storage::merge_refcounted_records; use crate::trie::trie_storage::{ TouchedNodesCounter, TrieCachingStorage, TrieMemoryPartialStorage, TrieRecordingStorage, TrieStorage, @@ -395,21 +396,21 @@ impl RawTrieNodeWithSize { } } -fn encode_trie_node_with_rc(data: &[u8], rc: u32) -> Vec { +fn encode_trie_node_with_rc(data: &[u8], rc: i32) -> Vec { let mut cursor = Cursor::new(Vec::with_capacity(data.len() + 4)); cursor.write_all(data).unwrap(); - cursor.write_u32::(rc).unwrap(); + cursor.write_i32::(rc).unwrap(); cursor.into_inner() } -fn decode_trie_node_with_rc(bytes: &[u8]) -> Result<(&[u8], u32), StorageError> { +fn decode_trie_node_with_rc(bytes: &[u8]) -> Result<(&[u8], i32), StorageError> { if bytes.len() < 4 { return Err(StorageError::StorageInconsistentState( "Decode node with RC failed".to_string(), )); } let mut cursor = Cursor::new(&bytes[bytes.len() - 4..]); - let rc = cursor.read_u32::().unwrap(); + let rc = cursor.read_i32::().unwrap(); Ok((&bytes[..bytes.len() - 4], rc)) } @@ -745,12 +746,6 @@ impl Trie { pub fn iter<'a>(&'a self, root: &CryptoHash) -> Result, StorageError> { TrieIterator::new(self, root) } - - pub fn update_cache(&self, ops: Vec<(CryptoHash, Option>)>) { - let storage = - self.storage.as_caching_storage().expect("Storage should be TrieCachingStorage"); - storage.update_cache(ops) - } } #[cfg(test)] diff --git a/core/store/src/trie/shard_tries.rs b/core/store/src/trie/shard_tries.rs index e2117c81bcc..ceaee58b2d1 100644 --- a/core/store/src/trie/shard_tries.rs +++ b/core/store/src/trie/shard_tries.rs @@ -46,11 +46,13 @@ impl ShardTries { let mut shards = vec![Vec::new(); self.caches.len()]; for op in &transaction.ops { match op { - DBOp::Insert { col, ref key, ref value } if *col == DBCol::ColState => { + DBOp::UpdateRefcount { col, ref key, ref value } if *col == DBCol::ColState => { let (shard_id, hash) = TrieCachingStorage::get_shard_id_and_hash_from_key(key)?; shards[shard_id as usize].push((hash, Some(value.clone()))); } - DBOp::Delete { col, ref key } if *col == DBCol::ColState => { + DBOp::Insert { col, .. } if *col == DBCol::ColState => unreachable!(), + DBOp::Delete { col, key } if *col == DBCol::ColState => { + // Delete is possible in reset_data_pre_state_sync let (shard_id, hash) = TrieCachingStorage::get_shard_id_and_hash_from_key(key)?; shards[shard_id as usize].push((hash, None)); } @@ -70,18 +72,10 @@ impl ShardTries { store_update: &mut StoreUpdate, ) -> Result<(), StorageError> { store_update.tries = Some(tries.clone()); - let trie = tries.get_trie_for_shard(shard_id); - let storage = trie.storage.as_caching_storage().expect("Must be caching storage"); for (hash, value, rc) in deletions.iter() { - let storage_rc = storage.retrieve_rc(&hash)?; - assert!(*rc <= storage_rc); let key = TrieCachingStorage::get_key_from_shard_id_and_hash(shard_id, hash); - if *rc < storage_rc { - let bytes = encode_trie_node_with_rc(&value, storage_rc - rc); - store_update.set(DBCol::ColState, key.as_ref(), &bytes); - } else { - store_update.delete(DBCol::ColState, key.as_ref()); - } + let bytes = encode_trie_node_with_rc(&value, -(*rc as i32)); + store_update.update_refcount(DBCol::ColState, key.as_ref(), &bytes); } Ok(()) } @@ -92,14 +86,11 @@ impl ShardTries { shard_id: ShardId, store_update: &mut StoreUpdate, ) -> Result<(), StorageError> { - let trie = tries.get_trie_for_shard(shard_id); store_update.tries = Some(tries); - let storage = trie.storage.as_caching_storage().expect("Must be caching storage"); for (hash, value, rc) in insertions.iter() { - let storage_rc = storage.retrieve_rc(&hash)?; let key = TrieCachingStorage::get_key_from_shard_id_and_hash(shard_id, hash); - let bytes = encode_trie_node_with_rc(&value, storage_rc + rc); - store_update.set(DBCol::ColState, key.as_ref(), &bytes); + let bytes = encode_trie_node_with_rc(&value, *rc as i32); + store_update.update_refcount(DBCol::ColState, key.as_ref(), &bytes); } Ok(()) } diff --git a/core/store/src/trie/trie_storage.rs b/core/store/src/trie/trie_storage.rs index c7804011d67..636d3c84cff 100644 --- a/core/store/src/trie/trie_storage.rs +++ b/core/store/src/trie/trie_storage.rs @@ -22,10 +22,16 @@ impl TrieCache { pub fn update_cache(&self, ops: Vec<(CryptoHash, Option>)>) { let mut guard = self.0.lock().expect(POISONED_LOCK_ERR); - for (hash, value) in ops { - if let Some(value) = value { - if value.len() < TRIE_LIMIT_CACHED_VALUE_SIZE && guard.cache_get(&hash).is_some() { - guard.cache_set(hash, value); + for (hash, opt_value_rc) in ops { + if let Some(value_rc) = opt_value_rc { + let (value, rc) = + decode_trie_node_with_rc(&value_rc).expect("Don't write invalid values"); + if rc > 0 { + if value.len() < TRIE_LIMIT_CACHED_VALUE_SIZE { + guard.cache_set(hash, value.to_vec()); + } + } else { + guard.cache_remove(&hash); } } else { guard.cache_remove(&hash); @@ -113,16 +119,36 @@ pub struct TrieCachingStorage { pub(crate) shard_id: ShardId, } +pub fn merge_refcounted_records(result: &mut Vec, val: &[u8]) -> Result<(), StorageError> { + if val.is_empty() { + return Ok(()); + } + let add_rc = TrieCachingStorage::vec_to_rc(val)?; + if !result.is_empty() { + let result_rc = TrieCachingStorage::vec_to_rc(result)? + add_rc; + + debug_assert_eq!(result[0..(result.len() - 4)], val[0..(val.len() - 4)]); + let len = result.len(); + result[(len - 4)..].copy_from_slice(&result_rc.to_le_bytes()); + if result_rc == 0 { + *result = vec![]; + } + } else { + *result = val.to_vec(); + } + Ok(()) +} + impl TrieCachingStorage { pub fn new(store: Arc, cache: TrieCache, shard_id: ShardId) -> TrieCachingStorage { TrieCachingStorage { store, cache, shard_id } } - fn vec_to_rc(val: &Vec) -> Result { + fn vec_to_rc(val: &[u8]) -> Result { decode_trie_node_with_rc(&val).map(|(_bytes, rc)| rc) } - fn vec_to_bytes(val: &Vec) -> Result, StorageError> { + fn vec_to_bytes(val: &[u8]) -> Result, StorageError> { decode_trie_node_with_rc(&val).map(|(bytes, _rc)| bytes.to_vec()) } @@ -143,35 +169,13 @@ impl TrieCachingStorage { key[8..].copy_from_slice(hash.as_ref()); key } - - /// Get storage refcount, or 0 if hash is not present - /// # Errors - /// StorageError::StorageInternalError if the storage fails internally. - pub fn retrieve_rc(&self, hash: &CryptoHash) -> Result { - // Ignore cache to be safe. retrieve_rc is used only when writing storage and cache is shared with readers. - let key = Self::get_key_from_shard_id_and_hash(self.shard_id, hash); - let val = self - .store - .get(ColState, key.as_ref()) - .map_err(|_| StorageError::StorageInternalError)?; - if let Some(val) = val { - let rc = Self::vec_to_rc(&val); - rc - } else { - Ok(0) - } - } - - pub fn update_cache(&self, ops: Vec<(CryptoHash, Option>)>) { - self.cache.update_cache(ops) - } } impl TrieStorage for TrieCachingStorage { fn retrieve_raw_bytes(&self, hash: &CryptoHash) -> Result, StorageError> { let mut guard = self.cache.0.lock().expect(POISONED_LOCK_ERR); if let Some(val) = guard.cache_get(hash) { - Self::vec_to_bytes(val) + Ok(val.clone()) } else { let key = Self::get_key_from_shard_id_and_hash(self.shard_id, hash); let val = self @@ -180,8 +184,11 @@ impl TrieStorage for TrieCachingStorage { .map_err(|_| StorageError::StorageInternalError)?; if let Some(val) = val { let raw_node = Self::vec_to_bytes(&val); - if val.len() < TRIE_LIMIT_CACHED_VALUE_SIZE { - guard.cache_set(*hash, val); + debug_assert!(Self::vec_to_rc(&val).unwrap() > 0); + if val.len() < TRIE_LIMIT_CACHED_VALUE_SIZE && raw_node.is_ok() { + if let Ok(ref bytes) = raw_node { + guard.cache_set(*hash, bytes.clone()); + } } raw_node } else { diff --git a/neard/src/lib.rs b/neard/src/lib.rs index ff8836f0d01..bcd3d54e858 100644 --- a/neard/src/lib.rs +++ b/neard/src/lib.rs @@ -96,6 +96,12 @@ pub fn apply_store_migrations(path: &String) { let store = create_store(&path); set_store_version(&store, 5); } + if db_version <= 5 { + // version 5 => 6: add merge operator to ColState + // we don't have merge records before so old storage works + let store = create_store(&path); + set_store_version(&store, 6); + } let db_version = get_store_version(path); debug_assert_eq!(db_version, near_primitives::version::DB_VERSION);