Skip to content

Commit

Permalink
NDEV-2183: Implement get_sync_status for eth_syncing (#193)
Browse files Browse the repository at this point in the history
* NDEV-2183: Implement get_sync_status for eth_syncing
  • Loading branch information
Deniskore authored Sep 19, 2023
1 parent be8eb1e commit e79ec26
Show file tree
Hide file tree
Showing 4 changed files with 209 additions and 110 deletions.
2 changes: 1 addition & 1 deletion evm_loader/lib/src/errors.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ use solana_sdk::signer::SignerError as SolanaSignerError;
use thiserror::Error;

use crate::commands::init_environment::EnvironmentError;
use crate::types::ChError;
use crate::types::tracer_ch_common::ChError;

/// Errors that may be returned by the neon-cli program.
#[derive(Debug, Error)]
Expand Down
3 changes: 2 additions & 1 deletion evm_loader/lib/src/types/mod.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
pub mod request_models;
pub mod tracer_ch_common;
mod tracer_ch_db;

pub use evm_loader::types::Address;
Expand All @@ -7,7 +8,7 @@ use solana_sdk::pubkey::Pubkey;
use std::str::FromStr;
use tokio::runtime::Runtime;
use tokio::task::block_in_place;
pub use tracer_ch_db::{ChError, ChResult, ClickHouseDb as TracerDb};
pub use tracer_ch_db::ClickHouseDb as TracerDb;

use evm_loader::evm::tracing::TraceCallConfig;
use evm_loader::types::hexbytes::HexBytes;
Expand Down
131 changes: 131 additions & 0 deletions evm_loader/lib/src/types/tracer_ch_common.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,131 @@
use std::fmt;

use clickhouse::Row;
use serde::{Deserialize, Serialize};
use solana_sdk::{account::Account, pubkey::Pubkey};
use thiserror::Error;

pub const ROOT_BLOCK_DELAY: u8 = 100;

#[derive(Error, Debug)]
pub enum ChError {
#[error("clickhouse: {}", .0)]
Db(#[from] clickhouse::error::Error),
}

pub type ChResult<T> = std::result::Result<T, ChError>;

pub enum SlotStatus {
#[allow(unused)]
Confirmed = 1,
#[allow(unused)]
Processed = 2,
Rooted = 3,
}

#[derive(Debug, Row, serde::Deserialize, Clone)]
pub struct SlotParent {
pub slot: u64,
pub parent: Option<u64>,
pub status: u8,
}

#[derive(Debug, Row, serde::Deserialize, Clone)]
pub struct SlotParentRooted {
pub slot: u64,
pub parent: Option<u64>,
}

impl From<SlotParentRooted> for SlotParent {
fn from(slot_parent_rooted: SlotParentRooted) -> Self {
SlotParent {
slot: slot_parent_rooted.slot,
parent: slot_parent_rooted.parent,
status: SlotStatus::Rooted as u8,
}
}
}

impl SlotParent {
pub fn is_rooted(&self) -> bool {
self.status == SlotStatus::Rooted as u8
}
}

#[derive(Row, serde::Deserialize, Clone)]
pub struct AccountRow {
pub owner: Vec<u8>,
pub lamports: u64,
pub executable: bool,
pub rent_epoch: u64,
pub data: Vec<u8>,
pub txn_signature: Vec<Option<u8>>,
}

impl fmt::Display for AccountRow {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
write!(
f,
"AccountRow {{\n owner: {},\n lamports: {},\n executable: {},\n rent_epoch: {},\n}}",
bs58::encode(&self.owner).into_string(),
self.lamports,
self.executable,
self.rent_epoch,
)
}
}

impl fmt::Debug for AccountRow {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("Account")
.field("owner", &bs58::encode(&self.owner).into_string())
.field("lamports", &self.lamports)
.field("executable", &self.executable)
.field("rent_epoch", &self.rent_epoch)
.finish()
}
}

impl TryInto<Account> for AccountRow {
type Error = String;

fn try_into(self) -> Result<Account, Self::Error> {
let owner = Pubkey::try_from(self.owner).map_err(|src| {
format!(
"Incorrect slice length ({}) while converting owner from: {src:?}",
src.len(),
)
})?;

Ok(Account {
lamports: self.lamports,
data: self.data,
owner,
rent_epoch: self.rent_epoch,
executable: self.executable,
})
}
}

pub enum EthSyncStatus {
Syncing(EthSyncing),
Synced,
}

impl EthSyncStatus {
pub fn new(syncing_status: Option<EthSyncing>) -> Self {
if let Some(syncing_status) = syncing_status {
Self::Syncing(syncing_status)
} else {
Self::Synced
}
}
}

#[derive(Row, Debug, Deserialize, Serialize)]
#[serde(rename_all = "camelCase")]
pub struct EthSyncing {
pub starting_block: u64,
pub current_block: u64,
pub highest_block: u64,
}
183 changes: 75 additions & 108 deletions evm_loader/lib/src/types/tracer_ch_db.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,14 @@
use crate::commands::get_neon_elf::get_elf_parameter;
use crate::{
commands::get_neon_elf::get_elf_parameter,
types::tracer_ch_common::{AccountRow, ChError, SlotParent, ROOT_BLOCK_DELAY},
};

use super::{
tracer_ch_common::{ChResult, EthSyncStatus, EthSyncing, SlotParentRooted},
ChDbConfig,
};

