This repository has been archived by the owner on Jun 19, 2024. It is now read-only.

Feat/checksums once_cell #179

Merged · 4 commits · Mar 30, 2024
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default.

1 change: 1 addition & 0 deletions Cargo.toml
@@ -66,6 +66,7 @@ tendermint-rpc = { version = "0.35.0", features = ["http-client"] }
tendermint-proto = "0.35.0"
clap = { version = "4.4.2", features = ["derive", "env"] }
ureq = "2.9.1"
+once_cell = "1.19.0"

[dev-dependencies]
criterion = { version = "0.5.1", features = [
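For reviewers unfamiliar with the crate: `once_cell`'s `Lazy` wraps a value that is initialized on first access and then cached for the life of the process. A minimal sketch of the pattern this PR adopts — the names here are illustrative, not from the codebase:

```rust
use once_cell::sync::Lazy;
use std::collections::HashMap;

// Hypothetical loader standing in for the crate's `utils::load_checksums`.
fn load_map() -> HashMap<String, String> {
    HashMap::from([("abc123".into(), "tx_transfer".into())])
}

// The closure runs at most once, on the first dereference of MAP;
// every later access reuses the cached value.
static MAP: Lazy<HashMap<String, String>> = Lazy::new(load_map);

fn main() {
    // First access triggers `load_map`; later lookups are plain reads.
    assert_eq!(MAP.get("abc123").map(String::as_str), Some("tx_transfer"));
}
```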
4 changes: 2 additions & 2 deletions examples/generate.rs
@@ -1,8 +1,8 @@
// Generate test vector blocks
+use namada_sdk::tendermint::block::Height;
+use namada_sdk::tendermint_rpc::{Client, HttpClient};
use std::fs::File;
use std::io::Write;
-use tendermint::block::Height;
-use tendermint_rpc::{self, Client, HttpClient};

const URL: &str = "http://194.163.180.253:26657";
const CURRENT_HEIGHT: u32 = 1;
69 changes: 29 additions & 40 deletions src/database.rs
@@ -28,7 +28,6 @@ use namada_sdk::{
use sqlx::postgres::{PgPool, PgPoolOptions, PgRow as Row};
use sqlx::Row as TRow;
use sqlx::{query, QueryBuilder, Transaction};
-use std::collections::HashMap;
use std::sync::Arc;
use std::time::Duration;
use tendermint::block::Block;
@@ -39,7 +38,7 @@ use tendermint_rpc::endpoint::block_results;
use tracing::{debug, info, instrument};

use crate::{
-DB_SAVE_BLOCK_COUNTER, DB_SAVE_BLOCK_DURATION, DB_SAVE_COMMIT_SIG_DURATION,
+CHECKSUMS, DB_SAVE_BLOCK_COUNTER, DB_SAVE_BLOCK_DURATION, DB_SAVE_COMMIT_SIG_DURATION,
DB_SAVE_EVDS_DURATION, DB_SAVE_TXS_DURATION, INDEXER_LAST_SAVE_BLOCK_HEIGHT, MASP_ADDR,
};

@@ -238,11 +237,10 @@ impl Database {

/// Inner implementation that uses a postgres-transaction
/// to ensure database coherence.
-#[instrument(skip(block, block_results, checksums_map, sqlx_tx))]
+#[instrument(skip(block, block_results, sqlx_tx))]
async fn save_block_impl<'a>(
block: &Block,
block_results: &block_results::Response,
-checksums_map: &HashMap<String, String>,
sqlx_tx: &mut Transaction<'a, sqlx::Postgres>,
network: &str,
) -> Result<(), Error> {
@@ -338,7 +336,6 @@ impl Database {
block_id,
block.header.height.value(),
block_results,
-checksums_map,
sqlx_tx,
network,
)
@@ -348,12 +345,11 @@
}

/// Save a block and commit database
-#[instrument(skip(self, block, block_results, checksums_map))]
+#[instrument(skip(self, block, block_results))]
pub async fn save_block(
&self,
block: &Block,
block_results: &block_results::Response,
-checksums_map: &HashMap<String, String>,
) -> Result<(), Error> {
let instant = tokio::time::Instant::now();
// Let's use a postgres transaction internally for 2 reasons:
@@ -365,14 +361,7 @@
// succeeded.
let mut sqlx_tx = self.transaction().await?;

-Self::save_block_impl(
-block,
-block_results,
-checksums_map,
-&mut sqlx_tx,
-self.network.as_str(),
-)
-.await?;
+Self::save_block_impl(block, block_results, &mut sqlx_tx, self.network.as_str()).await?;

let res = sqlx_tx.commit().await.map_err(Error::from);

@@ -501,15 +490,14 @@ impl Database {
/// It is up to the caller to commit the operation.
/// This method is meant to be used when the caller is saving
/// many blocks, and can commit after them.
-#[instrument(skip(block, block_results, checksums_map, sqlx_tx, network))]
+#[instrument(skip(block, block_results, sqlx_tx, network))]
pub async fn save_block_tx<'a>(
block: &Block,
block_results: &block_results::Response,
-checksums_map: &HashMap<String, String>,
sqlx_tx: &mut Transaction<'a, sqlx::Postgres>,
network: &str,
) -> Result<(), Error> {
-Self::save_block_impl(block, block_results, checksums_map, sqlx_tx, network).await
+Self::save_block_impl(block, block_results, sqlx_tx, network).await
}

/// Save all the evidences in the list, it is up to the caller to
@@ -621,13 +609,12 @@ impl Database {
/// Save all the transactions in txs; it is up to the caller to
/// call sqlx_tx.commit().await?; for the changes to take place in
/// the database.
-#[instrument(skip(txs, block_id, sqlx_tx, checksums_map, block_results, network))]
+#[instrument(skip(txs, block_id, sqlx_tx, block_results, network))]
async fn save_transactions<'a>(
txs: &[Vec<u8>],
block_id: &[u8],
block_height: u64,
block_results: &block_results::Response,
-checksums_map: &HashMap<String, String>,
sqlx_tx: &mut Transaction<'a, sqlx::Postgres>,
network: &str,
) -> Result<(), Error> {
@@ -687,25 +674,27 @@ impl Database {
if let TxType::Decrypted(..) = tx.header().tx_type {
// For unknown reason the header has to be updated before hashing it for its id (https://github.com/Zondax/namadexer/issues/23)
hash_id = tx.clone().update_header(TxType::Raw).header_hash().to_vec();

-// Look for the return code in the block results
-let end_events = block_results.end_block_events.clone().unwrap(); // Safe to use unwrap because if it is not present then something is broken.
-
-// Look for the return code associated to the tx
-for event in end_events {
-for attr in event.attributes.iter() {
-// We look to confirm hash of transaction
-if attr.key == "hash"
-&& attr.value.to_ascii_lowercase() == hex::encode(&hash_id)
-{
-// Now we look for the return code
-for attr in event.attributes.iter() {
-if attr.key == "code" {
-// using unwrap here is ok because we assume it is always going to be a number unless there is a bug in the node
-return_code = Some(attr.value.parse().unwrap());
-}
-}
-}
+let hash_id_str = hex::encode(&hash_id);
+
+// Safe to use unwrap: if end_block_events is not present, something is broken.
+let end_events = block_results.end_block_events.clone().unwrap();
+
+// Filter to get the event matching hash_id
+let matching_event = end_events.iter().find(|event| {
+event.attributes.iter().any(|attr| {
+attr.key == "hash" && attr.value.to_ascii_lowercase() == hash_id_str
+})
+});
+
+// Now take the matching event's attributes and parse the return code
+if let Some(event) = matching_event {
+// Look for the "code" attribute in the found event
+if let Some(code_attr) = event.attributes.iter().find(|attr| attr.key == "code")
+{
+// Parse the code value.
+// It could be possible to ignore the error by converting the result
+// to an Option<i32>, but it is better to fail if the value is not a number.
+return_code = Some(code_attr.value.parse()?);
+}
+}

Expand All @@ -724,7 +713,7 @@ impl Database {

let code_hex = hex::encode(code.as_slice());
let unknown_type = "unknown".to_string();
-let type_tx = checksums_map.get(&code_hex).unwrap_or(&unknown_type);
+let type_tx = CHECKSUMS.get(&code_hex).unwrap_or(&unknown_type);

// decode tx_transfer, tx_bond and tx_unbond to store the decoded data in their tables
// if the transaction has failed, don't try to decode because the changes are not included and the data might not be correct
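Taken out of its diff context, the refactored lookup reads as below. This is a sketch under assumptions: `Event` and `Attribute` are simplified stand-ins for the tendermint-rs types, and `find_return_code` is a hypothetical helper, not a function in the PR.

```rust
// Simplified stand-ins for the tendermint-rs event types; the field names
// mirror the diff but are assumptions, not the crate's exact API.
struct Attribute { key: String, value: String }
struct Event { attributes: Vec<Attribute> }

/// Find the end-block event whose "hash" attribute matches `hash_id_str`
/// (lowercase hex), then parse that event's "code" attribute.
fn find_return_code(
    end_events: &[Event],
    hash_id_str: &str,
) -> Result<Option<i32>, std::num::ParseIntError> {
    let matching_event = end_events.iter().find(|event| {
        event.attributes.iter().any(|attr| {
            attr.key == "hash" && attr.value.to_ascii_lowercase() == hash_id_str
        })
    });

    match matching_event
        .and_then(|event| event.attributes.iter().find(|attr| attr.key == "code"))
    {
        // `?` propagates a malformed code instead of panicking like the old unwrap().
        Some(code_attr) => Ok(Some(code_attr.value.parse()?)),
        None => Ok(None),
    }
}
```

The notable behavioral change: the old nested loops scanned every end-block event and `unwrap()`ed the parse, while the new code stops at the first matching event and propagates a malformed code value as an error via `?`.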
11 changes: 1 addition & 10 deletions src/indexer/mod.rs
@@ -1,5 +1,4 @@
use crate::config::IndexerConfig;
-use crate::utils::load_checksums;
use futures::stream::StreamExt;
use futures_util::pin_mut;
use futures_util::Stream;
@@ -194,14 +193,6 @@ pub async fn start_indexing(
// check if indexes has been created in the database
let has_indexes = utils::has_indexes(&db).await?;

-/********************
-*
-* Load checksums
-*
-********************/
-
-let checksums_map = load_checksums()?;

/********************
*
* Init RPC
@@ -232,7 +223,7 @@
// Block consumer that stores block into the database
while let Some(block) = rx.recv().await {
// block is now the block info and the block results
-if let Err(e) = db.save_block(&block.0, &block.1, &checksums_map).await {
+if let Err(e) = db.save_block(&block.0, &block.1).await {
// shutdown producer task
shutdown.store(true, Ordering::Relaxed);
tracing::error!("Closing block producer task due to an error saving last block: {e}");
12 changes: 12 additions & 0 deletions src/lib.rs
@@ -18,6 +18,10 @@ pub use indexer::start_indexing;
pub use server::{create_server, start_server, BlockInfo};
pub use telemetry::{get_subscriber, init_subscriber, setup_logging};

+use std::collections::HashMap;
+
+use once_cell::sync::Lazy;

pub const INDEXER_GET_BLOCK_DURATION: &str = "indexer_get_block_duration";
const DB_SAVE_BLOCK_COUNTER: &str = "db_save_block_count";
const DB_SAVE_BLOCK_DURATION: &str = "db_save_block_duration";
@@ -28,3 +32,11 @@ const INDEXER_LAST_SAVE_BLOCK_HEIGHT: &str = "indexer_last_save_block_height";
const INDEXER_LAST_GET_BLOCK_HEIGHT: &str = "indexer_last_get_block_height";

pub const MASP_ADDR: &str = "tnam1pcqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqzmefah";

+// Checksums for the different transaction types,
+// stored as a global for easy access from anywhere.
+pub(crate) static CHECKSUMS: Lazy<HashMap<String, String>> =
+// Lazily load the checksums from the env/file;
+// this helps reduce the overhead of passing checksums to database
+// functions for data that is initialized once and never changes.
+Lazy::new(|| utils::load_checksums().expect("Failed to load checksums"));
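One semantic shift worth flagging: `load_checksums()?` used to surface a missing or malformed checksums file as an `Err` during `start_indexing`, whereas `Lazy` plus `expect` panics at the first dereference of `CHECKSUMS`. The access pattern itself becomes a one-liner anywhere in the crate; a hedged fragment (the code hash below is made up):

```rust
// Somewhere inside the crate -- no parameter threading required.
// The first dereference of CHECKSUMS runs utils::load_checksums();
// if that fails, the process panics here instead of returning an Err.
let unknown_type = "unknown".to_string();
let type_tx = CHECKSUMS
    .get("a1b2c3") // hypothetical wasm code hash
    .unwrap_or(&unknown_type);
```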
7 changes: 1 addition & 6 deletions tests/save_block.rs
@@ -2,7 +2,6 @@ mod utils;

#[cfg(test)]
mod save_block {
-use namadexer::utils::load_checksums;
use std::fs;
use tendermint::block::Block;
use tendermint_rpc::endpoint::block_results;
@@ -18,8 +17,6 @@
// now create a fresh database for tests
let db = create_test_db(helper_db.pool(), TESTING_DB_NAME).await;

-let checksums_map = load_checksums().unwrap();

let data = fs::read_to_string("./tests/blocks_vector.json").unwrap();
let blocks: Vec<Block> = serde_json::from_str(&data).unwrap();
let data = fs::read_to_string("./tests/block_results_vector.json").unwrap();
@@ -28,9 +25,7 @@
db.create_tables().await.unwrap();

for i in 0..blocks.len() {
-db.save_block(&blocks[i], &block_results[i], &checksums_map)
-.await
-.unwrap();
+db.save_block(&blocks[i], &block_results[i]).await.unwrap();
}

db.create_indexes()