Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

NDEV-2183: Implement get_sync_status for eth_syncing #193

Merged
merged 3 commits into from
Sep 19, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion evm_loader/lib/src/errors.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ use solana_sdk::signer::SignerError as SolanaSignerError;
use thiserror::Error;

use crate::commands::init_environment::EnvironmentError;
use crate::types::ChError;
use crate::types::tracer_ch_common::ChError;

/// Errors that may be returned by the neon-cli program.
#[derive(Debug, Error)]
Expand Down
3 changes: 2 additions & 1 deletion evm_loader/lib/src/types/mod.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
pub mod request_models;
pub mod tracer_ch_common;
mod tracer_ch_db;

pub use evm_loader::types::Address;
Expand All @@ -7,7 +8,7 @@ use solana_sdk::pubkey::Pubkey;
use std::str::FromStr;
use tokio::runtime::Runtime;
use tokio::task::block_in_place;
pub use tracer_ch_db::{ChError, ChResult, ClickHouseDb as TracerDb};
pub use tracer_ch_db::ClickHouseDb as TracerDb;

use evm_loader::evm::tracing::TraceCallConfig;
use evm_loader::types::hexbytes::HexBytes;
Expand Down
131 changes: 131 additions & 0 deletions evm_loader/lib/src/types/tracer_ch_common.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,131 @@
use std::fmt;

use clickhouse::Row;
use serde::{Deserialize, Serialize};
use solana_sdk::{account::Account, pubkey::Pubkey};
use thiserror::Error;

pub const ROOT_BLOCK_DELAY: u8 = 100;

#[derive(Error, Debug)]
pub enum ChError {
#[error("clickhouse: {}", .0)]
Db(#[from] clickhouse::error::Error),
}

pub type ChResult<T> = std::result::Result<T, ChError>;

pub enum SlotStatus {
#[allow(unused)]
Confirmed = 1,
#[allow(unused)]
Processed = 2,
Rooted = 3,
}

#[derive(Debug, Row, serde::Deserialize, Clone)]
pub struct SlotParent {
pub slot: u64,
pub parent: Option<u64>,
pub status: u8,
}

#[derive(Debug, Row, serde::Deserialize, Clone)]
pub struct SlotParentRooted {
pub slot: u64,
pub parent: Option<u64>,
}

impl From<SlotParentRooted> for SlotParent {
fn from(slot_parent_rooted: SlotParentRooted) -> Self {
SlotParent {
slot: slot_parent_rooted.slot,
parent: slot_parent_rooted.parent,
status: SlotStatus::Rooted as u8,
}
}
}

impl SlotParent {
pub fn is_rooted(&self) -> bool {
self.status == SlotStatus::Rooted as u8
}
}

#[derive(Row, serde::Deserialize, Clone)]
pub struct AccountRow {
pub owner: Vec<u8>,
pub lamports: u64,
pub executable: bool,
pub rent_epoch: u64,
pub data: Vec<u8>,
pub txn_signature: Vec<Option<u8>>,
}

impl fmt::Display for AccountRow {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
write!(
f,
"AccountRow {{\n owner: {},\n lamports: {},\n executable: {},\n rent_epoch: {},\n}}",
bs58::encode(&self.owner).into_string(),
self.lamports,
self.executable,
self.rent_epoch,
)
}
}

impl fmt::Debug for AccountRow {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("Account")
.field("owner", &bs58::encode(&self.owner).into_string())
.field("lamports", &self.lamports)
.field("executable", &self.executable)
.field("rent_epoch", &self.rent_epoch)
.finish()
}
}

impl TryInto<Account> for AccountRow {
type Error = String;

fn try_into(self) -> Result<Account, Self::Error> {
let owner = Pubkey::try_from(self.owner).map_err(|src| {
format!(
"Incorrect slice length ({}) while converting owner from: {src:?}",
src.len(),
)
})?;

Ok(Account {
lamports: self.lamports,
data: self.data,
owner,
rent_epoch: self.rent_epoch,
executable: self.executable,
})
}
}

pub enum EthSyncStatus {
Syncing(EthSyncing),
Synced,
}

impl EthSyncStatus {
pub fn new(syncing_status: Option<EthSyncing>) -> Self {
if let Some(syncing_status) = syncing_status {
Self::Syncing(syncing_status)
} else {
Self::Synced
}
}
}

#[derive(Row, Debug, Deserialize, Serialize)]
#[serde(rename_all = "camelCase")]
pub struct EthSyncing {
pub starting_block: u64,
pub current_block: u64,
pub highest_block: u64,
}
183 changes: 75 additions & 108 deletions evm_loader/lib/src/types/tracer_ch_db.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,14 @@
use crate::commands::get_neon_elf::get_elf_parameter;
use crate::{
commands::get_neon_elf::get_elf_parameter,
types::tracer_ch_common::{AccountRow, ChError, SlotParent, ROOT_BLOCK_DELAY},
};

use super::{
tracer_ch_common::{ChResult, EthSyncStatus, EthSyncing, SlotParentRooted},
ChDbConfig,
};

