From 45fd227903876e99384c955684858e3db375e9de Mon Sep 17 00:00:00 2001 From: Denis Avvakumov Date: Tue, 12 Sep 2023 19:28:35 +0300 Subject: [PATCH 1/3] NDEV-2183: Implement get_sync_status for eth_syncing --- evm_loader/lib/src/types/mod.rs | 3 +- evm_loader/lib/src/types/tracer_ch_common.rs | 116 +++++++++++++++ evm_loader/lib/src/types/tracer_ch_db.rs | 143 +++++-------------- 3 files changed, 153 insertions(+), 109 deletions(-) create mode 100644 evm_loader/lib/src/types/tracer_ch_common.rs diff --git a/evm_loader/lib/src/types/mod.rs b/evm_loader/lib/src/types/mod.rs index a3f014d7b..69e8099ff 100644 --- a/evm_loader/lib/src/types/mod.rs +++ b/evm_loader/lib/src/types/mod.rs @@ -1,4 +1,5 @@ pub mod request_models; +pub mod tracer_ch_common; mod tracer_ch_db; pub use evm_loader::types::Address; @@ -7,7 +8,7 @@ use solana_sdk::pubkey::Pubkey; use std::str::FromStr; use tokio::runtime::Runtime; use tokio::task::block_in_place; -pub use tracer_ch_db::{ChError, ChResult, ClickHouseDb as TracerDb}; +pub use tracer_ch_db::ClickHouseDb as TracerDb; use evm_loader::evm::tracing::TraceCallConfig; use evm_loader::types::hexbytes::HexBytes; diff --git a/evm_loader/lib/src/types/tracer_ch_common.rs b/evm_loader/lib/src/types/tracer_ch_common.rs new file mode 100644 index 000000000..e49bf85a4 --- /dev/null +++ b/evm_loader/lib/src/types/tracer_ch_common.rs @@ -0,0 +1,116 @@ +use std::fmt; + +use clickhouse::Row; +use serde::{Deserialize, Serialize}; +use solana_sdk::{account::Account, pubkey::Pubkey}; +use thiserror::Error; + +pub const ROOT_BLOCK_DELAY: u8 = 100; + +#[derive(Error, Debug)] +pub enum ChError { + #[error("clickhouse: {}", .0)] + Db(#[from] clickhouse::error::Error), +} + +pub type ChResult = std::result::Result; + +pub enum SlotStatus { + #[allow(unused)] + Confirmed = 1, + #[allow(unused)] + Processed = 2, + Rooted = 3, +} + +#[derive(Debug, Row, serde::Deserialize, Clone)] +pub struct SlotParent { + pub slot: u64, + pub parent: Option, + pub status: u8, +} + +#[derive(Debug, Row, serde::Deserialize, Clone)] +pub struct SlotParentRooted { + pub slot: u64, + pub parent: Option, +} + +impl From for SlotParent { + fn from(slot_parent_rooted: SlotParentRooted) -> Self { + SlotParent { + slot: slot_parent_rooted.slot, + parent: slot_parent_rooted.parent, + status: SlotStatus::Rooted as u8, + } + } +} + +impl SlotParent { + pub fn is_rooted(&self) -> bool { + self.status == SlotStatus::Rooted as u8 + } +} + +#[derive(Row, serde::Deserialize, Clone)] +pub struct AccountRow { + pub owner: Vec, + pub lamports: u64, + pub executable: bool, + pub rent_epoch: u64, + pub data: Vec, + pub txn_signature: Vec>, +} + +impl fmt::Display for AccountRow { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + write!( + f, + "AccountRow {{\n owner: {},\n lamports: {},\n executable: {},\n rent_epoch: {},\n}}", + bs58::encode(&self.owner).into_string(), + self.lamports, + self.executable, + self.rent_epoch, + ) + } +} + +impl fmt::Debug for AccountRow { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("Account") + .field("owner", &bs58::encode(&self.owner).into_string()) + .field("lamports", &self.lamports) + .field("executable", &self.executable) + .field("rent_epoch", &self.rent_epoch) + .finish() + } +} + +impl TryInto for AccountRow { + type Error = String; + + fn try_into(self) -> Result { + let owner = Pubkey::try_from(self.owner).map_err(|src| { + format!( + "Incorrect slice length ({}) while converting owner from: {src:?}", + src.len(), + ) + })?; + + Ok(Account { + lamports: self.lamports, + data: self.data, + owner, + rent_epoch: self.rent_epoch, + executable: self.executable, + }) + } +} + +#[derive(Row, Debug, Deserialize, Serialize)] +#[serde(rename_all = "camelCase")] +pub struct EthSyncStatus { + starting_block: u64, + current_block: u64, + highest_block: u64, +} diff --git a/evm_loader/lib/src/types/tracer_ch_db.rs b/evm_loader/lib/src/types/tracer_ch_db.rs index 2b653c4f9..ab9e7a29b 100644 --- a/evm_loader/lib/src/types/tracer_ch_db.rs +++ b/evm_loader/lib/src/types/tracer_ch_db.rs @@ -1,7 +1,14 @@ -use crate::commands::get_neon_elf::get_elf_parameter; +use crate::{ + commands::get_neon_elf::get_elf_parameter, + types::tracer_ch_common::{AccountRow, ChError, SlotParent, ROOT_BLOCK_DELAY}, +}; + +use super::{ + tracer_ch_common::{ChResult, EthSyncStatus, SlotParentRooted}, + ChDbConfig, +}; -use super::ChDbConfig; -use clickhouse::{Client, Row}; +use clickhouse::Client; use log::{debug, info}; use rand::Rng; use solana_sdk::{ @@ -14,22 +21,9 @@ use std::{ Ord, Ordering::{Equal, Greater, Less}, }, - convert::TryFrom, - fmt, sync::Arc, time::Instant, }; -use thiserror::Error; - -const ROOT_BLOCK_DELAY: u8 = 100; - -#[derive(Error, Debug)] -pub enum ChError { - #[error("clickhouse: {}", .0)] - Db(#[from] clickhouse::error::Error), -} - -pub type ChResult = std::result::Result; #[allow(dead_code)] #[derive(Clone)] @@ -37,98 +31,6 @@ pub struct ClickHouseDb { pub client: Arc, } -pub enum SlotStatus { - #[allow(unused)] - Confirmed = 1, - #[allow(unused)] - Processed = 2, - Rooted = 3, -} - -#[derive(Debug, Row, serde::Deserialize, Clone)] -pub struct SlotParent { - pub slot: u64, - pub parent: Option, - pub status: u8, -} - -#[derive(Debug, Row, serde::Deserialize, Clone)] -pub struct SlotParentRooted { - pub slot: u64, - pub parent: Option, -} - -impl From for SlotParent { - fn from(slot_parent_rooted: SlotParentRooted) -> Self { - SlotParent { - slot: slot_parent_rooted.slot, - parent: slot_parent_rooted.parent, - status: SlotStatus::Rooted as u8, - } - } -} - -impl SlotParent { - fn is_rooted(&self) -> bool { - self.status == SlotStatus::Rooted as u8 - } -} - -#[derive(Row, serde::Deserialize, Clone)] -pub struct AccountRow { - pub owner: Vec, - pub lamports: u64, - pub executable: bool, - pub rent_epoch: u64, - pub data: Vec, - pub txn_signature: Vec>, -} - -impl fmt::Display for AccountRow { - fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { - write!( - f, - "AccountRow {{\n owner: {},\n lamports: {},\n executable: {},\n rent_epoch: {},\n}}", - bs58::encode(&self.owner).into_string(), - self.lamports, - self.executable, - self.rent_epoch, - ) - } -} - -impl fmt::Debug for AccountRow { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - f.debug_struct("Account") - .field("owner", &bs58::encode(&self.owner).into_string()) - .field("lamports", &self.lamports) - .field("executable", &self.executable) - .field("rent_epoch", &self.rent_epoch) - .finish() - } -} - -impl TryInto for AccountRow { - type Error = String; - - fn try_into(self) -> Result { - let owner = Pubkey::try_from(self.owner).map_err(|src| { - format!( - "Incorrect slice length ({}) while converting owner from: {src:?}", - src.len(), - ) - })?; - - Ok(Account { - lamports: self.lamports, - data: self.data, - owner, - rent_epoch: self.rent_epoch, - executable: self.executable, - }) - } -} - impl ClickHouseDb { pub fn new(config: &ChDbConfig) -> Self { let url_id = rand::thread_rng().gen_range(0..config.clickhouse_url.len()); @@ -620,6 +522,31 @@ impl ClickHouseDb { } } + pub async fn get_sync_status(&self) -> ChResult { + let query = r#"SELECT slot + FROM ( + (SELECT MIN(slot) as slot FROM events.notify_block_distributed) + UNION ALL + (SELECT MAX(slot) as slot FROM events.notify_block_distributed) + UNION ALL + (SELECT MAX(slot) as slot FROM events.notify_block_distributed) + ) + ORDER BY slot ASC + "#; + + let data = Self::row_opt(self.client.query(query).fetch_one::().await)?; + + match data { + Some(data) => Ok(data), + None => { + let err = clickhouse::error::Error::Custom(format!( + "get_sync_status: no data available", + )); + Err(ChError::Db(err)) + } + } + } + fn row_opt(result: clickhouse::error::Result) -> clickhouse::error::Result> { match result { Ok(row) => Ok(Some(row)), From e672d8d7a6d8fea5c95dc3c34aec8eec7baf9960 Mon Sep 17 00:00:00 2001 From: Denis Avvakumov Date: Wed, 13 Sep 2023 18:47:39 +0300 Subject: [PATCH 2/3] NDEV-2183: Implement get_slot_by_blockhash, improve get_sync_status --- evm_loader/lib/src/types/tracer_ch_common.rs | 17 ++++- evm_loader/lib/src/types/tracer_ch_db.rs | 74 +++++++++++++++----- 2 files changed, 73 insertions(+), 18 deletions(-) diff --git a/evm_loader/lib/src/types/tracer_ch_common.rs b/evm_loader/lib/src/types/tracer_ch_common.rs index e49bf85a4..2aa0c6428 100644 --- a/evm_loader/lib/src/types/tracer_ch_common.rs +++ b/evm_loader/lib/src/types/tracer_ch_common.rs @@ -107,9 +107,24 @@ impl TryInto for AccountRow { } } +pub enum EthSyncStatus { + Syncing(EthSyncing), + Synced, +} + +impl EthSyncStatus { + pub fn new(syncing_status: Option) -> Self { + if let Some(syncing_status) = syncing_status { + Self::Syncing(syncing_status) + } else { + Self::Synced + } + } +} + #[derive(Row, Debug, Deserialize, Serialize)] #[serde(rename_all = "camelCase")] -pub struct EthSyncStatus { +pub struct EthSyncing { starting_block: u64, current_block: u64, highest_block: u64, diff --git a/evm_loader/lib/src/types/tracer_ch_db.rs b/evm_loader/lib/src/types/tracer_ch_db.rs index ab9e7a29b..ea7613e05 100644 --- a/evm_loader/lib/src/types/tracer_ch_db.rs +++ b/evm_loader/lib/src/types/tracer_ch_db.rs @@ -4,7 +4,7 @@ use crate::{ }; use super::{ - tracer_ch_common::{ChResult, EthSyncStatus, SlotParentRooted}, + tracer_ch_common::{ChResult, EthSyncStatus, EthSyncing, SlotParentRooted}, ChDbConfig, }; @@ -522,29 +522,69 @@ impl ClickHouseDb { } } - pub async fn get_sync_status(&self) -> ChResult { + pub async fn get_slot_by_blockhash(&self, blockhash: &str) -> ChResult> { let query = r#"SELECT slot - FROM ( - (SELECT MIN(slot) as slot FROM events.notify_block_distributed) - UNION ALL - (SELECT MAX(slot) as slot FROM events.notify_block_distributed) - UNION ALL - (SELECT MAX(slot) as slot FROM events.notify_block_distributed) + FROM events.notify_block_distributed + WHERE hash = ? + LIMIT 1 + "#; + + let slot = Self::row_opt( + self.client + .query(query) + .bind(blockhash) + .fetch_one::() + .await, + )?; + + Ok(slot) + } + + pub async fn get_sync_status(&self) -> ChResult { + let query_is_startup = r#"SELECT is_startup + FROM events.update_account_distributed + WHERE slot = ( + SELECT MAX(slot) + FROM events.update_account_distributed ) - ORDER BY slot ASC + LIMIT 1; "#; - let data = Self::row_opt(self.client.query(query).fetch_one::().await)?; + let is_startup = Self::row_opt( + self.client + .query(query_is_startup) + .fetch_one::() + .await, + )?; + + if let Some(is_startup) = is_startup { + if is_startup { + let query = r#"SELECT slot + FROM ( + (SELECT MIN(slot) as slot FROM events.notify_block_distributed) + UNION ALL + (SELECT MAX(slot) as slot FROM events.notify_block_distributed) + UNION ALL + (SELECT MAX(slot) as slot FROM events.notify_block_distributed) + ) + ORDER BY slot ASC + "#; - match data { - Some(data) => Ok(data), - None => { - let err = clickhouse::error::Error::Custom(format!( - "get_sync_status: no data available", - )); - Err(ChError::Db(err)) + let data = Self::row_opt(self.client.query(query).fetch_one::().await)?; + + match data { + Some(data) => return Ok(EthSyncStatus::new(Some(data))), + None => { + let err = clickhouse::error::Error::Custom(format!( + "get_sync_status: no data available", + )); + return Err(ChError::Db(err)); + } + } } } + + Ok(EthSyncStatus::new(None)) } fn row_opt(result: clickhouse::error::Result) -> clickhouse::error::Result> { From a440819500543d43a1add70ddbdf0897fef5a0f3 Mon Sep 17 00:00:00 2001 From: Denis Avvakumov Date: Thu, 14 Sep 2023 14:53:08 +0300 Subject: [PATCH 3/3] NDEV-2183: Make EthSyncing fields public --- evm_loader/lib/src/errors.rs | 2 +- evm_loader/lib/src/types/tracer_ch_common.rs | 6 ++-- evm_loader/lib/src/types/tracer_ch_db.rs | 32 ++++++++++---------- 3 files changed, 20 insertions(+), 20 deletions(-) diff --git a/evm_loader/lib/src/errors.rs b/evm_loader/lib/src/errors.rs index abe19885c..132e6e5b2 100644 --- a/evm_loader/lib/src/errors.rs +++ b/evm_loader/lib/src/errors.rs @@ -13,7 +13,7 @@ use solana_sdk::signer::SignerError as SolanaSignerError; use thiserror::Error; use crate::commands::init_environment::EnvironmentError; -use crate::types::ChError; +use crate::types::tracer_ch_common::ChError; /// Errors that may be returned by the neon-cli program. #[derive(Debug, Error)] diff --git a/evm_loader/lib/src/types/tracer_ch_common.rs b/evm_loader/lib/src/types/tracer_ch_common.rs index 2aa0c6428..c7c0e54f4 100644 --- a/evm_loader/lib/src/types/tracer_ch_common.rs +++ b/evm_loader/lib/src/types/tracer_ch_common.rs @@ -125,7 +125,7 @@ impl EthSyncStatus { #[derive(Row, Debug, Deserialize, Serialize)] #[serde(rename_all = "camelCase")] pub struct EthSyncing { - starting_block: u64, - current_block: u64, - highest_block: u64, + pub starting_block: u64, + pub current_block: u64, + pub highest_block: u64, } diff --git a/evm_loader/lib/src/types/tracer_ch_db.rs b/evm_loader/lib/src/types/tracer_ch_db.rs index ea7613e05..2cf0a1b9d 100644 --- a/evm_loader/lib/src/types/tracer_ch_db.rs +++ b/evm_loader/lib/src/types/tracer_ch_db.rs @@ -522,7 +522,7 @@ impl ClickHouseDb { } } - pub async fn get_slot_by_blockhash(&self, blockhash: &str) -> ChResult> { + pub async fn get_slot_by_blockhash(&self, blockhash: &str) -> ChResult { let query = r#"SELECT slot FROM events.notify_block_distributed WHERE hash = ? @@ -537,7 +537,12 @@ impl ClickHouseDb { .await, )?; - Ok(slot) + match slot { + Some(slot) => Ok(slot), + None => Err(ChError::Db(clickhouse::error::Error::Custom( + "get_slot_by_blockhash: no data available".to_string(), + ))), + } } pub async fn get_sync_status(&self) -> ChResult { @@ -557,9 +562,8 @@ impl ClickHouseDb { .await, )?; - if let Some(is_startup) = is_startup { - if is_startup { - let query = r#"SELECT slot + if let Some(true) = is_startup { + let query = r#"SELECT slot FROM ( (SELECT MIN(slot) as slot FROM events.notify_block_distributed) UNION ALL @@ -570,18 +574,14 @@ impl ClickHouseDb { ORDER BY slot ASC "#; - let data = Self::row_opt(self.client.query(query).fetch_one::().await)?; + let data = Self::row_opt(self.client.query(query).fetch_one::().await)?; - match data { - Some(data) => return Ok(EthSyncStatus::new(Some(data))), - None => { - let err = clickhouse::error::Error::Custom(format!( - "get_sync_status: no data available", - )); - return Err(ChError::Db(err)); - } - } - } + return match data { + Some(data) => Ok(EthSyncStatus::new(Some(data))), + None => Err(ChError::Db(clickhouse::error::Error::Custom( + "get_sync_status: no data available".to_string(), + ))), + }; } Ok(EthSyncStatus::new(None))