diff --git a/src/mmr/runner.rs b/src/mmr/runner.rs index 7703dca5..e4b46854 100644 --- a/src/mmr/runner.rs +++ b/src/mmr/runner.rs @@ -13,6 +13,15 @@ use cmmr::MMR; use reqwest::Client; use rocksdb::{IteratorMode, DB}; use std::{env, sync::Arc, time}; +use std::time::{SystemTime, UNIX_EPOCH}; + +fn now() -> u128 { + let start = SystemTime::now(); + let since_the_epoch = start + .duration_since(UNIX_EPOCH) + .expect("Time went backwards"); + since_the_epoch.as_millis() +} /// MMR Runner #[derive(Clone)] @@ -35,34 +44,40 @@ impl From for Runner { impl Runner { /// Start the runner pub async fn start(&mut self) -> Result<(), Error> { - let mut ptr = { - let last_leaf = - helper::mmr_size_to_last_leaf(self.db.iterator(IteratorMode::Start).count() as i64); + let mut mmr_size = self.db.iterator(IteratorMode::Start).count() as u64; + let last_leaf = helper::mmr_size_to_last_leaf(mmr_size as i64); + let mut ptr = if last_leaf == 0 { 0 } else { last_leaf + 1 - } - }; + }; loop { - if let Err(e) = self.push(ptr).await { - trace!("Push block to mmr_store failed: {:?}", e); - trace!("MMR service restarting after 10s..."); - actix_rt::time::delay_for(time::Duration::from_secs(10)).await; - } else { - if ptr - % env::var("MMR_LOG") + debug!("-{}-{}------------", ptr, mmr_size); + let a = now(); + match self.push(ptr, mmr_size).await { + Err(e) => { + trace!("Push block to mmr_store failed: {:?}", e); + trace!("MMR service restarting after 10s..."); + actix_rt::time::delay_for(time::Duration::from_secs(10)).await; + }, + Ok(mmr_size_new) => { + if ptr + % env::var("MMR_LOG") .unwrap_or_else(|_| "10000".to_string()) .parse::() .unwrap_or(10000) - == 0 - { - trace!("Pushed mmr {} into database", ptr); - } + == 0 + { + trace!("Pushed mmr {} into database", ptr); + } - ptr += 1; + mmr_size = mmr_size_new; + ptr += 1; + } } + debug!("total: {}", now() - a); } } @@ -75,29 +90,44 @@ impl Runner { } /// Push new header hash into storage - pub async fn push(&mut self, number: i64) -> Result<(), Error> { - let mmr_size = if number == 0 { - 0 - } else { - cmmr::leaf_index_to_mmr_size((number - 1) as u64) - } as u64; + pub async fn push(&mut self, number: i64, mmr_size: u64) -> Result { + let a = now(); let mut mmr = MMR::<_, MergeHash, _>::new(mmr_size, &self.store); - mmr.push(H256::from( - &EthHeaderRPCResp::get(&self.client, number as u64) - .await? - .result - .hash, - ))?; + + let b = now(); + debug!("mmr create : {}", b - a); + + let hash_from_ethereum = &EthHeaderRPCResp::get(&self.client, number as u64) + .await? + .result + .hash; + + let c = now(); + debug!("rpc call : {}", c - b); + + mmr.push(H256::from(hash_from_ethereum))?; + + let d = now(); + debug!("push to mmr : {}", d - c); + + let mmr_size_new = mmr.mmr_size(); + + let e = now(); + debug!("get new size: {}", e - d); mmr.commit()?; - Ok(()) + + let f = now(); + debug!("commit : {}", f - e); + + Ok(mmr_size_new) } /// Gen mmrs for tests pub async fn stops_at(&mut self, count: i64) -> Result<(), Error> { - let mmr_size = self.db.iterator(IteratorMode::Start).count() as i64; + let mut mmr_size = self.db.iterator(IteratorMode::Start).count() as u64; let mut ptr = { - let last_leaf = helper::mmr_size_to_last_leaf(mmr_size); + let last_leaf = helper::mmr_size_to_last_leaf(mmr_size as i64); if last_leaf == 0 { 0 } else { @@ -109,8 +139,10 @@ impl Runner { if ptr >= count { break; } - self.push(ptr).await?; - ptr += 1; + if let Ok(mmr_size_new) = self.push(ptr, mmr_size).await { + mmr_size = mmr_size_new; + ptr += 1; + } } Ok(())