use super::ChDbConfig;
use clickhouse::{Client, Row};
use clickhouse::Client;
use log::{debug, info};
use rand::Rng;
use solana_sdk::{
Expand All @@ -14,121 +21,16 @@ use std::{
Ord,
Ordering::{Equal, Greater, Less},
},
convert::TryFrom,
fmt,
sync::Arc,
time::Instant,
};
use thiserror::Error;

const ROOT_BLOCK_DELAY: u8 = 100;

#[derive(Error, Debug)]
pub enum ChError {
#[error("clickhouse: {}", .0)]
Db(#[from] clickhouse::error::Error),
}

pub type ChResult<T> = std::result::Result<T, ChError>;

#[allow(dead_code)]
#[derive(Clone)]
pub struct ClickHouseDb {
pub client: Arc<Client>,
}

pub enum SlotStatus {
#[allow(unused)]
Confirmed = 1,
#[allow(unused)]
Processed = 2,
Rooted = 3,
}

#[derive(Debug, Row, serde::Deserialize, Clone)]
pub struct SlotParent {
pub slot: u64,
pub parent: Option<u64>,
pub status: u8,
}

#[derive(Debug, Row, serde::Deserialize, Clone)]
pub struct SlotParentRooted {
pub slot: u64,
pub parent: Option<u64>,
}

impl From<SlotParentRooted> for SlotParent {
fn from(slot_parent_rooted: SlotParentRooted) -> Self {
SlotParent {
slot: slot_parent_rooted.slot,
parent: slot_parent_rooted.parent,
status: SlotStatus::Rooted as u8,
}
}
}

impl SlotParent {
fn is_rooted(&self) -> bool {
self.status == SlotStatus::Rooted as u8
}
}

#[derive(Row, serde::Deserialize, Clone)]
pub struct AccountRow {
pub owner: Vec<u8>,
pub lamports: u64,
pub executable: bool,
pub rent_epoch: u64,
pub data: Vec<u8>,
pub txn_signature: Vec<Option<u8>>,
}

impl fmt::Display for AccountRow {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
write!(
f,
"AccountRow {{\n owner: {},\n lamports: {},\n executable: {},\n rent_epoch: {},\n}}",
bs58::encode(&self.owner).into_string(),
self.lamports,
self.executable,
self.rent_epoch,
)
}
}

impl fmt::Debug for AccountRow {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("Account")
.field("owner", &bs58::encode(&self.owner).into_string())
.field("lamports", &self.lamports)
.field("executable", &self.executable)
.field("rent_epoch", &self.rent_epoch)
.finish()
}
}

impl TryInto<Account> for AccountRow {
type Error = String;

fn try_into(self) -> Result<Account, Self::Error> {
let owner = Pubkey::try_from(self.owner).map_err(|src| {
format!(
"Incorrect slice length ({}) while converting owner from: {src:?}",
src.len(),
)
})?;

Ok(Account {
lamports: self.lamports,
data: self.data,
owner,
rent_epoch: self.rent_epoch,
executable: self.executable,
})
}
}

impl ClickHouseDb {
pub fn new(config: &ChDbConfig) -> Self {
let url_id = rand::thread_rng().gen_range(0..config.clickhouse_url.len());
Expand Down Expand Up @@ -620,6 +522,71 @@ impl ClickHouseDb {
}
}

pub async fn get_slot_by_blockhash(&self, blockhash: &str) -> ChResult<u64> {
let query = r#"SELECT slot
FROM events.notify_block_distributed
WHERE hash = ?
LIMIT 1
"#;

let slot = Self::row_opt(
self.client
.query(query)
.bind(blockhash)
.fetch_one::<u64>()
.await,
)?;

match slot {
Some(slot) => Ok(slot),
None => Err(ChError::Db(clickhouse::error::Error::Custom(
"get_slot_by_blockhash: no data available".to_string(),
))),
}
}

pub async fn get_sync_status(&self) -> ChResult<EthSyncStatus> {
let query_is_startup = r#"SELECT is_startup
FROM events.update_account_distributed
WHERE slot = (
SELECT MAX(slot)
FROM events.update_account_distributed
)
LIMIT 1;
"#;

let is_startup = Self::row_opt(
self.client
.query(query_is_startup)
.fetch_one::<bool>()
.await,
)?;

if let Some(true) = is_startup {
let query = r#"SELECT slot
FROM (
(SELECT MIN(slot) as slot FROM events.notify_block_distributed)
UNION ALL
(SELECT MAX(slot) as slot FROM events.notify_block_distributed)
UNION ALL
(SELECT MAX(slot) as slot FROM events.notify_block_distributed)
)
ORDER BY slot ASC
"#;

let data = Self::row_opt(self.client.query(query).fetch_one::<EthSyncing>().await)?;

return match data {
Some(data) => Ok(EthSyncStatus::new(Some(data))),
None => Err(ChError::Db(clickhouse::error::Error::Custom(
"get_sync_status: no data available".to_string(),
))),
};
}

Ok(EthSyncStatus::new(None))
}

fn row_opt<T>(result: clickhouse::error::Result<T>) -> clickhouse::error::Result<Option<T>> {
match result {
Ok(row) => Ok(Some(row)),
Expand Down

0 comments on commit e79ec26

Please sign in to comment.