From 9e717ef257c4446465271e2b69bd9f537157f0e9 Mon Sep 17 00:00:00 2001 From: neithanmo Date: Fri, 29 Mar 2024 15:33:30 -0600 Subject: [PATCH 1/4] feat: Redefine checksums as a global as it is meant to be initialized once and never changes --- Cargo.lock | 1 + Cargo.toml | 1 + src/database.rs | 29 ++++++++--------------------- src/indexer/mod.rs | 11 +---------- src/lib.rs | 12 ++++++++++++ 5 files changed, 23 insertions(+), 31 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 82e2db0..24462a6 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4057,6 +4057,7 @@ dependencies = [ "metrics", "metrics-exporter-prometheus", "namada_sdk", + "once_cell", "opentelemetry", "opentelemetry-jaeger", "opentelemetry_api", diff --git a/Cargo.toml b/Cargo.toml index d33f552..0a34498 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -66,6 +66,7 @@ tendermint-rpc = { version = "0.35.0", features = ["http-client"] } tendermint-proto = "0.35.0" clap = { version = "4.4.2", features = ["derive", "env"] } ureq = "2.9.1" +once_cell = "1.19.0" [dev-dependencies] criterion = { version = "0.5.1", features = [ diff --git a/src/database.rs b/src/database.rs index eef8974..d6eac7f 100644 --- a/src/database.rs +++ b/src/database.rs @@ -28,7 +28,6 @@ use namada_sdk::{ use sqlx::postgres::{PgPool, PgPoolOptions, PgRow as Row}; use sqlx::Row as TRow; use sqlx::{query, QueryBuilder, Transaction}; -use std::collections::HashMap; use std::sync::Arc; use std::time::Duration; use tendermint::block::Block; @@ -39,7 +38,7 @@ use tendermint_rpc::endpoint::block_results; use tracing::{debug, info, instrument}; use crate::{ - DB_SAVE_BLOCK_COUNTER, DB_SAVE_BLOCK_DURATION, DB_SAVE_COMMIT_SIG_DURATION, + CHECKSUMS, DB_SAVE_BLOCK_COUNTER, DB_SAVE_BLOCK_DURATION, DB_SAVE_COMMIT_SIG_DURATION, DB_SAVE_EVDS_DURATION, DB_SAVE_TXS_DURATION, INDEXER_LAST_SAVE_BLOCK_HEIGHT, MASP_ADDR, }; @@ -238,11 +237,10 @@ impl Database { /// Inner implementation that uses a postgres-transaction /// to ensure database coherence. 
- #[instrument(skip(block, block_results, checksums_map, sqlx_tx))] + #[instrument(skip(block, block_results, sqlx_tx))] async fn save_block_impl<'a>( block: &Block, block_results: &block_results::Response, - checksums_map: &HashMap, sqlx_tx: &mut Transaction<'a, sqlx::Postgres>, network: &str, ) -> Result<(), Error> { @@ -338,7 +336,6 @@ impl Database { block_id, block.header.height.value(), block_results, - checksums_map, sqlx_tx, network, ) @@ -348,12 +345,11 @@ impl Database { } /// Save a block and commit database - #[instrument(skip(self, block, block_results, checksums_map))] + #[instrument(skip(self, block, block_results))] pub async fn save_block( &self, block: &Block, block_results: &block_results::Response, - checksums_map: &HashMap, ) -> Result<(), Error> { let instant = tokio::time::Instant::now(); // Lets use postgres transaction internally for 2 reasons: @@ -365,14 +361,7 @@ impl Database { // succeeded. let mut sqlx_tx = self.transaction().await?; - Self::save_block_impl( - block, - block_results, - checksums_map, - &mut sqlx_tx, - self.network.as_str(), - ) - .await?; + Self::save_block_impl(block, block_results, &mut sqlx_tx, self.network.as_str()).await?; let res = sqlx_tx.commit().await.map_err(Error::from); @@ -501,15 +490,14 @@ impl Database { /// It is up to the caller to commit the operation. /// this method is meant to be used when caller is saving /// many blocks, and can commit after it. 
- #[instrument(skip(block, block_results, checksums_map, sqlx_tx, network))] + #[instrument(skip(block, block_results, sqlx_tx, network))] pub async fn save_block_tx<'a>( block: &Block, block_results: &block_results::Response, - checksums_map: &HashMap, sqlx_tx: &mut Transaction<'a, sqlx::Postgres>, network: &str, ) -> Result<(), Error> { - Self::save_block_impl(block, block_results, checksums_map, sqlx_tx, network).await + Self::save_block_impl(block, block_results, sqlx_tx, network).await } /// Save all the evidences in the list, it is up to the caller to @@ -621,13 +609,12 @@ impl Database { /// Save all the transactions in txs, it is up to the caller to /// call sqlx_tx.commit().await?; for the changes to take place in /// database. - #[instrument(skip(txs, block_id, sqlx_tx, checksums_map, block_results, network))] + #[instrument(skip(txs, block_id, sqlx_tx, block_results, network))] async fn save_transactions<'a>( txs: &[Vec], block_id: &[u8], block_height: u64, block_results: &block_results::Response, - checksums_map: &HashMap, sqlx_tx: &mut Transaction<'a, sqlx::Postgres>, network: &str, ) -> Result<(), Error> { @@ -724,7 +711,7 @@ impl Database { let code_hex = hex::encode(code.as_slice()); let unknown_type = "unknown".to_string(); - let type_tx = checksums_map.get(&code_hex).unwrap_or(&unknown_type); + let type_tx = CHECKSUMS.get(&code_hex).unwrap_or(&unknown_type); // decode tx_transfer, tx_bond and tx_unbound to store the decoded data in their tables // if the transaction has failed don't try to decode because the changes are not included and the data might not be correct diff --git a/src/indexer/mod.rs b/src/indexer/mod.rs index c615450..2673c22 100644 --- a/src/indexer/mod.rs +++ b/src/indexer/mod.rs @@ -1,5 +1,4 @@ use crate::config::IndexerConfig; -use crate::utils::load_checksums; use futures::stream::StreamExt; use futures_util::pin_mut; use futures_util::Stream; @@ -194,14 +193,6 @@ pub async fn start_indexing( // check if indexes has been 
created in the database let has_indexes = utils::has_indexes(&db).await?; - /******************** - * - * Load checksums - * - ********************/ - - let checksums_map = load_checksums()?; - /******************** * * Init RPC * ********************/ @@ -232,7 +223,7 @@ pub async fn start_indexing( // Block consumer that stores block into the database while let Some(block) = rx.recv().await { // block is now the block info and the block results - if let Err(e) = db.save_block(&block.0, &block.1, &checksums_map).await { + if let Err(e) = db.save_block(&block.0, &block.1).await { // shutdown producer task shutdown.store(true, Ordering::Relaxed); tracing::error!("Closing block producer task due to an error saving last block: {e}"); diff --git a/src/lib.rs b/src/lib.rs index e4546d7..03dd9b4 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -18,6 +18,10 @@ pub use indexer::start_indexing; pub use server::{create_server, start_server, BlockInfo}; pub use telemetry::{get_subscriber, init_subscriber, setup_logging}; +use std::collections::HashMap; + +use once_cell::sync::Lazy; + pub const INDEXER_GET_BLOCK_DURATION: &str = "indexer_get_block_duration"; const DB_SAVE_BLOCK_COUNTER: &str = "db_save_block_count"; const DB_SAVE_BLOCK_DURATION: &str = "db_save_block_duration"; @@ -28,3 +32,11 @@ const INDEXER_LAST_SAVE_BLOCK_HEIGHT: &str = "indexer_last_save_block_height"; const INDEXER_LAST_GET_BLOCK_HEIGHT: &str = "indexer_last_get_block_height"; pub const MASP_ADDR: &str = "tnam1pcqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqzmefah"; + +// Checksums for the different transaction types, +// stored as a global for easy access from anywhere. +pub(crate) static CHECKSUMS: Lazy> = + // Lazily load the checksums from the env/file + // this helps reducing the overhead of passing checksums to database + // functions for data that is initialized once and never changes. 
+ Lazy::new(|| utils::load_checksums().expect("Failed to load checksums")); From eba1cee5069871fd3b95c0e7264c711c61fec55b Mon Sep 17 00:00:00 2001 From: neithanmo Date: Fri, 29 Mar 2024 15:43:43 -0600 Subject: [PATCH 2/4] update test --- tests/save_block.rs | 7 +------ 1 file changed, 1 insertion(+), 6 deletions(-) diff --git a/tests/save_block.rs b/tests/save_block.rs index ef2541f..9c72f36 100644 --- a/tests/save_block.rs +++ b/tests/save_block.rs @@ -2,7 +2,6 @@ mod utils; #[cfg(test)] mod save_block { - use namadexer::utils::load_checksums; use std::fs; use tendermint::block::Block; use tendermint_rpc::endpoint::block_results; @@ -18,8 +17,6 @@ mod save_block { // now create a fresh database for tests let db = create_test_db(helper_db.pool(), TESTING_DB_NAME).await; - let checksums_map = load_checksums().unwrap(); - let data = fs::read_to_string("./tests/blocks_vector.json").unwrap(); let blocks: Vec = serde_json::from_str(&data).unwrap(); let data = fs::read_to_string("./tests/block_results_vector.json").unwrap(); @@ -28,9 +25,7 @@ mod save_block { db.create_tables().await.unwrap(); for i in 0..blocks.len() { - db.save_block(&blocks[i], &block_results[i], &checksums_map) - .await - .unwrap(); + db.save_block(&blocks[i], &block_results[i]).await.unwrap(); } db.create_indexes() From 1c4ac2f1a693a8f3ab9d7da4c7a4a0b1573f76c6 Mon Sep 17 00:00:00 2001 From: neithanmo Date: Fri, 29 Mar 2024 15:44:07 -0600 Subject: [PATCH 3/4] update generator for testing --- examples/generate.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/examples/generate.rs b/examples/generate.rs index fe489ab..e8a8457 100644 --- a/examples/generate.rs +++ b/examples/generate.rs @@ -1,8 +1,8 @@ // Generate tests vectors blocks -use namada_sdk::tendermint::block::Height; -use namada_sdk::tendermint_rpc::{Client, HttpClient}; use std::fs::File; use std::io::Write; +use tendermint::block::Height; +use tendermint_rpc::{self, Client, HttpClient}; const URL: &str = 
"http://194.163.180.253:26657"; const CURRENT_HEIGHT: u32 = 1; From 269e23f8b67961732fadd2d0e965df8aec40e487 Mon Sep 17 00:00:00 2001 From: neithanmo Date: Fri, 29 Mar 2024 16:02:31 -0600 Subject: [PATCH 4/4] use more idiomatic rust --- src/database.rs | 40 +++++++++++++++++++++------------------- 1 file changed, 21 insertions(+), 19 deletions(-) diff --git a/src/database.rs b/src/database.rs index d6eac7f..eaca028 100644 --- a/src/database.rs +++ b/src/database.rs @@ -674,25 +674,27 @@ impl Database { if let TxType::Decrypted(..) = tx.header().tx_type { // For unknown reason the header has to be updated before hashing it for its id (https://github.com/Zondax/namadexer/issues/23) hash_id = tx.clone().update_header(TxType::Raw).header_hash().to_vec(); - - // Look for the return code in the block results - let end_events = block_results.end_block_events.clone().unwrap(); // Safe to use unwrap because if it is not present then something is broken. - - // Look for the reurn code associated to the tx - for event in end_events { - for attr in event.attributes.iter() { - // We look to confirm hash of transaction - if attr.key == "hash" - && attr.value.to_ascii_lowercase() == hex::encode(&hash_id) - { - // Now we look for the return code - for attr in event.attributes.iter() { - if attr.key == "code" { - // using unwrap here is ok because we assume it is always going to be a number unless there is a bug in the node - return_code = Some(attr.value.parse().unwrap()); - } - } - } + let hash_id_str = hex::encode(&hash_id); + + // Safe to use unwrap because if it is not present then something is broken. 
+ let end_events = block_results.end_block_events.clone().unwrap(); + + // filter to get the matching event for hash_id + let matching_event = end_events.iter().find(|event| { + event.attributes.iter().any(|attr| { + attr.key == "hash" && attr.value.to_ascii_lowercase() == hash_id_str + }) + }); + + // now for the event get its attribute and parse the return code + if let Some(event) = matching_event { + // Now, look for the "code" attribute in the found event + if let Some(code_attr) = event.attributes.iter().find(|attr| attr.key == "code") + { + // Parse the code value. + // It could be possible to ignore the error by converting the result + // to an Option but it is better to fail if the value is not a number. + return_code = Some(code_attr.value.parse()?); } }