Skip to content

Commit

Permalink
Update the 'get_account_at' function logic accoring to review suggest…
Browse files Browse the repository at this point in the history
…ions
  • Loading branch information
stanislav-tkach committed Oct 12, 2023
1 parent 969f457 commit 8adfb4d
Showing 1 changed file with 92 additions and 44 deletions.
136 changes: 92 additions & 44 deletions evm_loader/lib/src/types/tracer_ch_db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ use super::{
};

use clickhouse::Client;
use log::{debug, info};
use log::{debug, error, info};
use rand::Rng;
use solana_sdk::{
account::Account,
Expand Down Expand Up @@ -220,18 +220,32 @@ impl ClickHouseDb {
Ok(slot_opt)
}

#[allow(clippy::too_many_lines)]
pub async fn get_account_at(
&self,
pubkey: &Pubkey,
slot: u64,
tx_index_in_block: Option<u64>,
) -> ChResult<Option<Account>> {
info!(
"get_account_at {{ pubkey: {pubkey}, slot: {slot}, tx index in block: {tx_index_in_block:?} }}"
);
if let Some(tx_index_in_block) = tx_index_in_block {
if let Some(account) = self
.get_account_at_index_in_block(pubkey, slot, tx_index_in_block)
.await?
{
return Ok(Some(account));
}
}

self.get_account_at_slot(pubkey, slot).await
}

async fn get_account_at_slot(
&self,
pubkey: &Pubkey,
slot: u64,
) -> Result<Option<Account>, ChError> {
info!("get_account_at_slot {{ pubkey: {pubkey}, slot: {slot} }}");
let (first, mut branch) = self.get_branch_slots(Some(slot)).await.map_err(|e| {
println!("get_branch_slots error: {:?}", e);
error!("get_branch_slots error: {:?}", e);
e
})?;

Expand All @@ -241,7 +255,7 @@ impl ClickHouseDb {
.get_account_rooted_slot(&pubkey_str, first)
.await
.map_err(|e| {
println!("get_account_rooted_slot error: {:?}", e);
error!("get_account_rooted_slot error: {:?}", e);
e
})?
{
Expand All @@ -251,42 +265,31 @@ impl ClickHouseDb {
let mut row = if branch.is_empty() {
None
} else {
let query = if tx_index_in_block.is_some() {
r#"
SELECT owner, lamports, executable, rent_epoch, data, txn_signature
FROM events.update_account_distributed
WHERE pubkey = ?
AND write_version = ?
AND slot IN ?
ORDER BY slot DESC, write_version DESC
LIMIT 1
"#
} else {
r#"
SELECT owner, lamports, executable, rent_epoch, data, txn_signature
FROM events.update_account_distributed
WHERE pubkey = ?
AND slot IN ?
ORDER BY slot DESC, write_version DESC
LIMIT 1
"#
};
let query = r#"
SELECT owner, lamports, executable, rent_epoch, data, txn_signature
FROM events.update_account_distributed
WHERE pubkey = ?
AND slot IN ?
ORDER BY pubkey, slot DESC, write_version DESC
LIMIT 1
"#;

let time_start = Instant::now();
let row = Self::row_opt({
let mut row = self.client.query(query).bind(pubkey_str.clone());
if let Some(tx_index_in_block) = tx_index_in_block {
row = row.bind(tx_index_in_block);
}
row.bind(branch.as_slice()).fetch_one::<AccountRow>().await
})
let row = Self::row_opt(
self.client
.query(query)
.bind(pubkey_str.clone())
.bind(branch.as_slice())
.fetch_one::<AccountRow>()
.await,
)
.map_err(|e| {
println!("get_account_at error: {e}");
error!("get_account_at_slot error: {e}");
ChError::Db(e)
})?;
let execution_time = Instant::now().duration_since(time_start);
info!(
"get_account_at {{ pubkey: {pubkey}, slot: {slot} }} sql(1) returned {row:?}, time: {} sec",
"get_account_at_slot {{ pubkey: {pubkey}, slot: {slot} }} sql(1) returned {row:?}, time: {} sec",
execution_time.as_secs_f64()
);

Expand All @@ -303,19 +306,64 @@ impl ClickHouseDb {
);
}

let result = if let Some(acc) = row {
acc.try_into()
.map(Some)
.map_err(|err| ChError::Db(clickhouse::error::Error::Custom(err)))
} else {
Ok(None)
};
let result = row
.map(|a| a.try_into())
.transpose()
.map_err(|e| ChError::Db(clickhouse::error::Error::Custom(e)));

info!("get_account_at {{ pubkey: {pubkey}, slot: {slot} }} -> {result:?}");
info!("get_account_at_slot {{ pubkey: {pubkey}, slot: {slot} }} -> {result:?}");

result
}

async fn get_account_at_index_in_block(
&self,
pubkey: &Pubkey,
slot: u64,
tx_index_in_block: u64,
) -> ChResult<Option<Account>> {
info!(
"get_account_at_index_in_block {{ pubkey: {pubkey}, slot: {slot}, tx_index_in_block: {tx_index_in_block} }}"
);

let query = r#"
SELECT owner, lamports, executable, rent_epoch, data, txn_signature
FROM events.update_account_distributed
WHERE pubkey = ?
AND slot = ?
AND write_version <= ?
ORDER BY write_version DESC
LIMIT 1
"#;

let time_start = Instant::now();

let account = Self::row_opt(
self.client
.query(query)
.bind(format!("{:?}", pubkey.to_bytes()))
.bind(slot)
.bind(tx_index_in_block)
.fetch_one::<AccountRow>()
.await,
)
.map_err(|e| {
error!("get_account_at_index_in_block error: {e}");
ChError::Db(e)
})?
.map(|a| a.try_into())
.transpose()
.map_err(|e| ChError::Db(clickhouse::error::Error::Custom(e)))?;

let execution_time = Instant::now().duration_since(time_start);
info!(
"get_account_at_index_in_block {{ pubkey: {pubkey}, slot: {slot}, tx_index_in_block: {tx_index_in_block} }} sql(1) returned {account:?}, time: {} sec",
execution_time.as_secs_f64()
);

Ok(account)
}

async fn get_older_account_row_at(
&self,
pubkey: &str,
Expand Down

0 comments on commit 8adfb4d

Please sign in to comment.