diff --git a/Cargo.lock b/Cargo.lock index 5a5e3ac3f..1c5cc5d3b 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1859,6 +1859,7 @@ dependencies = [ "serde", "serde_derive", "serde_json", + "tempfile", "tikv-jemalloc-ctl", "tikv-jemallocator", "toml 0.7.3", diff --git a/core/run/Cargo.toml b/core/run/Cargo.toml index 15be1f1d4..8513bfe82 100644 --- a/core/run/Cargo.toml +++ b/core/run/Cargo.toml @@ -12,6 +12,7 @@ rlp = "0.5" serde = "1.0" serde_derive = "1.0" serde_json = "1.0" +tempfile = "3.6" toml = "0.7" common-apm = { path = "../../common/apm" } diff --git a/core/run/src/lib.rs b/core/run/src/lib.rs index 9cd2ca168..329a28793 100644 --- a/core/run/src/lib.rs +++ b/core/run/src/lib.rs @@ -1,6 +1,6 @@ #![allow(clippy::uninlined_format_args, clippy::mutable_key_type)] -use std::{collections::HashMap, panic::PanicInfo, sync::Arc, time::Duration}; +use std::{collections::HashMap, panic::PanicInfo, path::PathBuf, sync::Arc, time::Duration}; use backtrace::Backtrace; #[cfg(all( @@ -15,7 +15,9 @@ use { use common_apm::metrics::mempool::{MEMPOOL_CO_QUEUE_LEN, MEMPOOL_LEN_GAUGE}; use common_apm::{server::run_prometheus_server, tracing::global_tracer_register}; -use common_config_parser::types::{Config, ConfigJaeger, ConfigPrometheus}; +use common_config_parser::types::{ + Config, ConfigJaeger, ConfigPrometheus, ConfigRocksDB, InitialAccount, +}; use common_crypto::{ BlsPrivateKey, BlsPublicKey, PublicKey, Secp256k1, Secp256k1PrivateKey, Secp256k1PublicKey, ToPublicKey, UncompressedPublicKey, @@ -33,8 +35,8 @@ use protocol::traits::{ Rpc, Storage, SynchronizationAdapter, }; use protocol::types::{ - Account, Address, Block, MerkleRoot, Metadata, Proposal, RichBlock, SignedTransaction, - Validator, ValidatorExtend, H256, NIL_DATA, RLP_NULL, + Account, Address, Block, ExecResp, MerkleRoot, Metadata, Proposal, RichBlock, + SignedTransaction, Validator, ValidatorExtend, H256, NIL_DATA, RLP_NULL, }; use protocol::{ async_trait, trie::DB as TrieDB, Display, From, ProtocolError, ProtocolErrorKind, @@ -107,7 +109,15 @@ impl Axon { .expect("new tokio runtime"); rt.block_on(async move { - self.create_genesis().await?; + // TODO Introduce a read-only rocksdb handler for checks. + let storage = self.init_storage(false).await?; + if let Some(genesis) = self.try_load_genesis(&storage).await? { + log::info!("The Genesis block has been initialized."); + self.apply_genesis_after_checks(&genesis).await?; + } else { + self.create_genesis(&storage).await?; + } + drop(storage); self.start(key_provider).await })?; rt.shutdown_timeout(std::time::Duration::from_secs(1)); @@ -115,117 +125,130 @@ impl Axon { Ok(()) } - pub async fn create_genesis(&mut self) -> ProtocolResult<()> { - // Init Block db - let path_block = self.config.data_path_for_block(); - let rocks_adapter = Arc::new(RocksAdapter::new(path_block, self.config.rocksdb.clone())?); - let storage = Arc::new(ImplStorage::new( - rocks_adapter, - self.config.rocksdb.cache_size, - )); - - match storage.get_latest_block(Context::new()).await { - Ok(_) => { - log::info!("The Genesis block has been initialized."); - return Ok(()); - } - Err(e) => { - if !e.to_string().contains("GetNone") { - return Err(e); - } + async fn try_load_genesis( + &self, + storage: &Arc>, + ) -> ProtocolResult> { + storage.get_block(Context::new(), 0).await.or_else(|e| { + if e.to_string().contains("GetNone") { + Ok(None) + } else { + Err(e) } - }; + }) + } - // Init trie db - let path_state = self.config.data_path_for_state(); - let trie_db = Arc::new(RocksTrieDB::new( - path_state, - self.config.rocksdb.clone(), - self.config.executor.triedb_cache_size, - )?); + async fn create_genesis( + &mut self, + storage: &Arc>, + ) -> ProtocolResult<()> { + let trie_db = self.init_trie_db(false).await?; let mut mpt = MPTTrie::new(Arc::clone(&trie_db)); - self.insert_accounts(&mut mpt).await?; - self.execute_transactions(&mut mpt, &trie_db, &storage) - .await?; + insert_accounts(&mut mpt, &self.config.accounts).await?; - log::info!("The genesis block is created {:?}", self.genesis.block); + let path_metadata = self.config.data_path_for_system_contract(); + let resp = execute_transactions( + &self.genesis, + &mut mpt, + &trie_db, + storage, + path_metadata, + &self.config.rocksdb.clone(), + ) + .await?; - Ok(()) - } + log::info!( + "Execute the genesis distribute success, genesis state root {:?}, response {:?}", + resp.state_root, + resp + ); - async fn insert_accounts(&self, mpt: &mut MPTTrie) -> ProtocolResult<()> { - for account in self.config.accounts.iter() { - let raw_account = Account { - nonce: 0u64.into(), - balance: account.balance, - storage_root: RLP_NULL, - code_hash: NIL_DATA, - } - .encode()?; + self.state_root = resp.state_root; + self.apply_genesis_data(resp.state_root, resp.receipt_root)?; - mpt.insert(account.address.as_bytes(), &raw_account)?; - } + log::info!("The genesis block is created {:?}", self.genesis.block); + + save_block(storage, &self.genesis).await?; Ok(()) } - async fn execute_transactions( - &mut self, - mpt: &mut MPTTrie, - trie_db: &Arc, - storage: &Arc, - ) -> ProtocolResult<()> - where - S: Storage + 'static, - DB: TrieDB + 'static, - { - let executor = AxonExecutor::default(); - let mut backend = AxonExecutorAdapter::from_root( - mpt.commit()?, - Arc::clone(trie_db), - Arc::clone(storage), - Proposal::new_without_state_root(&self.genesis.block.header).into(), - )?; - - let path_metadata = self.config.data_path_for_system_contract(); - system_contract::init(path_metadata, self.config.rocksdb.clone(), &mut backend); + async fn apply_genesis_after_checks(&mut self, loaded_genesis: &Block) -> ProtocolResult<()> { + let tmp_dir = tempfile::tempdir().map_err(|err| { + let errmsg = format!("failed to create temporary directory since {err:?}"); + MainError::Other(errmsg) + })?; - let resp = executor.exec(&mut backend, &self.genesis.txs, &[]); + let storage = { + let path_block = tmp_dir.path().join("block"); + let rocks_adapter = + Arc::new(RocksAdapter::new(path_block, self.config.rocksdb.clone())?); + let impl_storage = ImplStorage::new(rocks_adapter, self.config.rocksdb.cache_size); + Arc::new(impl_storage) + }; - resp.tx_resp.iter().enumerate().for_each(|(i, r)| { - if !r.exit_reason.is_succeed() { - panic!( - "The {}th tx in genesis execute failed, reason {:?}", - i, r.exit_reason - ); - } - }); + let trie_db = { + let path_state = tmp_dir.path().join("state"); + let trie_db = RocksTrieDB::new( + path_state, + self.config.rocksdb.clone(), + self.config.executor.triedb_cache_size, + )?; + Arc::new(trie_db) + }; + let mut mpt = MPTTrie::new(Arc::clone(&trie_db)); - self.state_root = resp.state_root; - self.genesis.block.header.state_root = self.state_root; - self.genesis.block.header.receipts_root = resp.receipt_root; + insert_accounts(&mut mpt, &self.config.accounts).await?; - log::info!( - "Execute the genesis distribute success, genesis state root {:?}, response {:?}", - self.state_root, - resp - ); + let path_metadata = tmp_dir.path().join("metadata"); + let resp = execute_transactions( + &self.genesis, + &mut mpt, + &trie_db, + &storage, + path_metadata, + &self.config.rocksdb.clone(), + ) + .await?; + + self.apply_genesis_data(resp.state_root, resp.receipt_root)?; + + let user_provided_genesis = &self.genesis.block; + if user_provided_genesis != loaded_genesis { + let errmsg = format!( + "The user provided genesis (hash: {:#x}) is NOT \ + the same as the genesis in storage (hash: {:#x})", + user_provided_genesis.hash(), + loaded_genesis.hash() + ); + return Err(MainError::Other(errmsg).into()); + } - storage - .update_latest_proof(Context::new(), self.genesis.block.header.proof.clone()) - .await?; - storage - .insert_block(Context::new(), self.genesis.block.clone()) - .await?; - storage - .insert_transactions( - Context::new(), - self.genesis.block.header.number, - self.genesis.txs.clone(), - ) - .await?; + Ok(()) + } + fn apply_genesis_data(&mut self, state_root: H256, receipts_root: H256) -> ProtocolResult<()> { + if self.genesis.block.header.state_root.is_zero() { + self.genesis.block.header.state_root = state_root; + } else if self.genesis.block.header.state_root != state_root { + let errmsg = format!( + "The state root of genesis block which user provided is incorrect, \ + if you don't know it, you can just set it as {:#x}.", + H256::default() + ); + return Err(MainError::Other(errmsg).into()); + } + if self.genesis.block.header.receipts_root.is_zero() { + self.genesis.block.header.receipts_root = receipts_root; + } else if self.genesis.block.header.receipts_root != receipts_root { + let errmsg = format!( + "The receipts root of genesis block which user provided is incorrect, \ + if you don't know it, you can just set it as {:#x}.", + H256::default() + ); + return Err(MainError::Other(errmsg).into()); + } Ok(()) } @@ -247,7 +270,7 @@ impl Axon { observe_listen_port_occupancy(&[self.config.network.listening_address.clone()]).await?; // Init Block db and get the current block - let storage = self.init_storage(); + let storage = self.init_storage(true).await?; let current_block = storage.get_latest_block(Context::new()).await?; let current_state_root = current_block.header.state_root; @@ -257,7 +280,7 @@ impl Axon { let mut network_service = self.init_network_service(key_provider); // Init trie db - let trie_db = self.init_trie_db(); + let trie_db = self.init_trie_db(true).await?; // Init full transactions wal let txs_wal_path = self @@ -375,26 +398,25 @@ impl Axon { Ok(()) } - fn init_storage(&self) -> Arc> { + async fn init_storage( + &self, + _run_service: bool, + ) -> ProtocolResult>> { let path_block = self.config.data_path_for_block(); - - let rocks_adapter = - Arc::new(RocksAdapter::new(path_block, self.config.rocksdb.clone()).unwrap()); - + let rocks_adapter = Arc::new(RocksAdapter::new(path_block, self.config.rocksdb.clone())?); #[cfg(all( not(target_env = "msvc"), not(target_os = "macos"), feature = "jemalloc" ))] - tokio::spawn(common_memory_tracker::track_db_process( - "blockdb", - rocks_adapter.inner_db(), - )); - - Arc::new(ImplStorage::new( - rocks_adapter, - self.config.rocksdb.cache_size, - )) + if _run_service { + tokio::spawn(common_memory_tracker::track_db_process( + "blockdb", + rocks_adapter.inner_db(), + )); + } + let impl_storage = ImplStorage::new(rocks_adapter, self.config.rocksdb.cache_size); + Ok(Arc::new(impl_storage)) } fn init_network_service( @@ -412,28 +434,25 @@ impl Axon { network_service } - fn init_trie_db(&self) -> Arc { + async fn init_trie_db(&self, _run_service: bool) -> ProtocolResult> { let path_state = self.config.data_path_for_state(); - let trie_db = Arc::new( - RocksTrieDB::new( - path_state, - self.config.rocksdb.clone(), - self.config.executor.triedb_cache_size, - ) - .unwrap(), - ); - + let trie_db = Arc::new(RocksTrieDB::new( + path_state, + self.config.rocksdb.clone(), + self.config.executor.triedb_cache_size, + )?); #[cfg(all( not(target_env = "msvc"), not(target_os = "macos"), feature = "jemalloc" ))] - tokio::spawn(common_memory_tracker::track_db_process( - "triedb", - trie_db.inner_db(), - )); - - trie_db + if _run_service { + tokio::spawn(common_memory_tracker::track_db_process( + "triedb", + trie_db.inner_db(), + )); + } + Ok(trie_db) } async fn init_mempool( @@ -976,6 +995,76 @@ where } } +async fn insert_accounts( + mpt: &mut MPTTrie, + accounts: &[InitialAccount], +) -> ProtocolResult<()> { + for account in accounts { + let raw_account = Account { + nonce: 0u64.into(), + balance: account.balance, + storage_root: RLP_NULL, + code_hash: NIL_DATA, + } + .encode()?; + mpt.insert(account.address.as_bytes(), &raw_account)?; + } + Ok(()) +} + +async fn execute_transactions( + rich: &RichBlock, + mpt: &mut MPTTrie, + trie_db: &Arc, + storage: &Arc, + path_metadata: PathBuf, + rocksdb: &ConfigRocksDB, +) -> ProtocolResult +where + S: Storage + 'static, + DB: TrieDB + 'static, +{ + let executor = AxonExecutor::default(); + let mut backend = AxonExecutorAdapter::from_root( + mpt.commit()?, + Arc::clone(trie_db), + Arc::clone(storage), + Proposal::new_without_state_root(&rich.block.header).into(), + )?; + + system_contract::init(path_metadata, rocksdb.clone(), &mut backend); + + let resp = executor.exec(&mut backend, &rich.txs, &[]); + + resp.tx_resp.iter().enumerate().for_each(|(i, r)| { + if !r.exit_reason.is_succeed() { + panic!( + "The {}th tx in genesis execute failed, reason {:?}", + i, r.exit_reason + ); + } + }); + + Ok(resp) +} + +async fn save_block(storage: &Arc, rich: &RichBlock) -> ProtocolResult<()> +where + S: Storage + 'static, +{ + storage + .update_latest_proof(Context::new(), rich.block.header.proof.clone()) + .await?; + storage + .insert_block(Context::new(), rich.block.clone()) + .await?; + storage + .insert_transactions(Context::new(), rich.block.header.number, rich.txs.clone()) + .await?; + + Ok(()) +} + #[cfg(test)] mod tests { use protocol::codec::hex_decode;