diff --git a/account/src/account_schemadb/mod.rs b/account/src/account_schemadb/mod.rs
index edaa889b92..f5fc5cd9f3 100644
--- a/account/src/account_schemadb/mod.rs
+++ b/account/src/account_schemadb/mod.rs
@@ -15,7 +15,7 @@ pub(crate) use global_setting::*;
 pub(crate) use private_key::*;
 pub(crate) use public_key::*;
 pub(crate) use setting::*;
-use starcoin_schemadb::SchemaBatch;
+use starcoin_schemadb::{db::DBStorage as DB, SchemaBatch};
 
 #[derive(Debug, Ord, PartialOrd, Eq, PartialEq, Hash, Clone, Copy)]
 pub(crate) struct AccountAddressWrapper(AccountAddress);
@@ -43,27 +43,51 @@ impl TryFrom<&[u8]> for AccountAddressWrapper {
 #[derive(Clone)]
 pub(super) struct AccountStore<S: Schema> {
     cache: Arc<GCacheStorage<S::Key, S::Value>>,
+    db: Option<Arc<DB>>,
 }
 
 impl<S: Schema> AccountStore<S> {
+    // create a memory-based store
     pub fn new() -> Self {
         Self {
             cache: Arc::new(GCacheStorage::<S::Key, S::Value>::new(None)),
+            db: None,
+        }
+    }
+
+    pub fn new_with_db(db: &Arc<DB>) -> Self {
+        Self {
+            cache: Arc::new(GCacheStorage::<S::Key, S::Value>::new(None)),
+            db: Some(Arc::clone(db)),
         }
     }
 
     pub fn get(&self, key: &S::Key) -> Result<Option<S::Value>> {
-        Ok(self.cache.get_inner(key))
+        self.cache
+            .get_inner(key)
+            .map(|val| Ok(Some(val)))
+            .unwrap_or_else(|| {
+                self.db
+                    .as_ref()
+                    .map_or_else(|| Ok(None), |db| db.get::<S>(key))
+            })
     }
 
     pub fn put(&self, key: S::Key, value: S::Value) -> Result<()> {
-        self.cache.put_inner(key, value);
-        Ok(())
+        self.db
+            .as_ref()
+            .map_or_else(|| Ok(()), |db| db.put::<S>(&key, &value))
+            .map(|_| {
+                self.cache.put_inner(key, value);
+            })
     }
 
     pub fn remove(&self, key: &S::Key) -> Result<()> {
-        self.cache.remove_inner(key);
-        Ok(())
+        self.db
+            .as_ref()
+            .map_or_else(|| Ok(()), |db| db.remove::<S>(key))
+            .map(|_| {
+                self.cache.remove_inner(key);
+            })
     }
 
     pub fn put_batch(&self, key: S::Key, value: S::Value, batch: &SchemaBatch) -> Result<()> {
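Note on the `AccountStore` change above: reads are now cache-first with a DB fallback, and writes persist to the DB *before* touching the cache, so a failed write can never leave a phantom cache entry; constructing with `db: None` degrades the store to pure in-memory (which is what the new `mock()` below relies on). A minimal self-contained sketch of that read-through/write-through shape, using `anyhow` like the crate itself and a hypothetical `Backend` trait in place of the schema-typed `DBStorage` API:

```rust
use anyhow::Result;
use std::collections::HashMap;
use std::sync::{Arc, Mutex};

// Hypothetical stand-in for the schema-typed DB behind AccountStore.
trait Backend: Send + Sync {
    fn get(&self, key: &[u8]) -> Result<Option<Vec<u8>>>;
    fn put(&self, key: &[u8], value: &[u8]) -> Result<()>;
}

struct CachedStore {
    cache: Mutex<HashMap<Vec<u8>, Vec<u8>>>,
    db: Option<Arc<dyn Backend>>, // None => memory-only (mock) mode
}

impl CachedStore {
    /// Read-through: serve from the cache, else fall back to the backend.
    fn get(&self, key: &[u8]) -> Result<Option<Vec<u8>>> {
        if let Some(v) = self.cache.lock().unwrap().get(key) {
            return Ok(Some(v.clone()));
        }
        self.db.as_ref().map_or(Ok(None), |db| db.get(key))
    }

    /// Write-through: persist first, update the cache only on success.
    fn put(&self, key: Vec<u8>, value: Vec<u8>) -> Result<()> {
        if let Some(db) = self.db.as_ref() {
            db.put(&key, &value)?;
        }
        self.cache.lock().unwrap().insert(key, value);
        Ok(())
    }
}
```

As in the diff, a DB hit does not back-fill the cache; keeping that behavior (or adding back-fill) is a separate design decision.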
diff --git a/account/src/account_storage.rs b/account/src/account_storage.rs
index 0db69b1e57..7b9d3a0e81 100644
--- a/account/src/account_storage.rs
+++ b/account/src/account_storage.rs
@@ -4,13 +4,12 @@
 use crate::account_schemadb::{
     AcceptedToken, AcceptedTokens, AccountAddressWrapper, AccountSetting, AccountStore,
     EncryptedPrivateKey, GlobalSetting, GlobalSettingKey, GlobalValue, PrivateKey, PublicKey,
-    PublicKeyWrapper, SettingWrapper, ACCEPTED_TOKEN_PREFIX_NAME,
-    ENCRYPTED_PRIVATE_KEY_PREFIX_NAME, GLOBAL_PREFIX_NAME, PUBLIC_KEY_PREFIX_NAME,
-    SETTING_PREFIX_NAME,
+    SettingWrapper, ACCEPTED_TOKEN_PREFIX_NAME, ENCRYPTED_PRIVATE_KEY_PREFIX_NAME,
+    GLOBAL_PREFIX_NAME, PUBLIC_KEY_PREFIX_NAME, SETTING_PREFIX_NAME,
 };
 use anyhow::{Error, Result};
 use starcoin_account_api::{AccountPrivateKey, AccountPublicKey, Setting};
-use starcoin_config::{temp_dir, RocksdbConfig};
+use starcoin_config::RocksdbConfig;
 use starcoin_crypto::ValidCryptoMaterial;
 use starcoin_decrypt::{decrypt, encrypt};
 use starcoin_schemadb::{db::DBStorage as DB, SchemaBatch};
@@ -19,7 +18,7 @@ use std::{convert::TryFrom, path::Path, sync::Arc};
 
 #[derive(Clone)]
 pub struct AccountStorage {
-    db: Arc<DB>,
+    db: Option<Arc<DB>>,
     setting_store: AccountStore<AccountSetting>,
     private_key_store: AccountStore<PrivateKey>,
     public_key_store: AccountStore<PublicKey>,
@@ -48,7 +47,18 @@ impl AccountStorage {
     pub fn new(db: Arc<DB>) -> Self {
         Self {
-            db: Arc::clone(&db),
+            db: Some(Arc::clone(&db)),
+            setting_store: AccountStore::<AccountSetting>::new_with_db(&db),
+            private_key_store: AccountStore::<PrivateKey>::new_with_db(&db),
+            public_key_store: AccountStore::<PublicKey>::new_with_db(&db),
+            accepted_token_store: AccountStore::<AcceptedToken>::new_with_db(&db),
+            global_value_store: AccountStore::<GlobalSetting>::new_with_db(&db),
+        }
+    }
+
+    pub fn mock() -> Self {
+        Self {
+            db: None,
             setting_store: AccountStore::<AccountSetting>::new(),
             private_key_store: AccountStore::<PrivateKey>::new(),
             public_key_store: AccountStore::<PublicKey>::new(),
@@ -56,26 +66,6 @@ impl AccountStorage {
             global_value_store: AccountStore::<GlobalSetting>::new(),
         }
     }
-
-    pub fn mock() -> Self {
-        let path = temp_dir();
-        let db = DB::open_with_cfs(
-            "acccountmock",
-            &path,
-            vec![
-                SETTING_PREFIX_NAME,
-                ENCRYPTED_PRIVATE_KEY_PREFIX_NAME,
-                PUBLIC_KEY_PREFIX_NAME,
-                ACCEPTED_TOKEN_PREFIX_NAME,
-                GLOBAL_PREFIX_NAME,
-            ],
-            false,
-            RocksdbConfig::default(),
-            None,
-        )
-        .unwrap();
-        Self::new(Arc::new(db))
-    }
 }
 
 impl AccountStorage {
@@ -113,19 +103,19 @@ impl AccountStorage {
             .get(global_setting_key)?
             .map(|v| Ok(Some(v)))
             .unwrap_or_else(|| {
-                self.db
-                    .get::<GlobalSetting>(&GlobalSettingKey::AllAddresses)
+                if global_setting_key != &GlobalSettingKey::AllAddresses {
+                    self.global_value_store.get(&GlobalSettingKey::AllAddresses)
+                } else {
+                    Ok(None)
+                }
             })
     }
 
     fn put_addresses(&self, key: GlobalSettingKey, value: GlobalValue) -> Result<()> {
-        self.db
-            .put::<GlobalSetting>(&key, &value)
-            .and_then(|_| self.global_value_store.put(key, value))
+        self.global_value_store.put(key, value)
     }
 
     fn remove_address(&self, key: &GlobalSettingKey) -> Result<()> {
-        self.db.remove::<GlobalSetting>(key)?;
         self.global_value_store.remove(key)
     }
 
@@ -159,19 +149,11 @@ impl AccountStorage {
     }
 
     fn get_public_key(&self, address: &AccountAddressWrapper) -> Result<Option<AccountPublicKey>> {
-        self.public_key_store
-            .get(address)?
-            .map(|v| Ok(Some(v)))
-            .unwrap_or_else(|| self.db.get::<PublicKey>(address))
-            .map(|v| v.map(Into::into))
+        Ok(self.public_key_store.get(address)?.map(Into::into))
     }
 
     fn put_public_key(&self, key: AccountAddress, value: AccountPublicKey) -> Result<()> {
-        let key: AccountAddressWrapper = key.into();
-        let value: PublicKeyWrapper = value.into();
-        self.db
-            .put::<PublicKey>(&key, &value)
-            .and_then(|_| self.public_key_store.put(key, value))
+        self.public_key_store.put(key.into(), value.into())
     }
 
     pub fn public_key(&self, address: AccountAddress) -> Result<Option<AccountPublicKey>> {
@@ -182,10 +164,7 @@ impl AccountStorage {
         &self,
         address: &AccountAddressWrapper,
     ) -> Result<Option<EncryptedPrivateKey>> {
-        self.private_key_store
-            .get(address)?
-            .map(|v| Ok(Some(v)))
-            .unwrap_or_else(|| self.db.get::<PrivateKey>(address))
+        self.private_key_store.get(address)
     }
 
     //fn put_private_key(&self, key: AccountAddress, value: EncryptedPrivateKey) -> Result<()> {
@@ -239,9 +218,7 @@ impl AccountStorage {
     fn put_setting(&self, address: AccountAddress, setting: Setting) -> Result<()> {
         let key: AccountAddressWrapper = address.into();
         let value: SettingWrapper = setting.into();
-        self.db
-            .put::<AccountSetting>(&key, &value)
-            .and_then(|_| self.setting_store.put(key, value))
+        self.setting_store.put(key, value)
     }
 
     pub fn update_setting(&self, address: AccountAddress, setting: Setting) -> Result<()> {
@@ -250,13 +227,7 @@ impl AccountStorage {
 
     pub fn load_setting(&self, address: AccountAddress) -> Result<Setting> {
         let key: AccountAddressWrapper = address.into();
-        Ok(self
-            .setting_store
-            .get(&key)?
-            .map(|setting| Ok(Some(setting)))
-            .unwrap_or_else(|| self.db.get::<AccountSetting>(&key))?
-            .unwrap_or_default()
-            .0)
+        Ok(self.setting_store.get(&key)?.unwrap_or_default().0)
     }
 
     pub fn destroy_account(&self, address: AccountAddress) -> Result<()> {
@@ -291,25 +262,21 @@ impl AccountStorage {
         self.accepted_token_store.remove_batch(&key, &batch)?;
 
         // persist updates to underlying storage
-        self.db.write_schemas(batch)?;
+        self.db
+            .as_ref()
+            .map_or_else(|| Ok(()), |db| db.write_schemas(batch))?;
         Ok(())
     }
 
     pub fn get_accepted_tokens(&self, address: AccountAddress) -> Result<Vec<TokenCode>> {
         let key: AccountAddressWrapper = address.into();
-        let ts = self
-            .accepted_token_store
-            .get(&key)?
-            .map(|v| Ok(Some(v)))
-            .unwrap_or_else(|| self.db.get::<AcceptedToken>(&key))?;
+        let ts = self.accepted_token_store.get(&key)?;
         Ok(ts.map(|t| t.0).unwrap_or_default())
     }
 
     fn put_accepted_tokens(&self, key: AccountAddressWrapper, value: AcceptedTokens) -> Result<()> {
-        self.db
-            .put::<AcceptedToken>(&key, &value)
-            .and_then(|_| self.accepted_token_store.put(key, value))
+        self.accepted_token_store.put(key, value)
    }
 
     pub fn add_accepted_token(
@@ -326,6 +293,8 @@ impl AccountStorage {
     }
 
     pub fn write_schemas(&self, batch: SchemaBatch) -> Result<()> {
-        self.db.write_schemas(batch)
+        self.db
+            .as_ref()
+            .map_or_else(|| Ok(()), |db| db.write_schemas(batch))
     }
 }
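With every inner store owning an optional DB handle, `AccountStorage::mock()` no longer opens a throwaway RocksDB under a temp dir; it is now a pure in-memory constructor. A sketch of the kind of test this enables, assuming the usual `starcoin_types` re-export of `AccountAddress` and relying on `get_accepted_tokens` returning an empty `Vec` for an unknown address, as the code above implies:

```rust
#[cfg(test)]
mod mock_tests {
    use super::*;
    use starcoin_types::account_address::AccountAddress;

    #[test]
    fn mock_storage_is_memory_only() -> anyhow::Result<()> {
        // No temp dir, no column families, nothing to clean up:
        // every inner AccountStore is built with `db: None`.
        let storage = AccountStorage::mock();
        let address = AccountAddress::random();
        assert!(storage.get_accepted_tokens(address)?.is_empty());
        Ok(())
    }
}
```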
diff --git a/storage/schemadb/src/db/mod.rs b/storage/schemadb/src/db/mod.rs
index b539ad5115..386b100fdd 100644
--- a/storage/schemadb/src/db/mod.rs
+++ b/storage/schemadb/src/db/mod.rs
@@ -24,13 +24,19 @@ const RES_FDS: u64 = 4096;
 #[allow(clippy::upper_case_acronyms)]
 pub struct DBStorage {
     name: String, // for logging
-    // Todo, make me private to other crates
-    pub db: DB,
+    db: DB,
     cfs: Vec<ColumnFamilyName>,
-    // Todo, make me private to other crates
-    pub metrics: Option<StorageMetrics>,
+    metrics: Option<StorageMetrics>,
 }
 
 impl DBStorage {
+    pub fn db(&self) -> &DB {
+        &self.db
+    }
+
+    pub fn metrics(&self) -> Option<&StorageMetrics> {
+        self.metrics.as_ref()
+    }
+
     pub fn new<P: AsRef<Path> + Clone>(
         db_root_path: P,
         rocksdb_config: RocksdbConfig,
@@ -312,6 +318,7 @@ impl DBStorage {
     }
 }
 
+// The new APIs
 impl DBStorage {
     pub fn open(
         name: &str,
@@ -383,7 +390,7 @@ impl DBStorage {
         let raw_key = <S::Key as KeyCodec<S>>::encode_key(key)?;
         let cf_handle = self.get_cf_handle(S::COLUMN_FAMILY)?;
         self.db
-            .get_cf(cf_handle, raw_key)
+            .get_pinned_cf(cf_handle, raw_key)
             .map_err(Into::into)
             .and_then(|raw_value| {
                 raw_value
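The `get_cf` → `get_pinned_cf` switch in the typed `get` path avoids materializing each value as an owned `Vec<u8>` before decoding: the pinned slice borrows RocksDB-managed memory for the duration of the read. A minimal sketch of the same idea against the `rocksdb` crate directly (default column family, hypothetical path):

```rust
use rocksdb::{Options, DB};

fn read_value(db: &DB, key: &[u8]) -> Result<Option<Vec<u8>>, rocksdb::Error> {
    // `get_pinned` returns a DBPinnableSlice that borrows RocksDB-owned
    // memory; bytes are copied only if the caller decides to keep them.
    Ok(db.get_pinned(key)?.map(|slice| slice.to_vec()))
}

fn main() -> Result<(), rocksdb::Error> {
    let mut opts = Options::default();
    opts.create_if_missing(true);
    let db = DB::open(&opts, "/tmp/pinned_get_demo")?;
    db.put(b"key", b"value")?;
    assert_eq!(read_value(&db, b"key")?, Some(b"value".to_vec()));
    Ok(())
}
```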
prefix_name, "put", self.metrics()).call(|| { let cf_handle = self.get_cf_handle(prefix_name)?; - self.db + self.db() .put_cf_opt(cf_handle, &key, &value, &Self::default_write_options())?; Ok(()) }) } fn contains_key(&self, prefix_name: &str, key: Vec) -> Result { - record_metrics("db", prefix_name, "contains_key", self.metrics.as_ref()).call(|| match self - .get_raw(prefix_name, key) - { - Ok(Some(_)) => Ok(true), - _ => Ok(false), + record_metrics("db", prefix_name, "contains_key", self.metrics()).call(|| { + match self.get_raw(prefix_name, key) { + Ok(Some(_)) => Ok(true), + _ => Ok(false), + } }) } fn remove_raw(&self, prefix_name: &str, key: Vec) -> Result<()> { - record_metrics("db", prefix_name, "remove", self.metrics.as_ref()).call(|| { + record_metrics("db", prefix_name, "remove", self.metrics()).call(|| { let cf_handle = self.get_cf_handle(prefix_name)?; - self.db.delete_cf(cf_handle, &key)?; + self.db().delete_cf(cf_handle, &key)?; Ok(()) }) } /// Writes a group of records wrapped in a WriteBatch. fn write_batch(&self, prefix_name: &str, batch: WriteBatch) -> Result<()> { - record_metrics("db", prefix_name, "write_batch", self.metrics.as_ref()).call(|| { + record_metrics("db", prefix_name, "write_batch", self.metrics()).call(|| { self.write_batch_inner( prefix_name, batch.rows.as_slice(), @@ -127,28 +127,28 @@ impl InnerStore for DBStorage { } fn put_sync(&self, prefix_name: &str, key: Vec, value: Vec) -> Result<()> { - if let Some(metrics) = self.metrics.as_ref() { + if let Some(metrics) = self.metrics() { metrics .storage_item_bytes .with_label_values(&[prefix_name]) .observe((key.len() + value.len()) as f64); } - record_metrics("db", prefix_name, "put_sync", self.metrics.as_ref()).call(|| { + record_metrics("db", prefix_name, "put_sync", self.metrics()).call(|| { let cf_handle = self.get_cf_handle(prefix_name)?; - self.db + self.db() .put_cf_opt(cf_handle, &key, &value, &Self::sync_write_options())?; Ok(()) }) } fn write_batch_sync(&self, prefix_name: &str, batch: WriteBatch) -> Result<()> { - record_metrics("db", prefix_name, "write_batch_sync", self.metrics.as_ref()) + record_metrics("db", prefix_name, "write_batch_sync", self.metrics()) .call(|| self.write_batch_inner(prefix_name, batch.rows.as_slice(), true)) } fn multi_get(&self, prefix_name: &str, keys: Vec>) -> Result>>> { - record_metrics("db", prefix_name, "multi_get", self.metrics.as_ref()).call(|| { + record_metrics("db", prefix_name, "multi_get", self.metrics()).call(|| { let cf_handle = self.get_cf_handle(prefix_name)?; let cf_handles = iter::repeat(&cf_handle) .take(keys.len()) @@ -159,7 +159,7 @@ impl InnerStore for DBStorage { .map(|(key, handle)| (handle, key.as_slice())) .collect::>(); - let result = self.db.multi_get_cf(keys_multi); + let result = self.db().multi_get_cf(keys_multi); let mut res = vec![]; for item in result { let item = item?; diff --git a/storage/src/storage.rs b/storage/src/storage.rs index 9b929faad8..a8e823d090 100644 --- a/storage/src/storage.rs +++ b/storage/src/storage.rs @@ -2,7 +2,7 @@ // SPDX-License-Identifier: Apache-2.0 use crate::{ - cache_storage::CacheStorage, + cache_storage::GCacheStorage, db_storage::{ClassicIter, SchemaIterator}, upgrade::DBUpgrade, }; @@ -12,7 +12,7 @@ use starcoin_config::NodeConfig; use starcoin_crypto::HashValue; use starcoin_logger::prelude::info; use starcoin_vm_types::state_store::table::TableHandle; -use std::{convert::TryInto, fmt::Debug, marker::PhantomData, sync::Arc}; +use std::{convert::TryInto, fmt::Debug, hash::Hash, marker::PhantomData, 
diff --git a/storage/src/storage.rs b/storage/src/storage.rs
index 9b929faad8..a8e823d090 100644
--- a/storage/src/storage.rs
+++ b/storage/src/storage.rs
@@ -2,7 +2,7 @@
 // SPDX-License-Identifier: Apache-2.0
 
 use crate::{
-    cache_storage::CacheStorage,
+    cache_storage::GCacheStorage,
     db_storage::{ClassicIter, SchemaIterator},
     upgrade::DBUpgrade,
 };
@@ -12,7 +12,7 @@ use starcoin_config::NodeConfig;
 use starcoin_crypto::HashValue;
 use starcoin_logger::prelude::info;
 use starcoin_vm_types::state_store::table::TableHandle;
-use std::{convert::TryInto, fmt::Debug, marker::PhantomData, sync::Arc};
+use std::{convert::TryInto, fmt::Debug, hash::Hash, marker::PhantomData, sync::Arc};
 pub use {
     crate::batch::WriteBatch,
     starcoin_schemadb::{db::DBStorage, ColumnFamilyName, GWriteOp as WriteOp},
@@ -45,53 +45,59 @@ pub trait InnerStore: Send + Sync {
     fn multi_get(&self, prefix_name: &str, keys: Vec<Vec<u8>>) -> Result<Vec<Option<Vec<u8>>>>;
 }
 
-///Storage instance type define
+pub type StorageInstance = GStorageInstance<Vec<u8>, Vec<u8>>;
+
+/// Generic storage instance type definition
 #[derive(Clone)]
 #[allow(clippy::upper_case_acronyms)]
-pub enum StorageInstance {
+pub enum GStorageInstance<K, V>
+where
+    K: Hash + Eq + Default,
+    V: Default,
+{
     CACHE {
-        cache: Arc<CacheStorage>,
+        cache: Arc<GCacheStorage<K, V>>,
     },
     DB {
         db: Arc<DBStorage>,
     },
     CacheAndDb {
-        cache: Arc<CacheStorage>,
+        cache: Arc<GCacheStorage<K, V>>,
         db: Arc<DBStorage>,
     },
 }
 
-impl StorageInstance {
+impl<K, V> GStorageInstance<K, V>
+where
+    K: Hash + Eq + Default,
+    V: Default,
+{
     pub fn new_cache_instance() -> Self {
-        StorageInstance::CACHE {
-            cache: Arc::new(CacheStorage::new(None)),
+        GStorageInstance::CACHE {
+            cache: Arc::new(GCacheStorage::default()),
         }
     }
 
     pub fn new_db_instance(db: DBStorage) -> Self {
         Self::DB { db: Arc::new(db) }
     }
 
-    pub fn new_cache_and_db_instance(cache: CacheStorage, db: DBStorage) -> Self {
+    pub fn new_cache_and_db_instance(cache: GCacheStorage<K, V>, db: DBStorage) -> Self {
         Self::CacheAndDb {
             cache: Arc::new(cache),
             db: Arc::new(db),
         }
     }
 
-    pub fn cache(&self) -> Option<Arc<CacheStorage>> {
+    pub fn cache(&self) -> Option<Arc<GCacheStorage<K, V>>> {
         match self {
-            StorageInstance::CACHE { cache } | StorageInstance::CacheAndDb { cache, db: _ } => {
-                Some(cache.clone())
-            }
+            Self::CACHE { cache } | Self::CacheAndDb { cache, db: _ } => Some(cache.clone()),
             _ => None,
         }
     }
 
-    pub fn db(&self) -> Option<&DBStorage> {
+    pub fn db(&self) -> Option<&Arc<DBStorage>> {
         match self {
-            StorageInstance::DB { db } | StorageInstance::CacheAndDb { cache: _, db } => {
-                Some(db.as_ref())
-            }
+            Self::DB { db } | Self::CacheAndDb { cache: _, db } => Some(db),
             _ => None,
         }
     }
@@ -99,13 +105,13 @@ impl StorageInstance {
     // make sure Arc::strong_count(&db) == 1 unless will get None
     pub fn db_mut(&mut self) -> Option<&mut DBStorage> {
         match self {
-            StorageInstance::DB { db } | StorageInstance::CacheAndDb { cache: _, db } => {
-                Arc::get_mut(db)
-            }
+            Self::DB { db } | Self::CacheAndDb { cache: _, db } => Arc::get_mut(db),
             _ => None,
         }
     }
+}
 
+impl StorageInstance {
     pub fn check_upgrade(&mut self) -> Result<()> {
         DBUpgrade::check_upgrade(self)
     }
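The last file turns `StorageInstance` into an alias of the new generic `GStorageInstance<K, V>`, so downstream code that names the old type keeps compiling, while byte-specific logic such as `check_upgrade` stays on an `impl StorageInstance` block that exists only for the `Vec<u8>` instantiation. A compact sketch of that alias technique, with hypothetical names:

```rust
use std::{collections::HashMap, hash::Hash, sync::Arc};

// Hypothetical mirror of GStorageInstance<K, V>.
pub enum GInstance<K, V>
where
    K: Hash + Eq + Default,
    V: Default,
{
    Cache { cache: Arc<HashMap<K, V>> },
}

// The old concrete name survives as an alias, so `fn f(i: &Instance)`
// in downstream crates keeps compiling unchanged.
pub type Instance = GInstance<Vec<u8>, Vec<u8>>;

impl<K, V> GInstance<K, V>
where
    K: Hash + Eq + Default,
    V: Default,
{
    pub fn new_cache_instance() -> Self {
        Self::Cache {
            cache: Arc::new(HashMap::new()),
        }
    }
}

// Byte-only operations live on the alias, mirroring how `check_upgrade`
// remains on `impl StorageInstance` in the diff.
impl Instance {
    pub fn byte_only_op(&self) -> usize {
        match self {
            Self::Cache { cache } => cache.len(),
        }
    }
}

fn main() {
    let i = Instance::new_cache_instance();
    assert_eq!(i.byte_only_op(), 0);
}
```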