Skip to content

Commit

Permalink
Add 'write version' to EmulateRequestModel (#212)
Browse files Browse the repository at this point in the history
* Add 'write version' to EmulateRequestModel
  • Loading branch information
stanislav-tkach authored Oct 12, 2023
1 parent 641bc1d commit a04990f
Show file tree
Hide file tree
Showing 9 changed files with 125 additions and 35 deletions.
10 changes: 6 additions & 4 deletions evm_loader/api/src/api_context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,19 +6,21 @@ use std::sync::Arc;
pub async fn build_rpc_client(
state: &NeonApiState,
slot: Option<u64>,
tx_index_in_block: Option<u64>,
) -> Result<Arc<dyn rpc::Rpc>, NeonError> {
if let Some(slot) = slot {
return build_call_db_client(state, slot).await;
build_call_db_client(state, slot, tx_index_in_block).await
} else {
Ok(state.rpc_client.clone())
}

Ok(state.rpc_client.clone())
}

pub async fn build_call_db_client(
state: &NeonApiState,
slot: u64,
tx_index_in_block: Option<u64>,
) -> Result<Arc<dyn rpc::Rpc>, NeonError> {
Ok(Arc::new(
CallDbClient::new(state.tracer_db.clone(), slot).await?,
CallDbClient::new(state.tracer_db.clone(), slot, tx_index_in_block).await?,
))
}
8 changes: 7 additions & 1 deletion evm_loader/api/src/api_server/handlers/emulate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,13 @@ pub async fn emulate(
) -> impl Responder {
let tx = emulate_request.tx_params.into();

let rpc_client = match api_context::build_rpc_client(&state, emulate_request.slot).await {
let rpc_client = match api_context::build_rpc_client(
&state,
emulate_request.slot,
emulate_request.tx_index_in_block,
)
.await
{
Ok(rpc_client) => rpc_client,
Err(e) => return process_error(StatusCode::BAD_REQUEST, &e),
};
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ pub async fn get_ether_account_data(
request_id: RequestId,
Query(req_params): Query<GetEtherRequest>,
) -> impl Responder {
let rpc_client = match api_context::build_rpc_client(&state, req_params.slot).await {
let rpc_client = match api_context::build_rpc_client(&state, req_params.slot, None).await {
Ok(rpc_client) => rpc_client,
Err(e) => return process_error(StatusCode::BAD_REQUEST, &e),
};
Expand Down
2 changes: 1 addition & 1 deletion evm_loader/api/src/api_server/handlers/get_storage_at.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ pub async fn get_storage_at(
request_id: RequestId,
Query(req_params): Query<GetStorageAtRequest>,
) -> impl Responder {
let rpc_client = match api_context::build_rpc_client(&state, req_params.slot).await {
let rpc_client = match api_context::build_rpc_client(&state, req_params.slot, None).await {
Ok(rpc_client) => rpc_client,
Err(e) => return process_error(StatusCode::BAD_REQUEST, &e),
};
Expand Down
15 changes: 10 additions & 5 deletions evm_loader/api/src/api_server/handlers/trace.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,16 @@ pub async fn trace(
) -> impl Responder {
let tx = trace_request.emulate_request.tx_params.into();

let rpc_client =
match api_context::build_rpc_client(&state, trace_request.emulate_request.slot).await {
Ok(rpc_client) => rpc_client,
Err(e) => return process_error(StatusCode::BAD_REQUEST, &e),
};
let rpc_client = match api_context::build_rpc_client(
&state,
trace_request.emulate_request.slot,
trace_request.emulate_request.tx_index_in_block,
)
.await
{
Ok(rpc_client) => rpc_client,
Err(e) => return process_error(StatusCode::BAD_REQUEST, &e),
};

let context = Context::new(&*rpc_client, &state.config);

Expand Down
1 change: 1 addition & 0 deletions evm_loader/cli/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ async fn run<'a>(options: &'a ArgMatches<'a>) -> NeonCliResult {
CallDbClient::new(
TracerDb::new(config.db_config.as_ref().expect("db-config not found")),
slot,
None,
)
.await?,
)
Expand Down
21 changes: 15 additions & 6 deletions evm_loader/lib/src/rpc/db_call_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,11 +24,16 @@ use std::any::Any;

pub struct CallDbClient {
tracer_db: TracerDb,
pub slot: u64,
slot: u64,
tx_index_in_block: Option<u64>,
}

impl CallDbClient {
pub async fn new(tracer_db: TracerDb, slot: u64) -> Result<Self, NeonError> {
pub async fn new(
tracer_db: TracerDb,
slot: u64,
tx_index_in_block: Option<u64>,
) -> Result<Self, NeonError> {
let earliest_rooted_slot = tracer_db
.get_earliest_rooted_slot()
.await
Expand All @@ -37,7 +42,11 @@ impl CallDbClient {
return Err(NeonError::EarlySlot(slot, earliest_rooted_slot));
}

Ok(Self { tracer_db, slot })
Ok(Self {
tracer_db,
slot,
tx_index_in_block,
})
}
}

Expand All @@ -60,7 +69,7 @@ impl Rpc for CallDbClient {

async fn get_account(&self, key: &Pubkey) -> ClientResult<Account> {
self.tracer_db
.get_account_at(key, self.slot)
.get_account_at(key, self.slot, self.tx_index_in_block)
.await
.map_err(|e| e!("load account error", key, e))?
.ok_or_else(|| e!("account not found", key))
Expand All @@ -73,7 +82,7 @@ impl Rpc for CallDbClient {
) -> RpcResult<Option<Account>> {
let account = self
.tracer_db
.get_account_at(key, self.slot)
.get_account_at(key, self.slot, self.tx_index_in_block)
.await
.map_err(|e| e!("load account error", key, e))?;

Expand All @@ -95,7 +104,7 @@ impl Rpc for CallDbClient {
for key in pubkeys {
let account = self
.tracer_db
.get_account_at(key, self.slot)
.get_account_at(key, self.slot, self.tx_index_in_block)
.await
.map_err(|e| e!("load account error", key, e))?;
result.push(account);
Expand Down
1 change: 1 addition & 0 deletions evm_loader/lib/src/types/request_models.rs
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,7 @@ pub struct EmulateRequestModel {
#[serde(flatten)]
pub emulation_params: EmulationParamsRequestModel,
pub slot: Option<u64>,
pub tx_index_in_block: Option<u64>,
}

#[derive(Deserialize, Serialize, Debug, Default)]
Expand Down
100 changes: 83 additions & 17 deletions evm_loader/lib/src/types/tracer_ch_db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ use super::{
};

use clickhouse::Client;
use log::{debug, info};
use log::{debug, error, info};
use rand::Rng;
use solana_sdk::{
account::Account,
Expand Down Expand Up @@ -220,11 +220,32 @@ impl ClickHouseDb {
Ok(slot_opt)
}

#[allow(clippy::too_many_lines)]
pub async fn get_account_at(&self, pubkey: &Pubkey, slot: u64) -> ChResult<Option<Account>> {
info!("get_account_at {{ pubkey: {pubkey}, slot: {slot} }}");
pub async fn get_account_at(
&self,
pubkey: &Pubkey,
slot: u64,
tx_index_in_block: Option<u64>,
) -> ChResult<Option<Account>> {
if let Some(tx_index_in_block) = tx_index_in_block {
if let Some(account) = self
.get_account_at_index_in_block(pubkey, slot, tx_index_in_block)
.await?
{
return Ok(Some(account));
}
}

self.get_account_at_slot(pubkey, slot).await
}

async fn get_account_at_slot(
&self,
pubkey: &Pubkey,
slot: u64,
) -> Result<Option<Account>, ChError> {
info!("get_account_at_slot {{ pubkey: {pubkey}, slot: {slot} }}");
let (first, mut branch) = self.get_branch_slots(Some(slot)).await.map_err(|e| {
println!("get_branch_slots error: {:?}", e);
error!("get_branch_slots error: {:?}", e);
e
})?;

Expand All @@ -234,7 +255,7 @@ impl ClickHouseDb {
.get_account_rooted_slot(&pubkey_str, first)
.await
.map_err(|e| {
println!("get_account_rooted_slot error: {:?}", e);
error!("get_account_rooted_slot error: {:?}", e);
e
})?
{
Expand Down Expand Up @@ -263,12 +284,12 @@ impl ClickHouseDb {
.await,
)
.map_err(|e| {
println!("get_account_at error: {e}");
error!("get_account_at_slot error: {e}");
ChError::Db(e)
})?;
let execution_time = Instant::now().duration_since(time_start);
info!(
"get_account_at {{ pubkey: {pubkey}, slot: {slot} }} sql(1) returned {row:?}, time: {} sec",
"get_account_at_slot {{ pubkey: {pubkey}, slot: {slot} }} sql(1) returned {row:?}, time: {} sec",
execution_time.as_secs_f64()
);

Expand All @@ -285,19 +306,64 @@ impl ClickHouseDb {
);
}

let result = if let Some(acc) = row {
acc.try_into()
.map(Some)
.map_err(|err| ChError::Db(clickhouse::error::Error::Custom(err)))
} else {
Ok(None)
};
let result = row
.map(|a| a.try_into())
.transpose()
.map_err(|e| ChError::Db(clickhouse::error::Error::Custom(e)));

info!("get_account_at {{ pubkey: {pubkey}, slot: {slot} }} -> {result:?}");
info!("get_account_at_slot {{ pubkey: {pubkey}, slot: {slot} }} -> {result:?}");

result
}

async fn get_account_at_index_in_block(
&self,
pubkey: &Pubkey,
slot: u64,
tx_index_in_block: u64,
) -> ChResult<Option<Account>> {
info!(
"get_account_at_index_in_block {{ pubkey: {pubkey}, slot: {slot}, tx_index_in_block: {tx_index_in_block} }}"
);

let query = r#"
SELECT owner, lamports, executable, rent_epoch, data, txn_signature
FROM events.update_account_distributed
WHERE pubkey = ?
AND slot = ?
AND write_version <= ?
ORDER BY write_version DESC
LIMIT 1
"#;

let time_start = Instant::now();

let account = Self::row_opt(
self.client
.query(query)
.bind(format!("{:?}", pubkey.to_bytes()))
.bind(slot)
.bind(tx_index_in_block)
.fetch_one::<AccountRow>()
.await,
)
.map_err(|e| {
error!("get_account_at_index_in_block error: {e}");
ChError::Db(e)
})?
.map(|a| a.try_into())
.transpose()
.map_err(|e| ChError::Db(clickhouse::error::Error::Custom(e)))?;

let execution_time = Instant::now().duration_since(time_start);
info!(
"get_account_at_index_in_block {{ pubkey: {pubkey}, slot: {slot}, tx_index_in_block: {tx_index_in_block} }} sql(1) returned {account:?}, time: {} sec",
execution_time.as_secs_f64()
);

Ok(account)
}

async fn get_older_account_row_at(
&self,
pubkey: &str,
Expand Down Expand Up @@ -470,7 +536,7 @@ impl ClickHouseDb {

// If not found, get closest account state in one of previous slots
if let Some(parent) = slot.parent {
self.get_account_at(pubkey, parent).await
self.get_account_at(pubkey, parent, None).await
} else {
Ok(None)
}
Expand Down

0 comments on commit a04990f

Please sign in to comment.