use super::ChDbConfig;
use clickhouse::{Client, Row};
use clickhouse::Client;
use log::{debug, info};
use rand::Rng;
use solana_sdk::{
Expand All @@ -14,121 +21,16 @@ use std::{
Ord,
Ordering::{Equal, Greater, Less},
},
convert::TryFrom,
fmt,
sync::Arc,
time::Instant,
};
use thiserror::Error;

const ROOT_BLOCK_DELAY: u8 = 100;

#[derive(Error, Debug)]
pub enum ChError {
#[error("clickhouse: {}", .0)]
Db(#[from] clickhouse::error::Error),
}

pub type ChResult<T> = std::result::Result<T, ChError>;

#[allow(dead_code)]
#[derive(Clone)]
pub struct ClickHouseDb {
pub client: Arc<Client>,
}

pub enum SlotStatus {
#[allow(unused)]
Confirmed = 1,
#[allow(unused)]
Processed = 2,
Rooted = 3,
}

#[derive(Debug, Row, serde::Deserialize, Clone)]
pub struct SlotParent {
pub slot: u64,
pub parent: Option<u64>,
pub status: u8,
}

#[derive(Debug, Row, serde::Deserialize, Clone)]
pub struct SlotParentRooted {
pub slot: u64,
pub parent: Option<u64>,
}

impl From<SlotParentRooted> for SlotParent {
fn from(slot_parent_rooted: SlotParentRooted) -> Self {
SlotParent {
slot: slot_parent_rooted.slot,
parent: slot_parent_rooted.parent,
status: SlotStatus::Rooted as u8,
}
}
}

impl SlotParent {
fn is_rooted(&self) -> bool {
self.status == SlotStatus::Rooted as u8
}
}

#[derive(Row, serde::Deserialize, Clone)]
pub struct AccountRow {
pub owner: Vec<u8>,
pub lamports: u64,
pub executable: bool,
pub rent_epoch: u64,
pub data: Vec<u8>,
pub txn_signature: Vec<Option<u8>>,
}

impl fmt::Display for AccountRow {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
write!(
f,
"AccountRow {{\n owner: {},\n lamports: {},\n executable: {},\n rent_epoch: {},\n}}",
bs58::encode(&self.owner).into_string(),
self.lamports,
self.executable,
self.rent_epoch,
)
}
}

impl fmt::Debug for AccountRow {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("Account")
.field("owner", &bs58::encode(&self.owner).into_string())
.field("lamports", &self.lamports)
.field("executable", &self.executable)
.field("rent_epoch", &self.rent_epoch)
.finish()
}
}

impl TryInto<Account> for AccountRow {
type Error = String;

fn try_into(self) -> Result<Account, Self::Error> {
let owner = Pubkey::try_from(self.owner).map_err(|src| {
format!(
"Incorrect slice length ({}) while converting owner from: {src:?}",
src.len(),
)
})?;

Ok(Account {
lamports: self.lamports,
data: self.data,
owner,
rent_epoch: self.rent_epoch,
executable: self.executable,
})
}
}

impl ClickHouseDb {
pub fn new(config: &ChDbConfig) -> Self {
let url_id = rand::thread_rng().gen_range(0..config.clickhouse_url.len());
Expand Down Expand Up @@ -620,6 +522,71 @@ impl ClickHouseDb {
}
}

pub async fn get_slot_by_blockhash(&self, blockhash: &str) -> ChResult<u64> {
let query = r#"SELECT slot
FROM events.notify_block_distributed
WHERE hash = ?
LIMIT 1
"#;

let slot = Self::row_opt(
self.client
.query(query)
.bind(blockhash)
.fetch_one::<u64>()
.await,
)?;

match slot {
Some(slot) => Ok(slot),
None => Err(ChError::Db(clickhouse::error::Error::Custom(
"get_slot_by_blockhash: no data available".to_string(),
))),
}
andreisilviudragnea marked this conversation as resolved.
Show resolved Hide resolved
}

pub async fn get_sync_status(&self) -> ChResult<EthSyncStatus> {
let query_is_startup = r#"SELECT is_startup
FROM events.update_account_distributed
WHERE slot = (
SELECT MAX(slot)
FROM events.update_account_distributed
)
LIMIT 1;
"#;

let is_startup = Self::row_opt(
self.client
.query(query_is_startup)
.fetch_one::<bool>()
.await,
)?;

if let Some(true) = is_startup {
let query = r#"SELECT slot
FROM (
(SELECT MIN(slot) as slot FROM events.notify_block_distributed)
UNION ALL
(SELECT MAX(slot) as slot FROM events.notify_block_distributed)
UNION ALL
(SELECT MAX(slot) as slot FROM events.notify_block_distributed)
)
ORDER BY slot ASC
"#;

let data = Self::row_opt(self.client.query(query).fetch_one::<EthSyncing>().await)?;

return match data {
Some(data) => Ok(EthSyncStatus::new(Some(data))),
None => Err(ChError::Db(clickhouse::error::Error::Custom(
"get_sync_status: no data available".to_string(),
))),
};
}

Ok(EthSyncStatus::new(None))
}

fn row_opt<T>(result: clickhouse::error::Result<T>) -> clickhouse::error::Result<Option<T>> {
match result {
Ok(row) => Ok(Some(row)),
Expand Down