Skip to content

Commit

Permalink
Merge branch 'master' into routing_table_threaded
Browse files Browse the repository at this point in the history
  • Loading branch information
mfornet committed Aug 12, 2020
2 parents 482bc0e + 1bf2fb3 commit 33b2269
Show file tree
Hide file tree
Showing 7 changed files with 244 additions and 68 deletions.
2 changes: 1 addition & 1 deletion core/primitives/src/version.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ pub struct Version {
pub type DbVersion = u32;

/// Current version of the database.
pub const DB_VERSION: DbVersion = 5;
pub const DB_VERSION: DbVersion = 6;

/// Protocol version type.
pub type ProtocolVersion = u32;
Expand Down
184 changes: 175 additions & 9 deletions core/store/src/db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,11 +11,13 @@ use borsh::{BorshDeserialize, BorshSerialize};
use rocksdb::Env;
use rocksdb::{
BlockBasedOptions, Cache, ColumnFamily, ColumnFamilyDescriptor, Direction, IteratorMode,
Options, ReadOptions, WriteBatch, DB,
MergeOperands, Options, ReadOptions, WriteBatch, DB,
};
use strum_macros::EnumIter;

use crate::trie::merge_refcounted_records;
use near_primitives::version::DbVersion;
use rocksdb::compaction_filter::Decision;
use std::marker::PhantomPinned;

#[derive(Debug, Clone, PartialEq)]
Expand Down Expand Up @@ -215,6 +217,7 @@ pub struct DBTransaction {

pub enum DBOp {
Insert { col: DBCol, key: Vec<u8>, value: Vec<u8> },
UpdateRefcount { col: DBCol, key: Vec<u8>, value: Vec<u8> },
Delete { col: DBCol, key: Vec<u8> },
}

Expand All @@ -227,6 +230,19 @@ impl DBTransaction {
});
}

pub fn update_refcount<K: AsRef<[u8]>, V: AsRef<[u8]>>(
&mut self,
col: DBCol,
key: K,
value: V,
) {
self.ops.push(DBOp::UpdateRefcount {
col,
key: key.as_ref().to_owned(),
value: value.as_ref().to_owned(),
});
}

pub fn delete<K: AsRef<[u8]>>(&mut self, col: DBCol, key: K) {
self.ops.push(DBOp::Delete { col, key: key.as_ref().to_owned() });
}
Expand Down Expand Up @@ -264,15 +280,16 @@ pub trait Database: Sync + Send {
impl Database for RocksDB {
fn get(&self, col: DBCol, key: &[u8]) -> Result<Option<Vec<u8>>, DBError> {
let read_options = rocksdb_read_options();
unsafe { Ok(self.db.get_cf_opt(&*self.cfs[col as usize], key, &read_options)?) }
let result = self.db.get_cf_opt(unsafe { &*self.cfs[col as usize] }, key, &read_options)?;
Ok(RocksDB::empty_value_filtering_get(col, result))
}

fn iter<'a>(&'a self, col: DBCol) -> Box<dyn Iterator<Item = (Box<[u8]>, Box<[u8]>)> + 'a> {
let read_options = rocksdb_read_options();
unsafe {
let cf_handle = &*self.cfs[col as usize];
let iterator = self.db.iterator_cf_opt(cf_handle, read_options, IteratorMode::Start);
Box::new(iterator)
RocksDB::empty_value_filtering_iter(col, iterator)
}
}

Expand All @@ -297,7 +314,7 @@ impl Database for RocksDB {
IteratorMode::From(key_prefix, Direction::Forward),
)
.take_while(move |(key, _value)| key.starts_with(key_prefix));
Box::new(iterator)
RocksDB::empty_value_filtering_iter(col, iterator)
}
}

