Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(ethexe): Deep sync, faster chain load #4063

Merged
merged 25 commits into from
Aug 5, 2024
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions ethexe/cli/src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -236,6 +236,7 @@ impl Service {

// Set block as valid - means state db has all states for the end of the block
db.set_block_end_state_is_valid(block_hash, true);
db.set_latest_valid_block(block_hash);

Ok(transition_outcomes)
}
Expand Down
3 changes: 3 additions & 0 deletions ethexe/common/src/db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,9 @@ pub trait BlockMetaStorage {

fn block_outcome(&self, block_hash: H256) -> Option<Vec<StateTransition>>;
fn set_block_outcome(&self, block_hash: H256, outcome: Vec<StateTransition>);

fn latest_valid_block(&self) -> Option<H256>;
fn set_latest_valid_block(&self, block_hash: H256);
}

pub trait CodesStorage {
Expand Down
23 changes: 23 additions & 0 deletions ethexe/db/src/database.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,18 @@ use parity_scale_codec::{Decode, Encode};

const LOG_TARGET: &str = "hyper-db";

enum Key {
LatestValidBlock,
}

impl Key {
fn as_bytes(&self) -> &'static [u8] {
match self {
Key::LatestValidBlock => b"latest_valid_block",
}
}
}

#[repr(u64)]
enum KeyPrefix {
ProgramToCodeId = 0,
ukint-vs marked this conversation as resolved.
Show resolved Hide resolved
Expand Down Expand Up @@ -233,6 +245,17 @@ impl BlockMetaStorage for Database {
self.kv
.put(&KeyPrefix::BlockOutcome.one(block_hash), outcome.encode());
}

fn latest_valid_block(&self) -> Option<H256> {
self.kv
.get(Key::LatestValidBlock.as_bytes())
.map(|block_hash| H256::from_slice(&block_hash))
}

fn set_latest_valid_block(&self, block_hash: H256) {
self.kv
.put(Key::LatestValidBlock.as_bytes(), block_hash.0.to_vec());
}
}

impl CodesStorage for Database {
Expand Down
110 changes: 92 additions & 18 deletions ethexe/observer/src/query.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,15 +8,17 @@ use crate::{
BlobReader,
};
use alloy::{
primitives::Address as AlloyAddress,
eips::BlockNumberOrTag,
primitives::{Address as AlloyAddress, B256},
providers::{Provider, ProviderBuilder},
rpc::types::eth::BlockTransactionsKind,
rpc::types::{eth::BlockTransactionsKind, Filter},
};
use anyhow::{anyhow, Result};
use ethexe_common::{
db::{BlockHeader, BlockMetaStorage},
events::BlockEvent,
events::{BlockCommitted, BlockEvent},
};
use ethexe_ethereum::event::{match_log, signature_hash};
use ethexe_signer::Address;
use gprimitives::{ActorId, CodeId, H256};

Expand Down Expand Up @@ -70,35 +72,113 @@ impl Query {
.collect())
}

async fn get_all_committed_blocks(
ukint-vs marked this conversation as resolved.
Show resolved Hide resolved
&mut self,
from_block: u32,
to_block: u32,
) -> Result<Vec<H256>> {
let router_events_filter = Filter::new()
.from_block(from_block as u64)
.to_block(to_block as u64)
.address(self.router_address)
.event_signature(B256::new(signature_hash::BLOCK_COMMITTED));

let logs = self.provider.get_logs(&router_events_filter).await?;

let mut commited_blocks = vec![];
for log in logs.iter() {
let Some(BlockEvent::BlockCommitted(BlockCommitted { block_hash })) = match_log(log)?
else {
continue;
};
commited_blocks.push(block_hash);
}

log::trace!("Read committed blocks from {from_block} to {to_block}");

Ok(commited_blocks)
}

async fn get_latest_valid_block(&mut self) -> Result<(H256, BlockHeader)> {
if let Some(block_hash) = self.database.latest_valid_block() {
let header = self
.database
.block_header(block_hash)
.ok_or(anyhow!("{block_hash} not found in db. Corrupted"))?;

let Some(block) = self
.provider
.get_block_by_number((header.height as u64).into(), false)
.await?
else {
return Ok((block_hash, header));
};
ukint-vs marked this conversation as resolved.
Show resolved Hide resolved
if block.header.hash.map(|h| h.0) == Some(block_hash.0) {
Ok((block_hash, header))
} else {
let finalized_block = self
.provider
.get_block_by_number(BlockNumberOrTag::Finalized, false)
.await?
.ok_or(anyhow!("Failed to get finalized block"))?;
if finalized_block.header.number.unwrap() >= header.height as u64 {
log::warn!("Latest valid block doesn't match on-chain block.");
let hash = H256(block.header.hash.unwrap().0);
Ok((hash, self.get_block_header_meta(hash).await?))
} else {
Ok((block_hash, header))
}
}
} else {
log::warn!("Latest valid block not found, sync to genesis.");
Ok((
self.genesis_block_hash,
self.get_block_header_meta(self.genesis_block_hash).await?,
))
ukint-vs marked this conversation as resolved.
Show resolved Hide resolved
}
}

pub async fn get_last_committed_chain(&mut self, block_hash: H256) -> Result<Vec<H256>> {
let mut chain = Vec::new();
let mut committed_blocks = BTreeSet::new();

// First we need to find the nearest valid block
let current_block = self.get_block_header_meta(block_hash).await?;

// Find latest_valid_block or use genesis.
let (latest_valid_block_hash, latest_valid_block) = self.get_latest_valid_block().await?;

log::trace!("Nearest valid in db block: {latest_valid_block:?}");

if current_block.height - latest_valid_block.height >= self.max_commitment_depth {
return Err(anyhow!("too deep chain"));
}

let committed_blocks = self
.get_all_committed_blocks(latest_valid_block.height, current_block.height)
.await?;

// Populate db to the latest valid block.
let mut hash = block_hash;
loop {
// TODO: limit deepness

if hash == self.genesis_block_hash {
// Genesis block is always valid
log::trace!("Genesis block reached: {hash}");
if hash == latest_valid_block_hash {
log::trace!("Chain loaded to the nearest valid block: {hash}");
break;
}

// Reset latest_valid_block if found.
// TODO: Remove once all db pruned or updated.
if self
.database
.block_end_state_is_valid(hash)
.unwrap_or(false)
{
self.database.set_latest_valid_block(hash);
log::trace!("Nearest valid in db block found: {hash}");
break;
}

log::trace!("Include block {hash} in chain for processing");
chain.push(hash);

committed_blocks.extend(self.get_committed_blocks(hash).await?);

hash = self.get_block_parent_hash(hash).await?;
}

Expand All @@ -117,12 +197,7 @@ impl Query {
};

// Now we need append in chain all blocks from the oldest not committed to the current.
let mut depth = 0;
loop {
if depth >= self.max_commitment_depth {
return Err(anyhow!("too deep chain"));
}

log::trace!("Include block {hash} in chain for processing");
chain.push(hash);

Expand All @@ -132,7 +207,6 @@ impl Query {
}

hash = self.get_block_parent_hash(hash).await?;
depth += 1;
}

Ok(chain)
Expand Down
Loading