Skip to content
This repository has been archived by the owner on Oct 9, 2022. It is now read-only.

Optimization for mmr generating #73

Merged
merged 3 commits into from
Sep 5, 2020
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
100 changes: 66 additions & 34 deletions src/mmr/runner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,15 @@ use cmmr::MMR;
use reqwest::Client;
use rocksdb::{IteratorMode, DB};
use std::{env, sync::Arc, time};
use std::time::{SystemTime, UNIX_EPOCH};

fn now() -> u128 {
let start = SystemTime::now();
let since_the_epoch = start
.duration_since(UNIX_EPOCH)
.expect("Time went backwards");
since_the_epoch.as_millis()
}

/// MMR Runner
#[derive(Clone)]
Expand All @@ -35,34 +44,40 @@ impl From<ShadowShared> for Runner {
impl Runner {
/// Start the runner
pub async fn start(&mut self) -> Result<(), Error> {
let mut ptr = {
let last_leaf =
helper::mmr_size_to_last_leaf(self.db.iterator(IteratorMode::Start).count() as i64);
let mut mmr_size = self.db.iterator(IteratorMode::Start).count() as u64;
let last_leaf = helper::mmr_size_to_last_leaf(mmr_size as i64);
let mut ptr =
if last_leaf == 0 {
0
} else {
last_leaf + 1
}
};
};

loop {
if let Err(e) = self.push(ptr).await {
trace!("Push block to mmr_store failed: {:?}", e);
trace!("MMR service restarting after 10s...");
actix_rt::time::delay_for(time::Duration::from_secs(10)).await;
} else {
if ptr
% env::var("MMR_LOG")
debug!("-{}-{}------------", ptr, mmr_size);
let a = now();
match self.push(ptr, mmr_size).await {
Err(e) => {
trace!("Push block to mmr_store failed: {:?}", e);
trace!("MMR service restarting after 10s...");
actix_rt::time::delay_for(time::Duration::from_secs(10)).await;
},
Ok(mmr_size_new) => {
if ptr
% env::var("MMR_LOG")
.unwrap_or_else(|_| "10000".to_string())
.parse::<i64>()
.unwrap_or(10000)
== 0
{
trace!("Pushed mmr {} into database", ptr);
}
== 0
{
trace!("Pushed mmr {} into database", ptr);
}

ptr += 1;
mmr_size = mmr_size_new;
ptr += 1;
}
}
debug!("total: {}", now() - a);
}
}

Expand All @@ -75,29 +90,44 @@ impl Runner {
}

/// Push new header hash into storage
pub async fn push(&mut self, number: i64) -> Result<(), Error> {
let mmr_size = if number == 0 {
0
} else {
cmmr::leaf_index_to_mmr_size((number - 1) as u64)
} as u64;
pub async fn push(&mut self, number: i64, mmr_size: u64) -> Result<u64, Error> {
let a = now();
let mut mmr = MMR::<_, MergeHash, _>::new(mmr_size, &self.store);
mmr.push(H256::from(
&EthHeaderRPCResp::get(&self.client, number as u64)
.await?
.result
.hash,
))?;

let b = now();
debug!("mmr create : {}", b - a);

let hash_from_ethereum = &EthHeaderRPCResp::get(&self.client, number as u64)
.await?
.result
.hash;

let c = now();
debug!("rpc call : {}", c - b);

mmr.push(H256::from(hash_from_ethereum))?;

let d = now();
debug!("push to mmr : {}", d - c);

let mmr_size_new = mmr.mmr_size();

let e = now();
debug!("get new size: {}", e - d);

mmr.commit()?;
Ok(())

let f = now();
debug!("commit : {}", f - e);

Ok(mmr_size_new)
}

/// Gen mmrs for tests
pub async fn stops_at(&mut self, count: i64) -> Result<(), Error> {
let mmr_size = self.db.iterator(IteratorMode::Start).count() as i64;
let mut mmr_size = self.db.iterator(IteratorMode::Start).count() as u64;
let mut ptr = {
let last_leaf = helper::mmr_size_to_last_leaf(mmr_size);
let last_leaf = helper::mmr_size_to_last_leaf(mmr_size as i64);
if last_leaf == 0 {
0
} else {
Expand All @@ -109,8 +139,10 @@ impl Runner {
if ptr >= count {
break;
}
self.push(ptr).await?;
ptr += 1;
if let Ok(mmr_size_new) = self.push(ptr, mmr_size).await {
mmr_size = mmr_size_new;
ptr += 1;
}
}

Ok(())
Expand Down