Expand All @@ -308,6 +325,9 @@ impl Database for RocksDB {
DBOp::Insert { col, key, value } => unsafe {
batch.put_cf(&*self.cfs[col as usize], key, value);
},
DBOp::UpdateRefcount { col, key, value } => unsafe {
batch.merge_cf(&*self.cfs[col as usize], key, value);
},
DBOp::Delete { col, key } => unsafe {
batch.delete_cf(&*self.cfs[col as usize], key);
},
Expand Down Expand Up @@ -343,6 +363,15 @@ impl Database for TestDB {
for op in transaction.ops {
match op {
DBOp::Insert { col, key, value } => db[col as usize].insert(key, value),
DBOp::UpdateRefcount { col, key, value } => {
let mut val = db[col as usize].get(&key).cloned().unwrap_or_default();
merge_refcounted_records(&mut val, &value).unwrap();
if val.len() != 0 {
db[col as usize].insert(key, val)
} else {
db[col as usize].remove(&key)
}
}
DBOp::Delete { col, key } => db[col as usize].remove(&key),
};
}
Expand Down Expand Up @@ -398,13 +427,17 @@ fn rocksdb_block_based_options() -> BlockBasedOptions {
block_opts
}

fn rocksdb_column_options() -> Options {
fn rocksdb_column_options(col: DBCol) -> Options {
let mut opts = Options::default();
opts.set_level_compaction_dynamic_level_bytes(true);
opts.set_block_based_table_factory(&rocksdb_block_based_options());
opts.optimize_level_style_compaction(1024 * 1024 * 128);
opts.set_target_file_size_base(1024 * 1024 * 64);
opts.set_compression_per_level(&[]);
if col == DBCol::ColState {
opts.set_merge_operator("refcount merge", RocksDB::refcount_merge, None);
opts.set_compaction_filter("empty value filter", RocksDB::empty_value_compaction_filter);
}
opts
}

Expand All @@ -431,11 +464,12 @@ impl RocksDB {
}

pub fn new<P: AsRef<std::path::Path>>(path: P) -> Result<Self, DBError> {
use strum::IntoEnumIterator;
let options = rocksdb_options();
let cf_names: Vec<_> = (0..NUM_COLS).map(|col| format!("col{}", col)).collect();
let cf_descriptors = cf_names
.iter()
.map(|cf_name| ColumnFamilyDescriptor::new(cf_name, rocksdb_column_options()));
let cf_names: Vec<_> = DBCol::iter().map(|col| format!("col{}", col as usize)).collect();
let cf_descriptors = DBCol::iter().map(|col| {
ColumnFamilyDescriptor::new(format!("col{}", col as usize), rocksdb_column_options(col))
});
let db = DB::open_cf_descriptors(&options, path, cf_descriptors)?;
#[cfg(feature = "single_thread_rocksdb")]
{
Expand Down Expand Up @@ -469,3 +503,135 @@ impl TestDB {
Self { db: RwLock::new(db) }
}
}

impl RocksDB {
/// ColState has refcounted values.
/// Merge adds refcounts, zero refcount becomes empty value.
/// Empty values get filtered by get methods, and removed by compaction.
fn refcount_merge(
_new_key: &[u8],
existing_val: Option<&[u8]>,
operands: &mut MergeOperands,
) -> Option<Vec<u8>> {
let mut result = vec![];
if let Some(val) = existing_val {
// Error is only possible if decoding refcount fails (=value is between 1 and 3 bytes)
merge_refcounted_records(&mut result, val)
.expect("Not a refcounted record in ColState");
}
for val in operands {
// Error is only possible if decoding refcount fails (=value is between 1 and 3 bytes)
merge_refcounted_records(&mut result, val)
.expect("Not a refcounted record in ColState");
}
Some(result)
}

/// Compaction filter for ColState
fn empty_value_compaction_filter(_level: u32, _key: &[u8], value: &[u8]) -> Decision {
if value.is_empty() {
Decision::Remove
} else {
Decision::Keep
}
}

/// ColState get() treats empty value as no value
fn empty_value_filtering_get(column: DBCol, value: Option<Vec<u8>>) -> Option<Vec<u8>> {
if column == DBCol::ColState && Some(vec![]) == value {
None
} else {
value
}
}

/// ColState iterator treats empty value as no value
fn empty_value_filtering_iter<'a, I>(
column: DBCol,
iterator: I,
) -> Box<dyn Iterator<Item = (Box<[u8]>, Box<[u8]>)> + 'a>
where
I: Iterator<Item = (Box<[u8]>, Box<[u8]>)> + 'a,
{
if column == DBCol::ColState {
Box::new(iterator.filter(|(_k, v)| !v.is_empty()))
} else {
Box::new(iterator)
}
}
}

#[cfg(test)]
mod tests {
use crate::db::DBCol::ColState;
use crate::db::{rocksdb_read_options, DBError, Database, RocksDB};
use crate::{create_store, DBCol};

impl RocksDB {
#[cfg(not(feature = "single_thread_rocksdb"))]
fn compact(&self, col: DBCol) {
self.db.compact_range_cf::<&[u8], &[u8]>(
unsafe { &*self.cfs[col as usize] },
None,
None,
);
}

fn get_no_empty_filtering(
&self,
col: DBCol,
key: &[u8],
) -> Result<Option<Vec<u8>>, DBError> {
let read_options = rocksdb_read_options();
let result =
self.db.get_cf_opt(unsafe { &*self.cfs[col as usize] }, key, &read_options)?;
Ok(result)
}
}

#[test]
fn rocksdb_merge_sanity() {
let tmp_dir = tempfile::Builder::new().prefix("_test_snapshot_sanity").tempdir().unwrap();
let store = create_store(tmp_dir.path().to_str().unwrap());
assert_eq!(store.get(ColState, &[1]).unwrap(), None);
{
let mut store_update = store.store_update();
store_update.update_refcount(ColState, &[1], &[1, 1, 0, 0, 0]);
store_update.commit().unwrap();
}
{
let mut store_update = store.store_update();
store_update.update_refcount(ColState, &[1], &[1, 1, 0, 0, 0]);
store_update.commit().unwrap();
}
assert_eq!(store.get(ColState, &[1]).unwrap(), Some(vec![1, 2, 0, 0, 0]));
{
let mut store_update = store.store_update();
store_update.update_refcount(ColState, &[1], &[1, 255, 255, 255, 255]);
store_update.commit().unwrap();
}
assert_eq!(store.get(ColState, &[1]).unwrap(), Some(vec![1, 1, 0, 0, 0]));
{
let mut store_update = store.store_update();
store_update.update_refcount(ColState, &[1], &[1, 255, 255, 255, 255]);
store_update.commit().unwrap();
}
// Refcount goes to 0 -> get() returns None
assert_eq!(store.get(ColState, &[1]).unwrap(), None);
let ptr = (&*store.storage) as *const (dyn Database + 'static);
let rocksdb = unsafe { &*(ptr as *const RocksDB) };
// Internally there is an empty value
assert_eq!(rocksdb.get_no_empty_filtering(ColState, &[1]).unwrap(), Some(vec![]));

#[cfg(not(feature = "single_thread_rocksdb"))]
{
// single_thread_rocksdb makes compact hang forever
rocksdb.compact(ColState);
rocksdb.compact(ColState);

// After compaction the empty value disappears
assert_eq!(rocksdb.get_no_empty_filtering(ColState, &[1]).unwrap(), None);
assert_eq!(store.get(ColState, &[1]).unwrap(), None);
}
}
}
11 changes: 11 additions & 0 deletions core/store/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -154,6 +154,10 @@ impl StoreUpdate {
StoreUpdate { storage, transaction, tries: Some(tries) }
}

pub fn update_refcount(&mut self, column: DBCol, key: &[u8], value: &[u8]) {
self.transaction.update_refcount(column, key, value)
}

pub fn set(&mut self, column: DBCol, key: &[u8], value: &[u8]) {
self.transaction.put(column, key, value)
}
Expand Down Expand Up @@ -195,6 +199,9 @@ impl StoreUpdate {
match op {
DBOp::Insert { col, key, value } => self.transaction.put(col, &key, &value),
DBOp::Delete { col, key } => self.transaction.delete(col, &key),
DBOp::UpdateRefcount { col, key, value } => {
self.transaction.update_refcount(col, &key, &value)
}
}
}
}
Expand All @@ -209,6 +216,7 @@ impl StoreUpdate {
.map(|op| match op {
DBOp::Insert { col, key, .. } => (*col as u8, key),
DBOp::Delete { col, key } => (*col as u8, key),
DBOp::UpdateRefcount { col, key, .. } => (*col as u8, key),
})
.collect::<std::collections::HashSet<_>>()
.len(),
Expand All @@ -232,6 +240,9 @@ impl fmt::Debug for StoreUpdate {
for op in self.transaction.ops.iter() {
match op {
DBOp::Insert { col, key, .. } => writeln!(f, " + {:?} {}", col, to_base(key))?,
DBOp::UpdateRefcount { col, key, .. } => {
writeln!(f, " +- {:?} {}", col, to_base(key))?
}
DBOp::Delete { col, key } => writeln!(f, " - {:?} {}", col, to_base(key))?,
}
}
Expand Down
15 changes: 5 additions & 10 deletions core/store/src/trie/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ use crate::trie::insert_delete::NodesStorage;
use crate::trie::iterator::TrieIterator;
use crate::trie::nibble_slice::NibbleSlice;
pub use crate::trie::shard_tries::{KeyForStateChanges, ShardTries, WrappedTrieChanges};
pub(crate) use crate::trie::trie_storage::merge_refcounted_records;
use crate::trie::trie_storage::{
TouchedNodesCounter, TrieCachingStorage, TrieMemoryPartialStorage, TrieRecordingStorage,
TrieStorage,
Expand Down Expand Up @@ -395,21 +396,21 @@ impl RawTrieNodeWithSize {
}
}

fn encode_trie_node_with_rc(data: &[u8], rc: u32) -> Vec<u8> {
fn encode_trie_node_with_rc(data: &[u8], rc: i32) -> Vec<u8> {
let mut cursor = Cursor::new(Vec::with_capacity(data.len() + 4));
cursor.write_all(data).unwrap();
cursor.write_u32::<LittleEndian>(rc).unwrap();
cursor.write_i32::<LittleEndian>(rc).unwrap();
cursor.into_inner()
}

fn decode_trie_node_with_rc(bytes: &[u8]) -> Result<(&[u8], u32), StorageError> {
fn decode_trie_node_with_rc(bytes: &[u8]) -> Result<(&[u8], i32), StorageError> {
if bytes.len() < 4 {
return Err(StorageError::StorageInconsistentState(
"Decode node with RC failed".to_string(),
));
}
let mut cursor = Cursor::new(&bytes[bytes.len() - 4..]);
let rc = cursor.read_u32::<LittleEndian>().unwrap();
let rc = cursor.read_i32::<LittleEndian>().unwrap();
Ok((&bytes[..bytes.len() - 4], rc))
}

Expand Down Expand Up @@ -745,12 +746,6 @@ impl Trie {
pub fn iter<'a>(&'a self, root: &CryptoHash) -> Result<TrieIterator<'a>, StorageError> {
TrieIterator::new(self, root)
}

pub fn update_cache(&self, ops: Vec<(CryptoHash, Option<Vec<u8>>)>) {
let storage =
self.storage.as_caching_storage().expect("Storage should be TrieCachingStorage");
storage.update_cache(ops)
}
}

#[cfg(test)]
Expand Down
Loading

0 comments on commit 33b2269

Please sign in to comment.