refactor BatchDbWriter to fix bugs (#4176)
* refactor BatchDbWriter to fix bugs

1. fix a typo
2. remove unused code

* speed up flexidag test performance

1. increase blockdag's default cache size to 1024 for test cases

* make get_cf_handle's return value simpler
simonjiao authored Aug 7, 2024
1 parent 10da414 commit 00a37d8
Showing 9 changed files with 39 additions and 47 deletions.
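
Before the per-file diffs, the substance of the fix: BatchDbWriter staged writes with WriteBatch::put/delete, which land in RocksDB's default column family, while every read path resolves a per-schema column family handle, so batch-written entries were invisible to subsequent reads. A minimal sketch of the pitfall, assuming the rocksdb crate (the path and column-family name are illustrative):

use rocksdb::{Options, WriteBatch, DB};

fn main() -> Result<(), Box<dyn std::error::Error>> {
    let mut opts = Options::default();
    opts.create_if_missing(true);
    opts.create_missing_column_families(true);
    let db = DB::open_cf(&opts, "/tmp/cf-demo", ["ghostdag"])?;
    let cf = db.cf_handle("ghostdag").expect("cf registered at open");

    let mut batch = WriteBatch::default();
    batch.put(b"k1", b"v1"); // the old, buggy pattern: lands in `default`
    batch.put_cf(cf, b"k2", b"v2"); // the fixed pattern: targets the schema's CF
    db.write(batch)?;

    // A reader that goes through the column family sees only the put_cf write.
    assert!(db.get_cf(cf, b"k1")?.is_none());
    assert!(db.get_cf(cf, b"k2")?.is_some());
    Ok(())
}

The writer.rs diff below applies exactly this put_cf/delete_cf correction inside BatchDbWriter, threading a &DBStorage through so the column family can be resolved per schema.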
17 changes: 6 additions & 11 deletions flexidag/src/blockdag.rs
@@ -13,7 +13,7 @@ use crate::consensusdb::{
 use crate::ghostdag::protocol::GhostdagManager;
 use crate::{process_key_already_error, reachability};
 use anyhow::{bail, Ok};
-use starcoin_config::{temp_dir, RocksdbConfig};
+use starcoin_config::temp_dir;
 use starcoin_crypto::{HashValue as Hash, HashValue};
 use starcoin_logger::prelude::{debug, info};
 use starcoin_types::block::BlockHeader;
@@ -22,7 +22,6 @@ use starcoin_types::{
     consensus_header::ConsensusHeader,
 };
 use std::ops::DerefMut;
-use std::path::Path;
 use std::sync::Arc;
 
 pub const DEFAULT_GHOSTDAG_K: KType = 8u16;
@@ -61,18 +60,14 @@ impl BlockDAG {
         }
     }
     pub fn create_for_testing() -> anyhow::Result<Self> {
-        let dag_storage =
-            FlexiDagStorage::create_from_path(temp_dir(), FlexiDagStorageConfig::default())?;
+        let config = FlexiDagStorageConfig {
+            cache_size: 1024,
+            ..Default::default()
+        };
+        let dag_storage = FlexiDagStorage::create_from_path(temp_dir(), config)?;
         Ok(Self::new(DEFAULT_GHOSTDAG_K, dag_storage))
     }
 
-    pub fn new_by_config(db_path: &Path) -> anyhow::Result<Self> {
-        let config = FlexiDagStorageConfig::create_with_params(1, RocksdbConfig::default());
-        let db = FlexiDagStorage::create_from_path(db_path, config)?;
-        let dag = Self::new(DEFAULT_GHOSTDAG_K, db);
-        Ok(dag)
-    }
-
     pub fn has_dag_block(&self, hash: Hash) -> anyhow::Result<bool> {
        Ok(self.storage.header_store.has(hash)?)
     }
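
The removed new_by_config was dead code per the commit message, and it was also the one call site in this commit of FlexiDagStorageConfig::create_with_params, which the db.rs hunk below deletes. create_for_testing keeps its signature and only grows the cache, so existing tests need no changes. A hypothetical smoke test against the updated constructor, in the style of flexidag/tests/tests.rs (HashValue::random() assumed from starcoin-crypto):

#[test]
fn create_for_testing_smoke() -> anyhow::Result<()> {
    // Backed by a temp dir and the enlarged 1024-entry cache.
    let dag = BlockDAG::create_for_testing()?;
    // A random hash was never inserted, so it must not be a known DAG block.
    assert!(!dag.has_dag_block(HashValue::random())?);
    Ok(())
}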
4 changes: 2 additions & 2 deletions flexidag/src/consensusdb/consensus_ghostdag.rs
@@ -231,9 +231,9 @@ impl DbGhostdagStore {
             return Err(StoreError::KeyAlreadyExists(hash.to_string()));
         }
         self.access
-            .write(BatchDbWriter::new(batch), hash, data.clone())?;
+            .write(BatchDbWriter::new(batch, &self.db), hash, data.clone())?;
         self.compact_access.write(
-            BatchDbWriter::new(batch),
+            BatchDbWriter::new(batch, &self.db),
             hash,
             CompactGhostdagData {
                 blue_score: data.blue_score,
4 changes: 2 additions & 2 deletions flexidag/src/consensusdb/consensus_header.rs
@@ -125,15 +125,15 @@ impl DbHeadersStore {
             return Err(StoreError::KeyAlreadyExists(hash.to_string()));
         }
         self.headers_access.write(
-            BatchDbWriter::new(batch),
+            BatchDbWriter::new(batch, &self.db),
             hash,
             HeaderWithBlockLevel {
                 header: header.clone(),
                 block_level,
             },
         )?;
         self.compact_headers_access.write(
-            BatchDbWriter::new(batch),
+            BatchDbWriter::new(batch, &self.db),
             hash,
             CompactHeaderData {
                 timestamp: header.timestamp(),
9 changes: 5 additions & 4 deletions flexidag/src/consensusdb/consensus_reachability.rs
@@ -139,9 +139,9 @@ impl ReachabilityStore for DbReachabilityStore {
         ));
         let mut batch = WriteBatch::default();
         self.access
-            .write(BatchDbWriter::new(&mut batch), origin, data)?;
+            .write(BatchDbWriter::new(&mut batch, &self.db), origin, data)?;
         self.reindex_root
-            .write(BatchDbWriter::new(&mut batch), &origin)?;
+            .write(BatchDbWriter::new(&mut batch, &self.db), &origin)?;
         self.db
             .raw_write_batch(batch)
             .map_err(|e| StoreError::DBIoError(e.to_string()))?;
@@ -255,17 +255,18 @@ impl<'a> StagingReachabilityStore<'a> {
         self,
         batch: &mut WriteBatch,
     ) -> Result<RwLockWriteGuard<'a, DbReachabilityStore>, StoreError> {
+        let db = Arc::clone(&self.store_read.db);
         let mut store_write = RwLockUpgradableReadGuard::upgrade(self.store_read);
         for (k, v) in self.staging_writes {
             let data = Arc::new(v);
             store_write
                 .access
-                .write(BatchDbWriter::new(batch), k, data)?
+                .write(BatchDbWriter::new(batch, &db), k, data)?
         }
         if let Some(root) = self.staging_reindex_root {
             store_write
                 .reindex_root
-                .write(BatchDbWriter::new(batch), &root)?;
+                .write(BatchDbWriter::new(batch, &db), &root)?;
         }
         Ok(store_write)
     }
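
Note the ordering in the second hunk: RwLockUpgradableReadGuard::upgrade consumes self.store_read, so the db handle has to be cloned out of the read view before the upgrade; cloning the Arc only bumps a reference count. A minimal sketch of the pattern with stand-in types, assuming the parking_lot crate:

use std::sync::Arc;

use parking_lot::{RwLock, RwLockUpgradableReadGuard};

struct Store {
    db: Arc<str>, // stand-in for the store's Arc<DBStorage>
}

fn commit(guard: RwLockUpgradableReadGuard<'_, Store>) {
    // Take what we need from the read view first: `upgrade` consumes the guard.
    let db = Arc::clone(&guard.db);
    let mut store_write = RwLockUpgradableReadGuard::upgrade(guard);
    // `db` remains usable for building writers while the write guard is held.
    let _ = (&db, &mut *store_write);
}

fn main() {
    let lock = RwLock::new(Store { db: Arc::from("db") });
    commit(lock.upgradable_read());
}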
6 changes: 3 additions & 3 deletions flexidag/src/consensusdb/consensus_relations.rs
@@ -103,11 +103,11 @@ impl DbRelationsStore {
 
         // Insert a new entry for `hash`
         self.parents_access
-            .write(BatchDbWriter::new(batch), hash, parents.clone())?;
+            .write(BatchDbWriter::new(batch, &self.db), hash, parents.clone())?;
 
         // The new hash has no children yet
         self.children_access.write(
-            BatchDbWriter::new(batch),
+            BatchDbWriter::new(batch, &self.db),
             hash,
             BlockHashes::new(Vec::new()),
         )?;
@@ -117,7 +117,7 @@
             let mut children = (*self.get_children(parent)?).clone();
             children.push(hash);
             self.children_access.write(
-                BatchDbWriter::new(batch),
+                BatchDbWriter::new(batch, &self.db),
                 parent,
                 BlockHashes::new(children),
             )?;
7 changes: 0 additions & 7 deletions flexidag/src/consensusdb/db.rs
@@ -38,13 +38,6 @@ impl FlexiDagStorageConfig {
     pub fn new() -> Self {
         Self::default()
     }
-
-    pub fn create_with_params(cache_size: usize, rocksdb_config: RocksdbConfig) -> Self {
-        Self {
-            cache_size,
-            rocksdb_config,
-        }
-    }
 }
 
 impl From<StorageConfig> for FlexiDagStorageConfig {
11 changes: 7 additions & 4 deletions flexidag/src/consensusdb/writer.rs
@@ -39,25 +39,28 @@ impl DbWriter for DirectDbWriter<'_> {
 
 pub struct BatchDbWriter<'a> {
     batch: &'a mut WriteBatch,
+    db: &'a DBStorage,
 }
 
 impl<'a> BatchDbWriter<'a> {
-    pub fn new(batch: &'a mut WriteBatch) -> Self {
-        Self { batch }
+    pub fn new(batch: &'a mut WriteBatch, db: &'a DBStorage) -> Self {
+        Self { batch, db }
     }
 }
 
 impl DbWriter for BatchDbWriter<'_> {
     fn put<S: Schema>(&mut self, key: &S::Key, value: &S::Value) -> Result<(), StoreError> {
         let key = key.encode_key()?;
         let value = value.encode_value()?;
-        self.batch.put(key, value);
+        let cf_handle = self.db.get_cf_handle(S::COLUMN_FAMILY);
+        self.batch.put_cf(cf_handle, key, value);
         Ok(())
     }
 
     fn delete<S: Schema>(&mut self, key: &S::Key) -> Result<(), StoreError> {
         let key = key.encode_key()?;
-        self.batch.delete(key);
+        let cf_handle = self.db.get_cf_handle(S::COLUMN_FAMILY);
+        self.batch.delete_cf(cf_handle, key);
         Ok(())
     }
 }
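
put and delete can resolve the right handle because every schema carries its column family as an associated constant. The real trait lives elsewhere in this repo; a sketch of the assumed shape, inferred from how S::COLUMN_FAMILY, encode_key, and encode_value are used above (actual trait bounds may differ):

// Assumed shapes for illustration only.
pub trait KeyCodec {
    fn encode_key(&self) -> Result<Vec<u8>, StoreError>;
}

pub trait ValueCodec {
    fn encode_value(&self) -> Result<Vec<u8>, StoreError>;
}

pub trait Schema {
    /// Name of the column family this schema's entries live in.
    const COLUMN_FAMILY: &'static str;
    type Key: KeyCodec;
    type Value: ValueCodec;
}

Binding the column family to the schema type keeps the DbWriter call sites unchanged: callers still pass only a key and value, and the writer derives the destination from S.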
2 changes: 1 addition & 1 deletion flexidag/tests/tests.rs
@@ -617,7 +617,7 @@ fn test_reachability_not_ancestor() -> anyhow::Result<()> {
 }
 
 #[test]
-fn test_reachability_algorighm() -> anyhow::Result<()> {
+fn test_reachability_algorithm() -> anyhow::Result<()> {
     let dag = BlockDAG::create_for_testing().unwrap();
     let reachability_store = dag.storage.reachability_store.clone();
 
26 changes: 13 additions & 13 deletions storage/src/db_storage/mod.rs
@@ -174,7 +174,7 @@ impl DBStorage {
     /// tests.
     pub fn flush_all(&self) -> Result<()> {
         for cf_name in &self.cfs {
-            let cf_handle = self.get_cf_handle(cf_name)?;
+            let cf_handle = self.get_cf_handle(cf_name);
             self.db.flush_cf(cf_handle)?;
         }
         Ok(())
@@ -190,8 +190,8 @@
         rocksdb_current_file.is_file()
     }
 
-    fn get_cf_handle(&self, cf_name: &str) -> Result<&rocksdb::ColumnFamily> {
-        self.db.cf_handle(cf_name).ok_or_else(|| {
+    pub fn get_cf_handle(&self, cf_name: &str) -> &rocksdb::ColumnFamily {
+        self.db.cf_handle(cf_name).unwrap_or_else(|| {
             panic!(
                 "DB::cf_handle not found for column family name: {}",
                 cf_name
@@ -232,7 +232,7 @@ impl DBStorage {
         K: KeyCodec,
         V: ValueCodec,
     {
-        let cf_handle = self.get_cf_handle(prefix_name)?;
+        let cf_handle = self.get_cf_handle(prefix_name);
         Ok(SchemaIterator::new(
             self.db
                 .raw_iterator_cf_opt(cf_handle, ReadOptions::default()),
@@ -246,7 +246,7 @@
         mode: IteratorMode,
         readopts: ReadOptions,
     ) -> Result<DBIterator> {
-        let cf_handle = self.get_cf_handle(prefix_name)?;
+        let cf_handle = self.get_cf_handle(prefix_name);
         Ok(self.db.iterator_cf_opt(cf_handle, readopts, mode))
     }
 
@@ -359,7 +359,7 @@ where
 impl InnerStore for DBStorage {
     fn get(&self, prefix_name: &str, key: Vec<u8>) -> Result<Option<Vec<u8>>> {
         record_metrics("db", prefix_name, "get", self.metrics.as_ref()).call(|| {
-            let cf_handle = self.get_cf_handle(prefix_name)?;
+            let cf_handle = self.get_cf_handle(prefix_name);
             let result = self.db.get_cf(cf_handle, key.as_slice())?;
             Ok(result)
         })
@@ -374,7 +374,7 @@ impl InnerStore for DBStorage {
         }
 
         record_metrics("db", prefix_name, "put", self.metrics.as_ref()).call(|| {
-            let cf_handle = self.get_cf_handle(prefix_name)?;
+            let cf_handle = self.get_cf_handle(prefix_name);
             self.db
                 .put_cf_opt(cf_handle, &key, &value, &Self::default_write_options())?;
             Ok(())
@@ -391,7 +391,7 @@
     }
     fn remove(&self, prefix_name: &str, key: Vec<u8>) -> Result<()> {
         record_metrics("db", prefix_name, "remove", self.metrics.as_ref()).call(|| {
-            let cf_handle = self.get_cf_handle(prefix_name)?;
+            let cf_handle = self.get_cf_handle(prefix_name);
             self.db.delete_cf(cf_handle, &key)?;
             Ok(())
         })
@@ -401,7 +401,7 @@ impl InnerStore for DBStorage {
     fn write_batch(&self, prefix_name: &str, batch: WriteBatch) -> Result<()> {
         record_metrics("db", prefix_name, "write_batch", self.metrics.as_ref()).call(|| {
             let mut db_batch = DBWriteBatch::default();
-            let cf_handle = self.get_cf_handle(prefix_name)?;
+            let cf_handle = self.get_cf_handle(prefix_name);
             for (key, write_op) in &batch.rows {
                 match write_op {
                     WriteOp::Value(value) => db_batch.put_cf(cf_handle, key, value),
@@ -431,7 +431,7 @@ impl InnerStore for DBStorage {
         }
 
         record_metrics("db", prefix_name, "put_sync", self.metrics.as_ref()).call(|| {
-            let cf_handle = self.get_cf_handle(prefix_name)?;
+            let cf_handle = self.get_cf_handle(prefix_name);
             self.db
                 .put_cf_opt(cf_handle, &key, &value, &Self::sync_write_options())?;
             Ok(())
@@ -441,7 +441,7 @@ impl InnerStore for DBStorage {
     fn write_batch_sync(&self, prefix_name: &str, batch: WriteBatch) -> Result<()> {
         record_metrics("db", prefix_name, "write_batch_sync", self.metrics.as_ref()).call(|| {
             let mut db_batch = DBWriteBatch::default();
-            let cf_handle = self.get_cf_handle(prefix_name)?;
+            let cf_handle = self.get_cf_handle(prefix_name);
             for (key, write_op) in &batch.rows {
                 match write_op {
                     WriteOp::Value(value) => db_batch.put_cf(cf_handle, key, value),
@@ -455,7 +455,7 @@ impl InnerStore for DBStorage {
 
     fn multi_get(&self, prefix_name: &str, keys: Vec<Vec<u8>>) -> Result<Vec<Option<Vec<u8>>>> {
         record_metrics("db", prefix_name, "multi_get", self.metrics.as_ref()).call(|| {
-            let cf_handle = self.get_cf_handle(prefix_name)?;
+            let cf_handle = self.get_cf_handle(prefix_name);
             let cf_handles = iter::repeat(&cf_handle)
                 .take(keys.len())
                 .collect::<Vec<_>>();
@@ -482,7 +482,7 @@ impl RawDBStorage for DBStorage {
         prefix: &str,
         key: K,
     ) -> Result<Option<DBPinnableSlice>> {
-        let cf = self.get_cf_handle(prefix)?;
+        let cf = self.get_cf_handle(prefix);
         let res = self
             .db
             .get_pinned_cf_opt(cf, key, &ReadOptions::default())?;
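
Two things fall out of the storage-side change: get_cf_handle is now pub, so BatchDbWriter in the flexidag crate can call it at all, and it is now infallible, which matters because DbWriter::put returns Result<_, StoreError> and the old signature's error (the storage crate's Result type, not StoreError) would have needed conversion at every use. A missing column family here means the database was opened without registering that schema's CF, a programming error rather than a recoverable runtime condition, so panicking inside the accessor is a reasonable trade. Call sites shrink accordingly, the pattern repeated through the diff above:

// Before: let cf_handle = self.get_cf_handle(prefix_name)?;
let cf_handle = self.get_cf_handle(prefix_name);
self.db.flush_cf(cf_handle)?;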
