From 00a37d89d52fcaa10727bfd56417fbf04c698883 Mon Sep 17 00:00:00 2001 From: simonjiao Date: Wed, 7 Aug 2024 22:11:56 +0800 Subject: [PATCH] refactor BatchDbWriter to fix bugs (#4176) * refactor BatchDbWriter to fix bugs 1. fix a typo 2. remove unused codes * speed up flexidag test performance 1. increase blockdag's default cache size to 1024 for test cases * make get_cf_handle's return value simpler --- flexidag/src/blockdag.rs | 17 +++++------- .../src/consensusdb/consensus_ghostdag.rs | 4 +-- flexidag/src/consensusdb/consensus_header.rs | 4 +-- .../src/consensusdb/consensus_reachability.rs | 9 ++++--- .../src/consensusdb/consensus_relations.rs | 6 ++--- flexidag/src/consensusdb/db.rs | 7 ----- flexidag/src/consensusdb/writer.rs | 11 +++++--- flexidag/tests/tests.rs | 2 +- storage/src/db_storage/mod.rs | 26 +++++++++---------- 9 files changed, 39 insertions(+), 47 deletions(-) diff --git a/flexidag/src/blockdag.rs b/flexidag/src/blockdag.rs index 6bc5b9f7c7..0f8f4c5dc7 100644 --- a/flexidag/src/blockdag.rs +++ b/flexidag/src/blockdag.rs @@ -13,7 +13,7 @@ use crate::consensusdb::{ use crate::ghostdag::protocol::GhostdagManager; use crate::{process_key_already_error, reachability}; use anyhow::{bail, Ok}; -use starcoin_config::{temp_dir, RocksdbConfig}; +use starcoin_config::temp_dir; use starcoin_crypto::{HashValue as Hash, HashValue}; use starcoin_logger::prelude::{debug, info}; use starcoin_types::block::BlockHeader; @@ -22,7 +22,6 @@ use starcoin_types::{ consensus_header::ConsensusHeader, }; use std::ops::DerefMut; -use std::path::Path; use std::sync::Arc; pub const DEFAULT_GHOSTDAG_K: KType = 8u16; @@ -61,18 +60,14 @@ impl BlockDAG { } } pub fn create_for_testing() -> anyhow::Result { - let dag_storage = - FlexiDagStorage::create_from_path(temp_dir(), FlexiDagStorageConfig::default())?; + let config = FlexiDagStorageConfig { + cache_size: 1024, + ..Default::default() + }; + let dag_storage = FlexiDagStorage::create_from_path(temp_dir(), config)?; Ok(Self::new(DEFAULT_GHOSTDAG_K, dag_storage)) } - pub fn new_by_config(db_path: &Path) -> anyhow::Result { - let config = FlexiDagStorageConfig::create_with_params(1, RocksdbConfig::default()); - let db = FlexiDagStorage::create_from_path(db_path, config)?; - let dag = Self::new(DEFAULT_GHOSTDAG_K, db); - Ok(dag) - } - pub fn has_dag_block(&self, hash: Hash) -> anyhow::Result { Ok(self.storage.header_store.has(hash)?) } diff --git a/flexidag/src/consensusdb/consensus_ghostdag.rs b/flexidag/src/consensusdb/consensus_ghostdag.rs index d0cc97585a..a45d9473cd 100644 --- a/flexidag/src/consensusdb/consensus_ghostdag.rs +++ b/flexidag/src/consensusdb/consensus_ghostdag.rs @@ -231,9 +231,9 @@ impl DbGhostdagStore { return Err(StoreError::KeyAlreadyExists(hash.to_string())); } self.access - .write(BatchDbWriter::new(batch), hash, data.clone())?; + .write(BatchDbWriter::new(batch, &self.db), hash, data.clone())?; self.compact_access.write( - BatchDbWriter::new(batch), + BatchDbWriter::new(batch, &self.db), hash, CompactGhostdagData { blue_score: data.blue_score, diff --git a/flexidag/src/consensusdb/consensus_header.rs b/flexidag/src/consensusdb/consensus_header.rs index 34b275a9fb..131616d490 100644 --- a/flexidag/src/consensusdb/consensus_header.rs +++ b/flexidag/src/consensusdb/consensus_header.rs @@ -125,7 +125,7 @@ impl DbHeadersStore { return Err(StoreError::KeyAlreadyExists(hash.to_string())); } self.headers_access.write( - BatchDbWriter::new(batch), + BatchDbWriter::new(batch, &self.db), hash, HeaderWithBlockLevel { header: header.clone(), @@ -133,7 +133,7 @@ impl DbHeadersStore { }, )?; self.compact_headers_access.write( - BatchDbWriter::new(batch), + BatchDbWriter::new(batch, &self.db), hash, CompactHeaderData { timestamp: header.timestamp(), diff --git a/flexidag/src/consensusdb/consensus_reachability.rs b/flexidag/src/consensusdb/consensus_reachability.rs index 2db902ad72..282a000abf 100644 --- a/flexidag/src/consensusdb/consensus_reachability.rs +++ b/flexidag/src/consensusdb/consensus_reachability.rs @@ -139,9 +139,9 @@ impl ReachabilityStore for DbReachabilityStore { )); let mut batch = WriteBatch::default(); self.access - .write(BatchDbWriter::new(&mut batch), origin, data)?; + .write(BatchDbWriter::new(&mut batch, &self.db), origin, data)?; self.reindex_root - .write(BatchDbWriter::new(&mut batch), &origin)?; + .write(BatchDbWriter::new(&mut batch, &self.db), &origin)?; self.db .raw_write_batch(batch) .map_err(|e| StoreError::DBIoError(e.to_string()))?; @@ -255,17 +255,18 @@ impl<'a> StagingReachabilityStore<'a> { self, batch: &mut WriteBatch, ) -> Result, StoreError> { + let db = Arc::clone(&self.store_read.db); let mut store_write = RwLockUpgradableReadGuard::upgrade(self.store_read); for (k, v) in self.staging_writes { let data = Arc::new(v); store_write .access - .write(BatchDbWriter::new(batch), k, data)? + .write(BatchDbWriter::new(batch, &db), k, data)? } if let Some(root) = self.staging_reindex_root { store_write .reindex_root - .write(BatchDbWriter::new(batch), &root)?; + .write(BatchDbWriter::new(batch, &db), &root)?; } Ok(store_write) } diff --git a/flexidag/src/consensusdb/consensus_relations.rs b/flexidag/src/consensusdb/consensus_relations.rs index e9317bf7a4..036f628bfb 100644 --- a/flexidag/src/consensusdb/consensus_relations.rs +++ b/flexidag/src/consensusdb/consensus_relations.rs @@ -103,11 +103,11 @@ impl DbRelationsStore { // Insert a new entry for `hash` self.parents_access - .write(BatchDbWriter::new(batch), hash, parents.clone())?; + .write(BatchDbWriter::new(batch, &self.db), hash, parents.clone())?; // The new hash has no children yet self.children_access.write( - BatchDbWriter::new(batch), + BatchDbWriter::new(batch, &self.db), hash, BlockHashes::new(Vec::new()), )?; @@ -117,7 +117,7 @@ impl DbRelationsStore { let mut children = (*self.get_children(parent)?).clone(); children.push(hash); self.children_access.write( - BatchDbWriter::new(batch), + BatchDbWriter::new(batch, &self.db), parent, BlockHashes::new(children), )?; diff --git a/flexidag/src/consensusdb/db.rs b/flexidag/src/consensusdb/db.rs index ccef6d8d21..0590c99706 100644 --- a/flexidag/src/consensusdb/db.rs +++ b/flexidag/src/consensusdb/db.rs @@ -38,13 +38,6 @@ impl FlexiDagStorageConfig { pub fn new() -> Self { Self::default() } - - pub fn create_with_params(cache_size: usize, rocksdb_config: RocksdbConfig) -> Self { - Self { - cache_size, - rocksdb_config, - } - } } impl From for FlexiDagStorageConfig { diff --git a/flexidag/src/consensusdb/writer.rs b/flexidag/src/consensusdb/writer.rs index 717d7d7e1c..1d90459d8e 100644 --- a/flexidag/src/consensusdb/writer.rs +++ b/flexidag/src/consensusdb/writer.rs @@ -39,11 +39,12 @@ impl DbWriter for DirectDbWriter<'_> { pub struct BatchDbWriter<'a> { batch: &'a mut WriteBatch, + db: &'a DBStorage, } impl<'a> BatchDbWriter<'a> { - pub fn new(batch: &'a mut WriteBatch) -> Self { - Self { batch } + pub fn new(batch: &'a mut WriteBatch, db: &'a DBStorage) -> Self { + Self { batch, db } } } @@ -51,13 +52,15 @@ impl DbWriter for BatchDbWriter<'_> { fn put(&mut self, key: &S::Key, value: &S::Value) -> Result<(), StoreError> { let key = key.encode_key()?; let value = value.encode_value()?; - self.batch.put(key, value); + let cf_handle = self.db.get_cf_handle(S::COLUMN_FAMILY); + self.batch.put_cf(cf_handle, key, value); Ok(()) } fn delete(&mut self, key: &S::Key) -> Result<(), StoreError> { let key = key.encode_key()?; - self.batch.delete(key); + let cf_handle = self.db.get_cf_handle(S::COLUMN_FAMILY); + self.batch.delete_cf(cf_handle, key); Ok(()) } } diff --git a/flexidag/tests/tests.rs b/flexidag/tests/tests.rs index ebf40b932c..744aed536a 100644 --- a/flexidag/tests/tests.rs +++ b/flexidag/tests/tests.rs @@ -617,7 +617,7 @@ fn test_reachability_not_ancestor() -> anyhow::Result<()> { } #[test] -fn test_reachability_algorighm() -> anyhow::Result<()> { +fn test_reachability_algorithm() -> anyhow::Result<()> { let dag = BlockDAG::create_for_testing().unwrap(); let reachability_store = dag.storage.reachability_store.clone(); diff --git a/storage/src/db_storage/mod.rs b/storage/src/db_storage/mod.rs index 1570d259d1..1bc73a930b 100644 --- a/storage/src/db_storage/mod.rs +++ b/storage/src/db_storage/mod.rs @@ -174,7 +174,7 @@ impl DBStorage { /// tests. pub fn flush_all(&self) -> Result<()> { for cf_name in &self.cfs { - let cf_handle = self.get_cf_handle(cf_name)?; + let cf_handle = self.get_cf_handle(cf_name); self.db.flush_cf(cf_handle)?; } Ok(()) @@ -190,8 +190,8 @@ impl DBStorage { rocksdb_current_file.is_file() } - fn get_cf_handle(&self, cf_name: &str) -> Result<&rocksdb::ColumnFamily> { - self.db.cf_handle(cf_name).ok_or_else(|| { + pub fn get_cf_handle(&self, cf_name: &str) -> &rocksdb::ColumnFamily { + self.db.cf_handle(cf_name).unwrap_or_else(|| { panic!( "DB::cf_handle not found for column family name: {}", cf_name @@ -232,7 +232,7 @@ impl DBStorage { K: KeyCodec, V: ValueCodec, { - let cf_handle = self.get_cf_handle(prefix_name)?; + let cf_handle = self.get_cf_handle(prefix_name); Ok(SchemaIterator::new( self.db .raw_iterator_cf_opt(cf_handle, ReadOptions::default()), @@ -246,7 +246,7 @@ impl DBStorage { mode: IteratorMode, readopts: ReadOptions, ) -> Result { - let cf_handle = self.get_cf_handle(prefix_name)?; + let cf_handle = self.get_cf_handle(prefix_name); Ok(self.db.iterator_cf_opt(cf_handle, readopts, mode)) } @@ -359,7 +359,7 @@ where impl InnerStore for DBStorage { fn get(&self, prefix_name: &str, key: Vec) -> Result>> { record_metrics("db", prefix_name, "get", self.metrics.as_ref()).call(|| { - let cf_handle = self.get_cf_handle(prefix_name)?; + let cf_handle = self.get_cf_handle(prefix_name); let result = self.db.get_cf(cf_handle, key.as_slice())?; Ok(result) }) @@ -374,7 +374,7 @@ impl InnerStore for DBStorage { } record_metrics("db", prefix_name, "put", self.metrics.as_ref()).call(|| { - let cf_handle = self.get_cf_handle(prefix_name)?; + let cf_handle = self.get_cf_handle(prefix_name); self.db .put_cf_opt(cf_handle, &key, &value, &Self::default_write_options())?; Ok(()) @@ -391,7 +391,7 @@ impl InnerStore for DBStorage { } fn remove(&self, prefix_name: &str, key: Vec) -> Result<()> { record_metrics("db", prefix_name, "remove", self.metrics.as_ref()).call(|| { - let cf_handle = self.get_cf_handle(prefix_name)?; + let cf_handle = self.get_cf_handle(prefix_name); self.db.delete_cf(cf_handle, &key)?; Ok(()) }) @@ -401,7 +401,7 @@ impl InnerStore for DBStorage { fn write_batch(&self, prefix_name: &str, batch: WriteBatch) -> Result<()> { record_metrics("db", prefix_name, "write_batch", self.metrics.as_ref()).call(|| { let mut db_batch = DBWriteBatch::default(); - let cf_handle = self.get_cf_handle(prefix_name)?; + let cf_handle = self.get_cf_handle(prefix_name); for (key, write_op) in &batch.rows { match write_op { WriteOp::Value(value) => db_batch.put_cf(cf_handle, key, value), @@ -431,7 +431,7 @@ impl InnerStore for DBStorage { } record_metrics("db", prefix_name, "put_sync", self.metrics.as_ref()).call(|| { - let cf_handle = self.get_cf_handle(prefix_name)?; + let cf_handle = self.get_cf_handle(prefix_name); self.db .put_cf_opt(cf_handle, &key, &value, &Self::sync_write_options())?; Ok(()) @@ -441,7 +441,7 @@ impl InnerStore for DBStorage { fn write_batch_sync(&self, prefix_name: &str, batch: WriteBatch) -> Result<()> { record_metrics("db", prefix_name, "write_batch_sync", self.metrics.as_ref()).call(|| { let mut db_batch = DBWriteBatch::default(); - let cf_handle = self.get_cf_handle(prefix_name)?; + let cf_handle = self.get_cf_handle(prefix_name); for (key, write_op) in &batch.rows { match write_op { WriteOp::Value(value) => db_batch.put_cf(cf_handle, key, value), @@ -455,7 +455,7 @@ impl InnerStore for DBStorage { fn multi_get(&self, prefix_name: &str, keys: Vec>) -> Result>>> { record_metrics("db", prefix_name, "multi_get", self.metrics.as_ref()).call(|| { - let cf_handle = self.get_cf_handle(prefix_name)?; + let cf_handle = self.get_cf_handle(prefix_name); let cf_handles = iter::repeat(&cf_handle) .take(keys.len()) .collect::>(); @@ -482,7 +482,7 @@ impl RawDBStorage for DBStorage { prefix: &str, key: K, ) -> Result> { - let cf = self.get_cf_handle(prefix)?; + let cf = self.get_cf_handle(prefix); let res = self .db .get_pinned_cf_opt(cf, key, &ReadOptions::default())?;