From 45e18dccf137cc522d3291feae8dfbdb73c2e1ef Mon Sep 17 00:00:00 2001 From: Denis Date: Tue, 19 Sep 2023 10:27:00 +0300 Subject: [PATCH] NDEV-2183: Implement get_sync_status for eth_syncing (#193) * NDEV-2183: Implement get_sync_status for eth_syncing --- evm_loader/lib/src/errors.rs | 2 +- evm_loader/lib/src/types/mod.rs | 3 +- evm_loader/lib/src/types/tracer_ch_common.rs | 131 +++++++++++++ evm_loader/lib/src/types/tracer_ch_db.rs | 183 ++++++++----------- 4 files changed, 209 insertions(+), 110 deletions(-) create mode 100644 evm_loader/lib/src/types/tracer_ch_common.rs diff --git a/evm_loader/lib/src/errors.rs b/evm_loader/lib/src/errors.rs index abe19885c..132e6e5b2 100644 --- a/evm_loader/lib/src/errors.rs +++ b/evm_loader/lib/src/errors.rs @@ -13,7 +13,7 @@ use solana_sdk::signer::SignerError as SolanaSignerError; use thiserror::Error; use crate::commands::init_environment::EnvironmentError; -use crate::types::ChError; +use crate::types::tracer_ch_common::ChError; /// Errors that may be returned by the neon-cli program. #[derive(Debug, Error)] diff --git a/evm_loader/lib/src/types/mod.rs b/evm_loader/lib/src/types/mod.rs index a3f014d7b..69e8099ff 100644 --- a/evm_loader/lib/src/types/mod.rs +++ b/evm_loader/lib/src/types/mod.rs @@ -1,4 +1,5 @@ pub mod request_models; +pub mod tracer_ch_common; mod tracer_ch_db; pub use evm_loader::types::Address; @@ -7,7 +8,7 @@ use solana_sdk::pubkey::Pubkey; use std::str::FromStr; use tokio::runtime::Runtime; use tokio::task::block_in_place; -pub use tracer_ch_db::{ChError, ChResult, ClickHouseDb as TracerDb}; +pub use tracer_ch_db::ClickHouseDb as TracerDb; use evm_loader::evm::tracing::TraceCallConfig; use evm_loader::types::hexbytes::HexBytes; diff --git a/evm_loader/lib/src/types/tracer_ch_common.rs b/evm_loader/lib/src/types/tracer_ch_common.rs new file mode 100644 index 000000000..c7c0e54f4 --- /dev/null +++ b/evm_loader/lib/src/types/tracer_ch_common.rs @@ -0,0 +1,131 @@ +use std::fmt; + +use clickhouse::Row; +use serde::{Deserialize, Serialize}; +use solana_sdk::{account::Account, pubkey::Pubkey}; +use thiserror::Error; + +pub const ROOT_BLOCK_DELAY: u8 = 100; + +#[derive(Error, Debug)] +pub enum ChError { + #[error("clickhouse: {}", .0)] + Db(#[from] clickhouse::error::Error), +} + +pub type ChResult = std::result::Result; + +pub enum SlotStatus { + #[allow(unused)] + Confirmed = 1, + #[allow(unused)] + Processed = 2, + Rooted = 3, +} + +#[derive(Debug, Row, serde::Deserialize, Clone)] +pub struct SlotParent { + pub slot: u64, + pub parent: Option, + pub status: u8, +} + +#[derive(Debug, Row, serde::Deserialize, Clone)] +pub struct SlotParentRooted { + pub slot: u64, + pub parent: Option, +} + +impl From for SlotParent { + fn from(slot_parent_rooted: SlotParentRooted) -> Self { + SlotParent { + slot: slot_parent_rooted.slot, + parent: slot_parent_rooted.parent, + status: SlotStatus::Rooted as u8, + } + } +} + +impl SlotParent { + pub fn is_rooted(&self) -> bool { + self.status == SlotStatus::Rooted as u8 + } +} + +#[derive(Row, serde::Deserialize, Clone)] +pub struct AccountRow { + pub owner: Vec, + pub lamports: u64, + pub executable: bool, + pub rent_epoch: u64, + pub data: Vec, + pub txn_signature: Vec>, +} + +impl fmt::Display for AccountRow { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + write!( + f, + "AccountRow {{\n owner: {},\n lamports: {},\n executable: {},\n rent_epoch: {},\n}}", + bs58::encode(&self.owner).into_string(), + self.lamports, + self.executable, + self.rent_epoch, + ) + } +} + +impl fmt::Debug for AccountRow { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("Account") + .field("owner", &bs58::encode(&self.owner).into_string()) + .field("lamports", &self.lamports) + .field("executable", &self.executable) + .field("rent_epoch", &self.rent_epoch) + .finish() + } +} + +impl TryInto for AccountRow { + type Error = String; + + fn try_into(self) -> Result { + let owner = Pubkey::try_from(self.owner).map_err(|src| { + format!( + "Incorrect slice length ({}) while converting owner from: {src:?}", + src.len(), + ) + })?; + + Ok(Account { + lamports: self.lamports, + data: self.data, + owner, + rent_epoch: self.rent_epoch, + executable: self.executable, + }) + } +} + +pub enum EthSyncStatus { + Syncing(EthSyncing), + Synced, +} + +impl EthSyncStatus { + pub fn new(syncing_status: Option) -> Self { + if let Some(syncing_status) = syncing_status { + Self::Syncing(syncing_status) + } else { + Self::Synced + } + } +} + +#[derive(Row, Debug, Deserialize, Serialize)] +#[serde(rename_all = "camelCase")] +pub struct EthSyncing { + pub starting_block: u64, + pub current_block: u64, + pub highest_block: u64, +} diff --git a/evm_loader/lib/src/types/tracer_ch_db.rs b/evm_loader/lib/src/types/tracer_ch_db.rs index 2b653c4f9..2cf0a1b9d 100644 --- a/evm_loader/lib/src/types/tracer_ch_db.rs +++ b/evm_loader/lib/src/types/tracer_ch_db.rs @@ -1,7 +1,14 @@ -use crate::commands::get_neon_elf::get_elf_parameter; +use crate::{ + commands::get_neon_elf::get_elf_parameter, + types::tracer_ch_common::{AccountRow, ChError, SlotParent, ROOT_BLOCK_DELAY}, +}; + +use super::{ + tracer_ch_common::{ChResult, EthSyncStatus, EthSyncing, SlotParentRooted}, + ChDbConfig, +}; -use super::ChDbConfig; -use clickhouse::{Client, Row}; +use clickhouse::Client; use log::{debug, info}; use rand::Rng; use solana_sdk::{ @@ -14,22 +21,9 @@ use std::{ Ord, Ordering::{Equal, Greater, Less}, }, - convert::TryFrom, - fmt, sync::Arc, time::Instant, }; -use thiserror::Error; - -const ROOT_BLOCK_DELAY: u8 = 100; - -#[derive(Error, Debug)] -pub enum ChError { - #[error("clickhouse: {}", .0)] - Db(#[from] clickhouse::error::Error), -} - -pub type ChResult = std::result::Result; #[allow(dead_code)] #[derive(Clone)] @@ -37,98 +31,6 @@ pub struct ClickHouseDb { pub client: Arc, } -pub enum SlotStatus { - #[allow(unused)] - Confirmed = 1, - #[allow(unused)] - Processed = 2, - Rooted = 3, -} - -#[derive(Debug, Row, serde::Deserialize, Clone)] -pub struct SlotParent { - pub slot: u64, - pub parent: Option, - pub status: u8, -} - -#[derive(Debug, Row, serde::Deserialize, Clone)] -pub struct SlotParentRooted { - pub slot: u64, - pub parent: Option, -} - -impl From for SlotParent { - fn from(slot_parent_rooted: SlotParentRooted) -> Self { - SlotParent { - slot: slot_parent_rooted.slot, - parent: slot_parent_rooted.parent, - status: SlotStatus::Rooted as u8, - } - } -} - -impl SlotParent { - fn is_rooted(&self) -> bool { - self.status == SlotStatus::Rooted as u8 - } -} - -#[derive(Row, serde::Deserialize, Clone)] -pub struct AccountRow { - pub owner: Vec, - pub lamports: u64, - pub executable: bool, - pub rent_epoch: u64, - pub data: Vec, - pub txn_signature: Vec>, -} - -impl fmt::Display for AccountRow { - fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { - write!( - f, - "AccountRow {{\n owner: {},\n lamports: {},\n executable: {},\n rent_epoch: {},\n}}", - bs58::encode(&self.owner).into_string(), - self.lamports, - self.executable, - self.rent_epoch, - ) - } -} - -impl fmt::Debug for AccountRow { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - f.debug_struct("Account") - .field("owner", &bs58::encode(&self.owner).into_string()) - .field("lamports", &self.lamports) - .field("executable", &self.executable) - .field("rent_epoch", &self.rent_epoch) - .finish() - } -} - -impl TryInto for AccountRow { - type Error = String; - - fn try_into(self) -> Result { - let owner = Pubkey::try_from(self.owner).map_err(|src| { - format!( - "Incorrect slice length ({}) while converting owner from: {src:?}", - src.len(), - ) - })?; - - Ok(Account { - lamports: self.lamports, - data: self.data, - owner, - rent_epoch: self.rent_epoch, - executable: self.executable, - }) - } -} - impl ClickHouseDb { pub fn new(config: &ChDbConfig) -> Self { let url_id = rand::thread_rng().gen_range(0..config.clickhouse_url.len()); @@ -620,6 +522,71 @@ impl ClickHouseDb { } } + pub async fn get_slot_by_blockhash(&self, blockhash: &str) -> ChResult { + let query = r#"SELECT slot + FROM events.notify_block_distributed + WHERE hash = ? + LIMIT 1 + "#; + + let slot = Self::row_opt( + self.client + .query(query) + .bind(blockhash) + .fetch_one::() + .await, + )?; + + match slot { + Some(slot) => Ok(slot), + None => Err(ChError::Db(clickhouse::error::Error::Custom( + "get_slot_by_blockhash: no data available".to_string(), + ))), + } + } + + pub async fn get_sync_status(&self) -> ChResult { + let query_is_startup = r#"SELECT is_startup + FROM events.update_account_distributed + WHERE slot = ( + SELECT MAX(slot) + FROM events.update_account_distributed + ) + LIMIT 1; + "#; + + let is_startup = Self::row_opt( + self.client + .query(query_is_startup) + .fetch_one::() + .await, + )?; + + if let Some(true) = is_startup { + let query = r#"SELECT slot + FROM ( + (SELECT MIN(slot) as slot FROM events.notify_block_distributed) + UNION ALL + (SELECT MAX(slot) as slot FROM events.notify_block_distributed) + UNION ALL + (SELECT MAX(slot) as slot FROM events.notify_block_distributed) + ) + ORDER BY slot ASC + "#; + + let data = Self::row_opt(self.client.query(query).fetch_one::().await)?; + + return match data { + Some(data) => Ok(EthSyncStatus::new(Some(data))), + None => Err(ChError::Db(clickhouse::error::Error::Custom( + "get_sync_status: no data available".to_string(), + ))), + }; + } + + Ok(EthSyncStatus::new(None)) + } + fn row_opt(result: clickhouse::error::Result) -> clickhouse::error::Result> { match result { Ok(row) => Ok(Some(row)),