diff --git a/Cargo.toml b/Cargo.toml index fddb0afc..d7355aa4 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -119,7 +119,13 @@ panic = "unwind" [profile.release-lto] inherits = "release" -codegen-units = 1 -lto = "fat" -opt-level = 3 -panic = "abort" +incremental = false +opt-level = 1 +panic = "unwind" + +#[profile.release-lto] +#inherits = "release" +#codegen-units = 1 +#lto = "fat" +#opt-level = 3 +#panic = "abort" diff --git a/crates/erc20_payment_lib/migrations/20231017000000_chain_tx.sql b/crates/erc20_payment_lib/migrations/20231017000000_chain_tx.sql new file mode 100644 index 00000000..aeb02392 --- /dev/null +++ b/crates/erc20_payment_lib/migrations/20231017000000_chain_tx.sql @@ -0,0 +1,5 @@ +CREATE UNIQUE INDEX "chain_tx_tx_hash" ON "chain_tx" (tx_hash); + +ALTER TABLE "chain_transfer" ADD COLUMN "fee_paid" TEXT NULL; +ALTER TABLE "chain_transfer" ADD COLUMN "blockchain_date" DATETIME NULL; + diff --git a/crates/erc20_payment_lib/migrations/20231019000000_scan.sql b/crates/erc20_payment_lib/migrations/20231019000000_scan.sql new file mode 100644 index 00000000..24d978b1 --- /dev/null +++ b/crates/erc20_payment_lib/migrations/20231019000000_scan.sql @@ -0,0 +1,10 @@ +CREATE TABLE "scan_info" +( + id INTEGER NOT NULL PRIMARY KEY AUTOINCREMENT, + chain_id INTEGER NOT NULL, + filter TEXT NOT NULL, + start_block INTEGER NOT NULL, + last_block INTEGER NOT NULL +); + +CREATE UNIQUE INDEX "idx_scan_info_chain_id" ON "scan_info" ("chain_id", "filter"); \ No newline at end of file diff --git a/crates/erc20_payment_lib/src/db/model.rs b/crates/erc20_payment_lib/src/db/model.rs index 30813b90..9aac5397 100644 --- a/crates/erc20_payment_lib/src/db/model.rs +++ b/crates/erc20_payment_lib/src/db/model.rs @@ -1,13 +1,15 @@ mod allowance_dao; mod chain_transfer_dao; mod chain_tx_dao; +mod scan_dao; mod token_transfer_dao; mod transfer_in_dao; mod tx_dao; pub use allowance_dao::AllowanceDao; -pub use chain_transfer_dao::{ChainTransferDao, ChainTransferDaoExt}; +pub use chain_transfer_dao::ChainTransferDao; pub use chain_tx_dao::ChainTxDao; +pub use scan_dao::ScanDao; pub use token_transfer_dao::TokenTransferDao; pub use transfer_in_dao::TransferInDao; pub use tx_dao::TxDao; diff --git a/crates/erc20_payment_lib/src/db/model/chain_transfer_dao.rs b/crates/erc20_payment_lib/src/db/model/chain_transfer_dao.rs index cca90f94..5cb720bd 100644 --- a/crates/erc20_payment_lib/src/db/model/chain_transfer_dao.rs +++ b/crates/erc20_payment_lib/src/db/model/chain_transfer_dao.rs @@ -11,8 +11,10 @@ pub struct ChainTransferDao { pub token_addr: Option, pub token_amount: String, pub chain_tx_id: i64, + pub fee_paid: Option, + pub blockchain_date: Option>, } - +/* #[derive(Serialize, sqlx::FromRow, Debug, Clone)] #[serde(rename_all = "camelCase")] pub struct ChainTransferDaoExt { @@ -23,5 +25,5 @@ pub struct ChainTransferDaoExt { pub token_addr: Option, pub token_amount: String, pub chain_tx_id: i64, - pub blockchain_date: Option>, -} + +}*/ diff --git a/crates/erc20_payment_lib/src/db/model/scan_dao.rs b/crates/erc20_payment_lib/src/db/model/scan_dao.rs new file mode 100644 index 00000000..897c488a --- /dev/null +++ b/crates/erc20_payment_lib/src/db/model/scan_dao.rs @@ -0,0 +1,11 @@ +use serde::Serialize; + +#[derive(Serialize, sqlx::FromRow, Debug, Clone, PartialEq, Eq)] +#[serde(rename_all = "camelCase")] +pub struct ScanDao { + pub id: i64, + pub chain_id: i64, + pub filter: String, + pub start_block: i64, + pub last_block: i64, +} diff --git a/crates/erc20_payment_lib/src/db/ops.rs b/crates/erc20_payment_lib/src/db/ops.rs index 11cc791b..c091d343 100644 --- a/crates/erc20_payment_lib/src/db/ops.rs +++ b/crates/erc20_payment_lib/src/db/ops.rs @@ -1,6 +1,7 @@ mod allowance_ops; mod chain_transfer_ops; mod chain_tx_ops; +mod scan_ops; mod token_transfer_ops; mod transfer_in_ops; mod tx_ops; @@ -8,6 +9,7 @@ mod tx_ops; pub use allowance_ops::*; pub use chain_transfer_ops::*; pub use chain_tx_ops::*; +pub use scan_ops::*; use std::future::Future; use std::time::Duration; pub use token_transfer_ops::*; diff --git a/crates/erc20_payment_lib/src/db/ops/chain_transfer_ops.rs b/crates/erc20_payment_lib/src/db/ops/chain_transfer_ops.rs index 27f5f963..62afa411 100644 --- a/crates/erc20_payment_lib/src/db/ops/chain_transfer_ops.rs +++ b/crates/erc20_payment_lib/src/db/ops/chain_transfer_ops.rs @@ -12,8 +12,8 @@ where { let res = sqlx::query_as::<_, ChainTransferDao>( r"INSERT INTO chain_transfer -(from_addr, receiver_addr, chain_id, token_addr, token_amount, chain_tx_id) -VALUES ($1, $2, $3, $4, $5, $6) RETURNING *; +(from_addr, receiver_addr, chain_id, token_addr, token_amount, chain_tx_id, fee_paid, blockchain_date) +VALUES ($1, $2, $3, $4, $5, $6, $7, $8) RETURNING *; ", ) .bind(&chain_transfer.from_addr) @@ -22,21 +22,39 @@ VALUES ($1, $2, $3, $4, $5, $6) RETURNING *; .bind(&chain_transfer.token_addr) .bind(&chain_transfer.token_amount) .bind(chain_transfer.chain_tx_id) + .bind(&chain_transfer.fee_paid) + .bind(chain_transfer.blockchain_date) .fetch_one(executor) .await?; Ok(res) } -pub async fn get_account_chain_transfers( +pub async fn get_all_chain_transfers( conn: &SqlitePool, - account: &str, -) -> Result, sqlx::Error> { - let rows = sqlx::query_as::<_, ChainTransferDaoExt>(r" -SELECT ct.id, ct.chain_id, ct.from_addr, ct.receiver_addr, ct.token_addr, ct.chain_tx_id, ct.token_amount, cx.blockchain_date -FROM chain_transfer as ct -JOIN chain_tx as cx on ct.chain_tx_id = cx.id -WHERE ct.receiver_addr = $1 -").bind(account).fetch_all(conn).await?; + limit: Option, +) -> Result, sqlx::Error> { + let limit = limit.unwrap_or(i64::MAX); + let rows = sqlx::query_as::<_, ChainTransferDao>( + r"SELECT * FROM chain_transfer ORDER by id DESC LIMIT $1", + ) + .bind(limit) + .fetch_all(conn) + .await?; + Ok(rows) +} +pub async fn get_chain_transfers_by_chain_id( + conn: &SqlitePool, + chain_id: i64, + limit: Option, +) -> Result, sqlx::Error> { + let limit = limit.unwrap_or(i64::MAX); + let rows = sqlx::query_as::<_, ChainTransferDao>( + r"SELECT * FROM chain_transfer WHERE chain_id = $1 ORDER by id DESC LIMIT $2", + ) + .bind(chain_id) + .bind(limit) + .fetch_all(conn) + .await?; Ok(rows) } diff --git a/crates/erc20_payment_lib/src/db/ops/chain_tx_ops.rs b/crates/erc20_payment_lib/src/db/ops/chain_tx_ops.rs index 29aff965..57c36a7b 100644 --- a/crates/erc20_payment_lib/src/db/ops/chain_tx_ops.rs +++ b/crates/erc20_payment_lib/src/db/ops/chain_tx_ops.rs @@ -41,6 +41,33 @@ pub async fn get_chain_tx(conn: &SqlitePool, id: i64) -> Result( + executor: E, + tx_hash: String, +) -> Result, sqlx::Error> +where + E: Executor<'c, Database = Sqlite>, +{ + let row = sqlx::query_as::<_, ChainTxDao>(r"SELECT * FROM chain_tx WHERE tx_hash = $1") + .bind(tx_hash) + .fetch_optional(executor) + .await?; + Ok(row) +} + +pub async fn get_last_scanned_block<'c, E>( + executor: E, + chain_id: i64, +) -> Result, sqlx::Error> +where + E: Executor<'c, Database = Sqlite>, +{ + sqlx::query_scalar::<_, i64>(r"SELECT MAX(block_number) FROM chain_tx WHERE chain_id = $1") + .bind(chain_id) + .fetch_optional(executor) + .await +} + #[tokio::test] async fn tx_chain_test() -> sqlx::Result<()> { println!("Start tx_chain_test..."); diff --git a/crates/erc20_payment_lib/src/db/ops/scan_ops.rs b/crates/erc20_payment_lib/src/db/ops/scan_ops.rs new file mode 100644 index 00000000..e2379994 --- /dev/null +++ b/crates/erc20_payment_lib/src/db/ops/scan_ops.rs @@ -0,0 +1,99 @@ +use crate::db::model::ScanDao; +use sqlx::{Executor, Sqlite}; + +pub async fn get_scan_info<'c, E>( + executor: E, + chain_id: i64, + filter: &str, +) -> Result, sqlx::Error> +where + E: Executor<'c, Database = Sqlite>, +{ + let row = sqlx::query_as::<_, ScanDao>( + r"SELECT * FROM scan_info WHERE chain_id = $1 AND filter = $2", + ) + .bind(chain_id) + .bind(filter) + .fetch_optional(executor) + .await?; + Ok(row) +} + +pub async fn delete_scan_info<'c, E>( + executor: E, + chain_id: i64, + filter: &str, +) -> Result<(), sqlx::Error> +where + E: Executor<'c, Database = Sqlite>, +{ + sqlx::query(r"DELETE FROM scan_info WHERE chain_id = $1 AND filter = $2") + .bind(chain_id) + .bind(filter) + .execute(executor) + .await?; + Ok(()) +} + +pub async fn upsert_scan_info<'c, E>( + executor: E, + scan_dao: &ScanDao, +) -> Result +where + E: Executor<'c, Database = Sqlite>, +{ + let res = sqlx::query_as::<_, ScanDao>( + r"INSERT OR REPLACE INTO scan_info +(chain_id, filter, start_block, last_block) +VALUES ($1, $2, $3, $4) RETURNING *; +", + ) + .bind(scan_dao.chain_id) + .bind(&scan_dao.filter) + .bind(scan_dao.start_block) + .bind(scan_dao.last_block) + .fetch_one(executor) + .await?; + Ok(res) +} +#[tokio::test] +async fn tx_chain_test() -> sqlx::Result<()> { + println!("Start tx_chain_test..."); + + use crate::db::create_sqlite_connection; + let conn = create_sqlite_connection(None, None, false, true) + .await + .unwrap(); + + let mut scan_info_to_insert = ScanDao { + id: -1, + chain_id: 25, + filter: "filter".to_string(), + start_block: 77, + last_block: 6666, + }; + + let scan_info_from_insert = upsert_scan_info(&conn, &scan_info_to_insert).await?; + scan_info_to_insert.id = scan_info_from_insert.id; + assert_eq!(scan_info_to_insert.id, 1); + let scan_info_from_dao = get_scan_info(&conn, 25, "filter").await?.unwrap(); + + //all three should be equal + assert_eq!(scan_info_to_insert, scan_info_from_dao); + assert_eq!(scan_info_from_insert, scan_info_from_dao); + + assert_eq!(None, get_scan_info(&conn, 25, "filter2").await?); + assert_eq!(None, get_scan_info(&conn, 26, "filter").await?); + + //this transaction will overwrite id due to conflict in unique index + scan_info_to_insert.id = 2; + let result = upsert_scan_info(&conn, &scan_info_to_insert).await.unwrap(); + + assert_eq!(result.id, 2); + + delete_scan_info(&conn, 25, "filter").await.unwrap(); + + assert_eq!(None, get_scan_info(&conn, 25, "filter").await?); + + Ok(()) +} diff --git a/crates/erc20_payment_lib/src/db/ops/token_transfer_ops.rs b/crates/erc20_payment_lib/src/db/ops/token_transfer_ops.rs index cf562732..24d4800d 100644 --- a/crates/erc20_payment_lib/src/db/ops/token_transfer_ops.rs +++ b/crates/erc20_payment_lib/src/db/ops/token_transfer_ops.rs @@ -1,4 +1,5 @@ use crate::db::model::*; +use crate::db::ops::get_chain_transfers_by_chain_id; use crate::err_from; use crate::error::PaymentError; use crate::error::*; @@ -131,6 +132,22 @@ pub async fn get_all_token_transfers( Ok(rows) } +pub async fn get_token_transfers_by_chain_id( + conn: &SqlitePool, + chain_id: i64, + limit: Option, +) -> Result, sqlx::Error> { + let limit = limit.unwrap_or(i64::MAX); + let rows = sqlx::query_as::<_, TokenTransferDao>( + r"SELECT * FROM token_transfer WHERE chain_id = $1 ORDER by id DESC LIMIT $2", + ) + .bind(chain_id) + .bind(limit) + .fetch_all(conn) + .await?; + Ok(rows) +} + pub async fn get_pending_token_transfers( conn: &SqlitePool, ) -> Result, sqlx::Error> { @@ -194,11 +211,76 @@ pub struct TransferStats { pub per_sender: BTreeMap, } +pub async fn get_transfer_stats_from_blockchain( + conn: &SqlitePool, + chain_id: i64, + limit: Option, +) -> Result { + let tt = get_chain_transfers_by_chain_id(conn, chain_id, limit) + .await + .map_err(err_from!())?; + //let txs = get_transactions(conn, None, None, None) + // .await + // .map_err(err_from!())?; + //let mut txs_map = HashMap::new(); + //for tx in txs { + // txs_map.insert(tx.id, tx); + //} + + let mut ts = TransferStats::default(); + for t in tt { + let from_addr = Address::from_str(&t.from_addr).map_err(err_from!())?; + let to_addr = Address::from_str(&t.receiver_addr).map_err(err_from!())?; + let ts = ts + .per_sender + .entry(from_addr) + .or_insert_with(TransferStatsBase::default); + let (t1, t2) = ( + &mut ts.all, + ts.per_receiver + .entry(to_addr) + .or_insert_with(TransferStatsPart::default), + ); + + for ts in [t1, t2] { + ts.total_count += 1; + ts.done_count += 1; + if let Some(fee_paid) = &t.fee_paid { + ts.fee_paid += U256::from_dec_str(fee_paid).map_err(err_from!())?; + } + + if let Some(paid_date) = t.blockchain_date { + if ts.first_paid_date.is_none() || ts.first_paid_date.unwrap() > paid_date { + ts.first_paid_date = Some(paid_date); + } + if ts.last_paid_date.is_none() || ts.last_paid_date.unwrap() < paid_date { + ts.last_paid_date = Some(paid_date); + } + } + ts.transaction_ids.insert(t.chain_tx_id); + //ts.fee_paid += U256::from_dec_str(&t.fee_paid.clone().unwrap()).map_err(err_from!())?; + if let Some(token_addr) = &t.token_addr { + let token_addr = Address::from_str(token_addr).map_err(err_from!())?; + let token_amount = U256::from_dec_str(&t.token_amount).map_err(err_from!())?; + ts.erc20_token_transferred + .entry(token_addr) + .or_insert_with(U256::zero) + .add_assign(token_amount); + } else { + ts.native_token_transferred + .add_assign(U256::from_dec_str(&t.token_amount).map_err(err_from!())?); + } + } + } + Ok(ts) +} + pub async fn get_transfer_stats( conn: &SqlitePool, + chain_id: i64, limit: Option, ) -> Result { - let tt = get_all_token_transfers(conn, limit) + let tt = get_token_transfers_by_chain_id(conn, chain_id, limit) .await .map_err(err_from!())?; //let txs = get_transactions(conn, None, None, None) diff --git a/crates/erc20_payment_lib/src/server.rs b/crates/erc20_payment_lib/src/server.rs index 45cb11a3..d232225f 100644 --- a/crates/erc20_payment_lib/src/server.rs +++ b/crates/erc20_payment_lib/src/server.rs @@ -418,14 +418,14 @@ pub async fn account_payments_in(data: Data>, req: HttpRequest) let db_conn = data.db_connection.lock().await; return_on_error!(get_account_transfers_in(&db_conn, &account, None).await) }; - let chain_transfers = { + /*let chain_transfers = { let db_conn = data.db_connection.lock().await; return_on_error!(get_account_chain_transfers(&db_conn, &account).await) - }; + };*/ web::Json(json!({ "transfersIn": transfers_in, - "chainTransfers": chain_transfers, + // "chainTransfers": chain_transfers, })) } diff --git a/crates/erc20_payment_lib/src/service.rs b/crates/erc20_payment_lib/src/service.rs index 2a0802a0..0fbf762f 100644 --- a/crates/erc20_payment_lib/src/service.rs +++ b/crates/erc20_payment_lib/src/service.rs @@ -75,36 +75,77 @@ pub async fn add_glm_request( .map_err(err_from!()) } -pub async fn transaction_from_chain( +pub async fn transaction_from_chain_and_into_db( web3: &Web3, conn: &SqlitePool, chain_id: i64, tx_hash: &str, -) -> Result { +) -> Result, PaymentError> { println!("tx_hash: {tx_hash}"); let tx_hash = web3::types::H256::from_str(tx_hash) .map_err(|_err| ConversionError::from("Cannot parse tx_hash".to_string())) .map_err(err_from!())?; + if let Some(chain_tx) = get_chain_tx_hash(conn, tx_hash.to_string()) + .await + .map_err(err_from!())? + { + log::info!("Transaction already in DB: {}, skipping...", chain_tx.id); + return Ok(Some(chain_tx)); + } + let (chain_tx_dao, transfers) = find_receipt_extended(web3, tx_hash, chain_id).await?; - if chain_tx_dao.chain_status == 1 { - let mut db_transaction = conn.begin().await.map_err(err_from!())?; + if chain_tx_dao.chain_status != 1 { + return Ok(None); + } + + let mut db_transaction = conn.begin().await.map_err(err_from!())?; - let tx = insert_chain_tx(&mut *db_transaction, &chain_tx_dao) - .await + let tx = insert_chain_tx(&mut *db_transaction, &chain_tx_dao) + .await + .map_err(err_from!())?; + + if !transfers.is_empty() { + //This is a bit complicated, but we need to distribute the fee paid by the user in transaction + //to all token transfers in the transaction in the way that sum of fees is correct + //Implementation is a bit rough, but it works + let mut distribute_fee: Vec> = Vec::with_capacity(transfers.len()); + let val = U256::from_dec_str(&tx.fee_paid) + .map_err(|_err| ConversionError::from("failed to parse fee paid".into())) .map_err(err_from!())?; - for mut transfer in transfers { + let mut fee_left = val; + let val_share = val / U256::from(transfers.len() as u64); + for _tt in &transfers { + fee_left -= val_share; + distribute_fee.push(Some(val_share)); + } + let fee_left = fee_left.as_u64() as usize; + if fee_left >= transfers.len() { + panic!( + "fee left is too big, critical error when distributing fee {}/{}", + fee_left, + transfers.len() + ); + } + //distribute the rest of the fee by adding one am much time as needed + distribute_fee.iter_mut().take(fee_left).for_each(|item| { + let val = item.unwrap(); + *item = Some(val + U256::from(1)); + }); + + for (mut transfer, fee_paid) in transfers.into_iter().zip(distribute_fee) { transfer.chain_tx_id = tx.id; + transfer.fee_paid = fee_paid.map(|v| v.to_string()); insert_chain_transfer(&mut *db_transaction, &transfer) .await .map_err(err_from!())?; } - db_transaction.commit().await.map_err(err_from!())?; - log::info!("Transaction found and parsed successfully: {}", tx.id); } - Ok(true) + db_transaction.commit().await.map_err(err_from!())?; + log::info!("Transaction found and parsed successfully: {}", tx.id); + Ok(Some(tx)) } pub async fn confirm_loop( diff --git a/crates/erc20_payment_lib/src/transaction.rs b/crates/erc20_payment_lib/src/transaction.rs index 25a1f859..485e26d3 100644 --- a/crates/erc20_payment_lib/src/transaction.rs +++ b/crates/erc20_payment_lib/src/transaction.rs @@ -684,6 +684,8 @@ pub async fn find_receipt_extended( token_addr: None, token_amount: tx.value.to_string(), chain_tx_id: 0, + fee_paid: None, + blockchain_date: Some(chain_tx_dao.blockchain_date), }); } @@ -751,6 +753,8 @@ pub async fn find_receipt_extended( token_addr: Some(format!("{:#x}", log.address)), token_amount: amount.to_string(), chain_tx_id: 0, + fee_paid: None, + blockchain_date: Some(chain_tx_dao.blockchain_date), }); } else if to == tx_to { //ignore payment to contract - handled in loop before @@ -764,6 +768,8 @@ pub async fn find_receipt_extended( token_addr: Some(format!("{:#x}", log.address)), token_amount: amount.to_string(), chain_tx_id: 0, + fee_paid: None, + blockchain_date: Some(chain_tx_dao.blockchain_date), }); } } @@ -775,7 +781,8 @@ pub async fn find_receipt_extended( pub async fn get_erc20_logs( web3: &Web3, erc20_address: Address, - topic_receivers: Vec, + topic_senders: Option>, + topic_receivers: Option>, from_block: i64, to_block: i64, ) -> Result, PaymentError> { @@ -789,8 +796,8 @@ pub async fn get_erc20_logs( "0xddf252ad1be2c89b69c2b068fc378daa952ba7f163c4a11628f55a4df523b3ef", ) .unwrap()]), - None, - Some(topic_receivers), + topic_senders, + topic_receivers, None, ) .from_block(BlockNumber::Number(U64::from(from_block as u64))) @@ -801,22 +808,32 @@ pub async fn get_erc20_logs( .map_err(|e| err_custom_create!("Error while getting logs: {}", e)) } +#[allow(clippy::too_many_arguments)] pub async fn import_erc20_txs( web3: &Web3, erc20_address: Address, _chain_id: i64, - accounts: &[Address], + filter_by_senders: Option<&[Address]>, + filter_by_receivers: Option<&[Address]>, + mut start_block: i64, + scan_end_block: i64, + blocks_at_once: u64, ) -> Result, PaymentError> { - let topic_receivers: Vec = accounts - .iter() - .map(|f| { - let mut topic = [0u8; 32]; - topic[12..32].copy_from_slice(&f.to_fixed_bytes()); - H256::from(topic) + let option_address_to_option_h256 = |val: Option<&[Address]>| -> Option> { + val.map(|accounts| { + accounts + .iter() + .map(|f| { + let mut topic = [0u8; 32]; + topic[12..32].copy_from_slice(&f.to_fixed_bytes()); + H256::from(topic) + }) + .collect() }) - .collect(); + }; - println!("{:#x}", topic_receivers[0]); + let topic_receivers = option_address_to_option_h256(filter_by_receivers); + let topic_senders = option_address_to_option_h256(filter_by_senders); let current_block = web3 .eth() @@ -825,26 +842,26 @@ pub async fn import_erc20_txs( .map_err(err_from!())? .as_u64() as i64; - //start around 30 days ago - let mut start_block = std::cmp::max(1, current_block - (3600 * 24 * 30) / 2); - let mut txs = HashMap::::new(); loop { - println!("start block: {start_block}"); - let end_block = std::cmp::min(start_block + 1000, current_block); + let end_block = std::cmp::min( + std::cmp::min(start_block + 1000, current_block), + scan_end_block, + ); if start_block > end_block { break; } + log::info!("Scanning chain, blocks: {start_block} - {end_block}"); let logs = get_erc20_logs( web3, erc20_address, + topic_senders.clone(), topic_receivers.clone(), start_block, end_block, ) .await?; for log in logs.into_iter() { - println!("Block number: {}", log.block_number.unwrap()); txs.insert( log.transaction_hash .ok_or(err_custom_create!("Log without transaction hash"))?, @@ -852,16 +869,21 @@ pub async fn import_erc20_txs( .ok_or(err_custom_create!("Log without block number"))? .as_u64(), ); + log::info!( + "Found matching log entry in block: {}, tx: {}", + log.block_number.unwrap(), + log.block_number.unwrap() + ); } - start_block += 1000; + start_block += blocks_at_once as i64; } if txs.is_empty() { - println!("No logs found"); - } - for tx in &txs { - println!("Transaction: {:#x}", tx.0); + log::info!("No logs found"); + } else { + log::info!("Found {} transactions", txs.len()); } + //return transactions sorted by block number let mut vec = txs.into_iter().collect::>(); vec.sort_by(|a, b| a.1.cmp(&b.1)); diff --git a/crates/erc20_payment_lib_test/src/durabily2.rs b/crates/erc20_payment_lib_test/src/durabily2.rs index dc19bbcd..856812c3 100644 --- a/crates/erc20_payment_lib_test/src/durabily2.rs +++ b/crates/erc20_payment_lib_test/src/durabily2.rs @@ -63,9 +63,10 @@ pub async fn test_durability2(generate_count: u64, gen_interval_secs: f64, trans (fee_paid, fee_paid_approve) }); + let mut config = create_default_config_setup(&proxy_url_base, proxy_key).await; + let chain_id = config.chain.get_mut("dev").unwrap().chain_id; { - let mut config = create_default_config_setup(&proxy_url_base, proxy_key).await; config.chain.get_mut("dev").unwrap().multi_contract.as_mut().unwrap().max_at_once = transfers_at_once; //load private key for account 0xbfb29b133aa51c4b45b49468f9a22958eafea6fa @@ -129,7 +130,7 @@ pub async fn test_durability2(generate_count: u64, gen_interval_secs: f64, trans let conn_ = conn.clone(); let _stats = tokio::spawn(async move { loop { - let stats = match get_transfer_stats(&conn_, Some(10000)).await { + let stats = match get_transfer_stats(&conn_, chain_id, Some(10000)).await { Ok(stats) => stats, Err(err) => { log::error!("Error from get_transfer_stats {err}"); @@ -152,7 +153,7 @@ pub async fn test_durability2(generate_count: u64, gen_interval_secs: f64, trans let (fee_paid_events, fee_paid_events_approve) = receiver_loop.await.unwrap(); log::info!("fee paid from events: {}", u256_to_rust_dec(fee_paid_events, None).unwrap()); - let transfer_stats = get_transfer_stats(&conn, None).await.unwrap(); + let transfer_stats = get_transfer_stats(&conn, chain_id, None).await.unwrap(); let stats_all = transfer_stats.per_sender.iter().next().unwrap().1.all.clone(); let fee_paid_stats = stats_all.fee_paid; log::info!("fee paid from stats: {}", u256_to_rust_dec(fee_paid_stats, None).unwrap()); diff --git a/crates/erc20_payment_lib_test/src/multi_erc20_transfer.rs b/crates/erc20_payment_lib_test/src/multi_erc20_transfer.rs index 812a1656..1cdf14f8 100644 --- a/crates/erc20_payment_lib_test/src/multi_erc20_transfer.rs +++ b/crates/erc20_payment_lib_test/src/multi_erc20_transfer.rs @@ -63,9 +63,10 @@ pub async fn test_durability(generate_count: u64, gen_interval_secs: f64, transf (fee_paid, fee_paid_approve) }); + let mut config = create_default_config_setup(&proxy_url_base, proxy_key).await; + let chain_id = config.chain.get_mut("dev").unwrap().chain_id; { - let mut config = create_default_config_setup(&proxy_url_base, proxy_key).await; config.chain.get_mut("dev").unwrap().multi_contract.as_mut().unwrap().max_at_once = transfers_at_once; //load private key for account 0xbfb29b133aa51c4b45b49468f9a22958eafea6fa @@ -130,7 +131,7 @@ pub async fn test_durability(generate_count: u64, gen_interval_secs: f64, transf let conn_ = conn.clone(); let _stats = tokio::spawn(async move { loop { - let stats = match get_transfer_stats(&conn_, Some(10000)).await { + let stats = match get_transfer_stats(&conn_, chain_id, Some(10000)).await { Ok(stats) => stats, Err(err) => { log::error!("Error from get_transfer_stats {err}"); @@ -153,7 +154,7 @@ pub async fn test_durability(generate_count: u64, gen_interval_secs: f64, transf let (fee_paid_events, fee_paid_events_approve) = receiver_loop.await.unwrap(); log::info!("fee paid from events: {}", u256_to_rust_dec(fee_paid_events, None).unwrap()); - let transfer_stats = get_transfer_stats(&conn, None).await.unwrap(); + let transfer_stats = get_transfer_stats(&conn, chain_id, None).await.unwrap(); let stats_all = transfer_stats.per_sender.iter().next().unwrap().1.all.clone(); let fee_paid_stats = stats_all.fee_paid; log::info!("fee paid from stats: {}", u256_to_rust_dec(fee_paid_stats, None).unwrap()); diff --git a/examples/import_tx.rs b/examples/import_tx.rs deleted file mode 100644 index 37fc6906..00000000 --- a/examples/import_tx.rs +++ /dev/null @@ -1,82 +0,0 @@ -use erc20_payment_lib::config; -use erc20_payment_lib::db::create_sqlite_connection; - -use erc20_payment_lib::error::PaymentError; - -use erc20_payment_lib::misc::{display_private_keys, load_private_keys}; -use std::env; -use std::path::Path; -use std::str::FromStr; - -use erc20_payment_lib::service::transaction_from_chain; -use erc20_payment_lib::setup::PaymentSetup; -use erc20_payment_lib::transaction::import_erc20_txs; -use structopt::StructOpt; -use web3::ethabi::ethereum_types::Address; - -#[derive(Debug, StructOpt)] -struct ImportTxOptions { - #[structopt(long = "chain-id", default_value = "987789")] - chain_id: i64, - - #[structopt( - long = "tx-hash", - default_value = "0x13d8a54dec1c0a30f1cd5129f690c3e27b9aadd59504957bad4d247966dadae7" - )] - _tx_hash: String, -} - -async fn main_internal() -> Result<(), PaymentError> { - dotenv::dotenv().ok(); - env::set_var( - "RUST_LOG", - env::var("RUST_LOG").unwrap_or("info,sqlx::query=warn,web3=warn".to_string()), - ); - env_logger::init(); - - let cli: ImportTxOptions = ImportTxOptions::from_args(); - - let config = config::Config::load("config-payments.toml").await?; - - let (private_keys, _public_addrs) = load_private_keys(&env::var("ETH_PRIVATE_KEYS").unwrap())?; - display_private_keys(&private_keys); - - let db_conn = env::var("DB_SQLITE_FILENAME").unwrap(); - let conn = create_sqlite_connection(Some(Path::new(&db_conn)), None, false, true).await?; - - let payment_setup = PaymentSetup::new(&config, vec![], true, false, false, 1, 1, false)?; - let ps = payment_setup.chain_setup.get(&cli.chain_id).unwrap(); - let txs = import_erc20_txs( - &ps.providers[0].provider, - ps.glm_address.unwrap(), - cli.chain_id, - &[Address::from_str("0x0000000600000006000000060000000600000006").unwrap()], - ) - .await - .unwrap(); - - for tx in &txs { - transaction_from_chain( - &ps.providers[0].provider, - &conn, - cli.chain_id, - &format!("{tx:#x}"), - ) - .await - .unwrap(); - } - - conn.close().await; //it is needed to process all the transactions before closing the connection - Ok(()) -} - -#[tokio::main] -async fn main() -> Result<(), PaymentError> { - match main_internal().await { - Ok(_) => Ok(()), - Err(e) => { - eprintln!("Error: {e}"); - Err(e) - } - } -} diff --git a/prometheus/docker-compose.yml b/prometheus/docker-compose.yml new file mode 100644 index 00000000..1441c174 --- /dev/null +++ b/prometheus/docker-compose.yml @@ -0,0 +1,19 @@ +services: + expose: + image: python:alpine + volumes: + - type: bind + source: ./metrics.txt + target: /expose/metrics.txt + working_dir: /expose + command: python -u -m http.server 5000 + ports: + - 5000:5000 + prometeus: + image: prom/prometheus + volumes: + - type: bind + source: ./prometheus.yml + target: /etc/prometheus/prometheus.yml + ports: + - 9090:9090 \ No newline at end of file diff --git a/prometheus/expose.py b/prometheus/expose.py new file mode 100644 index 00000000..a95c0a85 --- /dev/null +++ b/prometheus/expose.py @@ -0,0 +1,14 @@ +from flask import Flask + +app = Flask(__name__) + + +@app.route("/metrics", methods=['GET']) +def getfile(): + with open("metrics.txt", "r+") as f: + data = f.read() + return data + + +if __name__ == '__main__': + app.run(host='localhost') diff --git a/prometheus/metrics.txt b/prometheus/metrics.txt new file mode 100644 index 00000000..650f4a38 --- /dev/null +++ b/prometheus/metrics.txt @@ -0,0 +1,6 @@ +# HELP senders_count Number of distinct receivers +# TYPE senders_count counter +senders_count{chain_id="137", sender="0x09e4f0ae44d5e60d44a8928af7531e6a862290bc"} 19 +# HELP erc20_transferred Number of distinct receivers +# TYPE erc20_transferred counter +erc20_transferred{chain_id="137", sender="0x09e4f0ae44d5e60d44a8928af7531e6a862290bc"} 0.000000029482970684 diff --git a/prometheus/prometheus.yml b/prometheus/prometheus.yml new file mode 100644 index 00000000..63f2f80e --- /dev/null +++ b/prometheus/prometheus.yml @@ -0,0 +1,15 @@ +global: + scrape_interval: 15s # By default, scrape targets every 15 seconds. + + # Attach these labels to any time series or alerts when communicating with + # external systems (federation, remote storage, Alertmanager). + external_labels: + monitor: 'codelab-monitor' + +# A scrape configuration containing exactly one endpoint to scrape: +# Here it's Prometheus itself. +scrape_configs: + - job_name: 'metrics' + metrics_path: /metrics.txt + static_configs: + - targets: ['expose:5000'] diff --git a/src/main.rs b/src/main.rs index 7ba333a1..2e27f06d 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,4 +1,5 @@ mod options; +mod stats; use crate::options::{PaymentCommands, PaymentOptions}; use actix_web::Scope; @@ -6,13 +7,13 @@ use actix_web::{web, App, HttpServer}; use csv::ReaderBuilder; use erc20_payment_lib::config::AdditionalOptions; use erc20_payment_lib::db::create_sqlite_connection; -use erc20_payment_lib::db::model::TokenTransferDao; +use erc20_payment_lib::db::model::{ScanDao, TokenTransferDao}; use erc20_payment_lib::db::ops::{ - get_transfer_stats, insert_token_transfer, update_token_transfer, TransferStatsPart, + delete_scan_info, get_scan_info, insert_token_transfer, update_token_transfer, upsert_scan_info, }; use erc20_payment_lib::server::*; use erc20_payment_lib::signer::PrivateKeySigner; -use erc20_payment_lib::utils::{rust_dec_to_u256, u256_to_rust_dec}; +use erc20_payment_lib::utils::rust_dec_to_u256; use erc20_payment_lib::{ config, err_custom_create, err_from, @@ -21,13 +22,19 @@ use erc20_payment_lib::{ runtime::PaymentRuntime, }; use std::env; +use std::str::FromStr; +use crate::stats::run_stats; use erc20_payment_lib::runtime::remove_last_unsent_transactions; +use erc20_payment_lib::service::transaction_from_chain_and_into_db; +use erc20_payment_lib::setup::PaymentSetup; +use erc20_payment_lib::transaction::import_erc20_txs; use erc20_payment_lib_extra::{account_balance, generate_test_payments}; + use std::sync::Arc; use structopt::StructOpt; use tokio::sync::Mutex; -use web3::types::{H160, U256}; +use web3::ethabi::ethereum_types::Address; async fn main_internal() -> Result<(), PaymentError> { dotenv::dotenv().ok(); @@ -273,182 +280,134 @@ async fn main_internal() -> Result<(), PaymentError> { } PaymentCommands::PaymentStats { payment_stats_options, + } => run_stats(conn.clone(), payment_stats_options, &config).await?, + PaymentCommands::ScanBlockchain { + scan_blockchain_options, } => { - println!("Getting transfers stats..."); - let transfer_stats = get_transfer_stats(&conn, None).await.unwrap(); - let main_sender = transfer_stats.per_sender.iter().next().unwrap(); - let stats_all = main_sender.1.all.clone(); - let fee_paid_stats = stats_all.fee_paid; - println!( - "fee paid from stats: {}", - u256_to_rust_dec(fee_paid_stats, None).unwrap() - ); + log::info!("Scanning blockchain {}", scan_blockchain_options.chain_name); - println!("Number of transfers done: {}", stats_all.done_count); + let payment_setup = + PaymentSetup::new(&config, vec![], true, false, false, 1, 1, false)?; + let chain_cfg = config + .chain + .get(&scan_blockchain_options.chain_name) + .ok_or(err_custom_create!( + "Chain {} not found in config file", + scan_blockchain_options.chain_name + ))?; + let web3 = payment_setup.get_provider(chain_cfg.chain_id)?; - println!( - "Number of distinct receivers: {}", - main_sender.1.per_receiver.len() - ); + let sender = Address::from_str(&scan_blockchain_options.sender).unwrap(); - println!( - "Number of web3 transactions: {}", - main_sender.1.all.transaction_ids.len() - ); + let scan_info = ScanDao { + id: 0, + chain_id: chain_cfg.chain_id, + filter: format!("{sender:#x}"), + start_block: -1, + last_block: -1, + }; + let scan_info_from_db = get_scan_info(&conn, chain_cfg.chain_id, &scan_info.filter) + .await + .map_err(err_from!())?; - println!( - "First transfer requested at {}", - main_sender - .1 - .all - .first_transfer_date - .map(|d| d.to_string()) - .unwrap_or("N/A".to_string()) - ); - println!( - "First payment made {}", - main_sender - .1 - .all - .first_paid_date - .map(|d| d.to_string()) - .unwrap_or("N/A".to_string()) - ); - println!( - "Last transfer requested at {}", - main_sender - .1 - .all - .last_transfer_date - .map(|d| d.to_string()) - .unwrap_or("N/A".to_string()) - ); - println!( - "Last payment made {}", - main_sender - .1 - .all - .last_paid_date - .map(|d| d.to_string()) - .unwrap_or("N/A".to_string()) - ); - println!( - "Max payment delay: {}", - main_sender - .1 - .all - .max_payment_delay - .map(|d| d.num_seconds().to_string() + "s") - .unwrap_or("N/A".to_string()) - ); + let mut scan_info = if scan_blockchain_options.start_new_scan { + log::warn!("Starting new scan - removing old scan info from db"); + delete_scan_info(&conn, scan_info.chain_id, &scan_info.filter) + .await + .map_err(err_from!())?; + scan_info + } else if let Some(scan_info_from_db) = scan_info_from_db { + log::debug!("Found scan info from db: {:?}", scan_info_from_db); + scan_info_from_db + } else { + scan_info + }; - println!( - "Native token sent: {}", - u256_to_rust_dec(main_sender.1.all.native_token_transferred, None).unwrap() + let current_block = web3 + .eth() + .block_number() + .await + .map_err(err_from!())? + .as_u64() as i64; + + //start around 30 days ago + let mut start_block = std::cmp::max( + 1, + current_block - scan_blockchain_options.from_blocks_ago as i64, ); - let per_receiver = main_sender.1.per_receiver.clone(); - let mut per_receiver: Vec<(H160, TransferStatsPart)> = - per_receiver.into_iter().collect(); - if payment_stats_options.order_by == "payment_delay" { - per_receiver.sort_by(|r, b| { - let left = - r.1.max_payment_delay - .unwrap_or(chrono::Duration::max_value()); - let right = - b.1.max_payment_delay - .unwrap_or(chrono::Duration::max_value()); - right.cmp(&left) - }); - } else if payment_stats_options.order_by == "token_sent" { - per_receiver.sort_by(|r, b| { - let left = *r.1.erc20_token_transferred.iter().next().unwrap().1; - let right = *b.1.erc20_token_transferred.iter().next().unwrap().1; - right.cmp(&left) - }); - } else if payment_stats_options.order_by == "gas_paid" - || payment_stats_options.order_by == "fee_paid" - { - per_receiver.sort_by(|r, b| { - let left = r.1.fee_paid; - let right = b.1.fee_paid; - right.cmp(&left) - }); - } else { - return Err(err_custom_create!( - "Unknown order_by option: {}", - payment_stats_options.order_by - )); + if scan_info.last_block > start_block { + log::info!("Start block from db is higher than start block from cli {}, using start block from db {}", start_block, scan_info.last_block); + start_block = scan_info.last_block; + } else if scan_info.last_block != -1 { + log::error!("There is old entry in db, remove it to start new scan or give proper block range: start block: {}, last block {}", start_block, scan_info.last_block); + return Err(err_custom_create!("There is old entry in db, remove it to start new scan or give proper block range: start block: {}, last block {}", start_block, scan_info.last_block)); } - if payment_stats_options.order_by_dir == "asc" { - per_receiver.reverse(); + let mut end_block = + if let Some(max_block_range) = scan_blockchain_options.max_block_range { + start_block + max_block_range as i64 + } else { + current_block + }; + + if let Some(blocks_behind) = scan_blockchain_options.blocks_behind { + if end_block > current_block - blocks_behind as i64 { + log::info!("End block {} is too close to current block {}, using current block - blocks_behind: {}", end_block, current_block, current_block - blocks_behind as i64); + end_block = current_block - blocks_behind as i64; + } } - for (el_no, receiver) in per_receiver.iter().enumerate() { - if el_no >= payment_stats_options.show_receiver_count { - println!("... and more (max {} receivers shown)", el_no); - break; + let txs = import_erc20_txs( + web3, + chain_cfg.token.clone().unwrap().address, + chain_cfg.chain_id, + Some(&[sender]), + None, + start_block, + end_block, + scan_blockchain_options.blocks_at_once, + ) + .await + .unwrap(); + + let mut max_block_from_tx = None; + for tx in &txs { + match transaction_from_chain_and_into_db( + web3, + &conn, + chain_cfg.chain_id, + &format!("{tx:#x}"), + ) + .await + { + Ok(Some(chain_tx)) => { + if chain_tx.block_number > max_block_from_tx.unwrap_or(0) { + max_block_from_tx = Some(chain_tx.block_number); + } + } + Ok(None) => {} + Err(e) => { + log::error!("Error when getting transaction from chain: {}", e); + continue; + } } - let ts = match receiver.1.erc20_token_transferred.iter().next() { - None => U256::zero(), - Some(x) => *x.1, - }; + } - println!( - "Receiver: {:#x}\n count (payment/web3): {}/{}, gas: {}, native token sent: {}, token sent: {}", - receiver.0, - receiver.1.done_count, - receiver.1.transaction_ids.len(), - u256_to_rust_dec(receiver.1.fee_paid, None).unwrap(), - u256_to_rust_dec(receiver.1.native_token_transferred, None).unwrap(), - u256_to_rust_dec( - ts, - None - ) - .unwrap(), - ); - println!( - " First transfer requested at {}", - receiver - .1 - .first_transfer_date - .map(|d| d.to_string()) - .unwrap_or("N/A".to_string()) - ); - println!( - " First payment made {}", - receiver - .1 - .first_paid_date - .map(|d| d.to_string()) - .unwrap_or("N/A".to_string()) - ); - println!( - " Last transfer requested at {}", - receiver - .1 - .last_transfer_date - .map(|d| d.to_string()) - .unwrap_or("N/A".to_string()) - ); - println!( - " Last payment made {}", - receiver - .1 - .last_paid_date - .map(|d| d.to_string()) - .unwrap_or("N/A".to_string()) - ); - println!( - " Max payment delay: {}", - receiver - .1 - .max_payment_delay - .map(|d| d.num_seconds().to_string() + "s") - .unwrap_or("N/A".to_string()) - ); + if scan_info.start_block == -1 { + scan_info.start_block = start_block; } + + //last blocks may be missing so we subtract 100 blocks from current to be sure + scan_info.last_block = std::cmp::min(end_block, current_block - 100); + log::info!( + "Updating db scan entry {} - {}", + scan_info.start_block, + scan_info.last_block + ); + upsert_scan_info(&conn, &scan_info) + .await + .map_err(err_from!())?; } PaymentCommands::ImportPayments { import_options } => { log::info!("importing payments from file: {}", import_options.file); diff --git a/src/options.rs b/src/options.rs index af8199ac..a6bf9726 100644 --- a/src/options.rs +++ b/src/options.rs @@ -107,9 +107,51 @@ pub struct ImportOptions { pub separator: char, } +#[derive(Debug, StructOpt)] +#[structopt(about = "Scan blockchain options")] +pub struct ScanBlockchainOptions { + #[structopt(short = "c", long = "chain-name", default_value = "polygon")] + pub chain_name: String, + + #[structopt(short = "b", long = "blocks-ago", default_value = "43200")] + pub from_blocks_ago: u64, + + #[structopt(long = "start-new-scan")] + pub start_new_scan: bool, + + #[structopt( + long = "max-block-range", + help = "Limit how much block to process from start" + )] + pub max_block_range: Option, + + #[structopt( + long = "blocks-behind", + help = "How much blocks behind scanner should stop" + )] + pub blocks_behind: Option, + + #[structopt( + long = "blocks-at-once", + default_value = "1000", + help = "Limit how much block to process at once. If too much web3 endpoint can return error" + )] + pub blocks_at_once: u64, + + #[structopt( + short = "a", + long = "address", + default_value = "0x09e4F0aE44D5E60D44A8928Af7531e6A862290bC" + )] + pub sender: String, +} + #[derive(StructOpt)] #[structopt(about = "Payment statistics options")] pub struct PaymentStatsOptions { + #[structopt(short = "c", long = "chain-name", default_value = "polygon")] + pub chain_name: String, + #[structopt( long = "receiver-count", help = "Number of receivers to show", @@ -132,6 +174,9 @@ pub struct PaymentStatsOptions { possible_values = &["asc", "desc"] )] pub order_by_dir: String, + + #[structopt(long = "from-blockchain", help = "Use data downloaded from blockchain")] + pub from_blockchain: bool, } #[derive(StructOpt)] @@ -182,6 +227,10 @@ pub enum PaymentCommands { #[structopt(flatten)] import_options: ImportOptions, }, + ScanBlockchain { + #[structopt(flatten)] + scan_blockchain_options: ScanBlockchainOptions, + }, PaymentStats { #[structopt(flatten)] payment_stats_options: PaymentStatsOptions, diff --git a/src/stats.rs b/src/stats.rs new file mode 100644 index 00000000..81cdde05 --- /dev/null +++ b/src/stats.rs @@ -0,0 +1,288 @@ +use erc20_payment_lib::config::Config; +use erc20_payment_lib::db::ops::{ + get_transfer_stats, get_transfer_stats_from_blockchain, TransferStatsPart, +}; +use erc20_payment_lib::err_custom_create; +use sqlx::SqlitePool; +use std::fs; +use web3::types::{H160, U256}; + +use crate::options::PaymentStatsOptions; +use erc20_payment_lib::error::PaymentError; +use erc20_payment_lib::utils::u256_to_rust_dec; + +pub async fn run_stats( + conn: SqlitePool, + payment_stats_options: PaymentStatsOptions, + config: &Config, +) -> Result<(), PaymentError> { + let chain_cfg = + config + .chain + .get(&payment_stats_options.chain_name) + .ok_or(err_custom_create!( + "Chain {} not found in config file", + payment_stats_options.chain_name + ))?; + + let mut metrics = String::new(); + + println!( + "Getting transfers stats for chain {}", + payment_stats_options.chain_name + ); + + let transfer_stats = if payment_stats_options.from_blockchain { + get_transfer_stats_from_blockchain(&conn, chain_cfg.chain_id, None) + .await + .unwrap() + } else { + get_transfer_stats(&conn, chain_cfg.chain_id, None) + .await + .unwrap() + }; + if transfer_stats.per_sender.is_empty() { + println!("No transfers found"); + return Ok(()); + } + let main_sender = transfer_stats.per_sender.iter().next().unwrap(); + let stats_all = main_sender.1.all.clone(); + let fee_paid_stats = stats_all.fee_paid; + println!( + "fee paid from stats: {}", + u256_to_rust_dec(fee_paid_stats, None).unwrap() + ); + + println!("Number of transfers done: {}", stats_all.done_count); + + println!( + "Number of distinct receivers: {}", + main_sender.1.per_receiver.len() + ); + + for (sender, stats) in &transfer_stats.per_sender { + metrics += &format!( + "{}\n{}\nreceivers_count{{chain_id=\"{}\", receiver=\"{:#x}\"}} {}\n", + "# HELP receivers_count Number of distinct receivers", + "# TYPE receivers_count counter", + chain_cfg.chain_id, + sender, + stats.per_receiver.len(), + ); + + let token_transferred = stats + .all + .erc20_token_transferred + .get(&chain_cfg.token.clone().unwrap().address) + .copied(); + + metrics += &format!( + "{}\n{}\nerc20_transferred{{chain_id=\"{}\", sender=\"{:#x}\"}} {}\n", + "# HELP erc20_transferred Number of distinct receivers", + "# TYPE erc20_transferred counter", + chain_cfg.chain_id, + sender, + u256_to_rust_dec(token_transferred.unwrap_or(U256::zero()), None).unwrap(), + ); + + metrics += &format!( + "{}\n{}\npayment_count{{chain_id=\"{}\", sender=\"{:#x}\"}} {}\n", + "# HELP payment_count Number of distinct payments", + "# TYPE payment_count counter", + chain_cfg.chain_id, + sender, + stats.all.done_count, + ); + + metrics += &format!( + "{}\n{}\ntransaction_count{{chain_id=\"{}\", sender=\"{:#x}\"}} {}\n", + "# HELP transaction_count Number of web3 transactions", + "# TYPE transaction_count counter", + chain_cfg.chain_id, + sender, + stats.all.transaction_ids.len(), + ); + + metrics += &format!( + "{}\n{}\nfee_paid{{chain_id=\"{}\", sender=\"{:#x}\"}} {}\n", + "# HELP fee_paid Total fee paid", + "# TYPE fee_paid counter", + chain_cfg.chain_id, + sender, + u256_to_rust_dec(stats.all.fee_paid, None).unwrap_or_default(), + ); + } + + println!( + "Number of web3 transactions: {}", + main_sender.1.all.transaction_ids.len() + ); + + println!( + "First transfer requested at {}", + main_sender + .1 + .all + .first_transfer_date + .map(|d| d.to_string()) + .unwrap_or("N/A".to_string()) + ); + println!( + "First payment made {}", + main_sender + .1 + .all + .first_paid_date + .map(|d| d.to_string()) + .unwrap_or("N/A".to_string()) + ); + println!( + "Last transfer requested at {}", + main_sender + .1 + .all + .last_transfer_date + .map(|d| d.to_string()) + .unwrap_or("N/A".to_string()) + ); + println!( + "Last payment made {}", + main_sender + .1 + .all + .last_paid_date + .map(|d| d.to_string()) + .unwrap_or("N/A".to_string()) + ); + println!( + "Max payment delay: {}", + main_sender + .1 + .all + .max_payment_delay + .map(|d| d.num_seconds().to_string() + "s") + .unwrap_or("N/A".to_string()) + ); + + println!( + "Native token sent: {}", + u256_to_rust_dec(main_sender.1.all.native_token_transferred, None).unwrap() + ); + let token_transferred = main_sender + .1 + .all + .erc20_token_transferred + .get(&chain_cfg.token.clone().unwrap().address) + .copied(); + println!( + "Erc20 token sent: {}", + u256_to_rust_dec(token_transferred.unwrap_or(U256::zero()), None).unwrap() + ); + + let per_receiver = main_sender.1.per_receiver.clone(); + let mut per_receiver: Vec<(H160, TransferStatsPart)> = per_receiver.into_iter().collect(); + if payment_stats_options.order_by == "payment_delay" { + per_receiver.sort_by(|r, b| { + let left = + r.1.max_payment_delay + .unwrap_or(chrono::Duration::max_value()); + let right = + b.1.max_payment_delay + .unwrap_or(chrono::Duration::max_value()); + right.cmp(&left) + }); + } else if payment_stats_options.order_by == "token_sent" { + per_receiver.sort_by(|r, b| { + let left = *r.1.erc20_token_transferred.iter().next().unwrap().1; + let right = *b.1.erc20_token_transferred.iter().next().unwrap().1; + right.cmp(&left) + }); + } else if payment_stats_options.order_by == "gas_paid" + || payment_stats_options.order_by == "fee_paid" + { + per_receiver.sort_by(|r, b| { + let left = r.1.fee_paid; + let right = b.1.fee_paid; + right.cmp(&left) + }); + } else { + return Err(err_custom_create!( + "Unknown order_by option: {}", + payment_stats_options.order_by + )); + } + + if payment_stats_options.order_by_dir == "asc" { + per_receiver.reverse(); + } + + for (el_no, receiver) in per_receiver.iter().enumerate() { + if el_no >= payment_stats_options.show_receiver_count { + println!("... and more (max {} receivers shown)", el_no); + break; + } + let ts = match receiver.1.erc20_token_transferred.iter().next() { + None => U256::zero(), + Some(x) => *x.1, + }; + + println!( + "Receiver: {:#x}\n count (payment/web3): {}/{}, gas: {}, native token sent: {}, token sent: {}", + receiver.0, + receiver.1.done_count, + receiver.1.transaction_ids.len(), + u256_to_rust_dec(receiver.1.fee_paid, None).unwrap(), + u256_to_rust_dec(receiver.1.native_token_transferred, None).unwrap(), + u256_to_rust_dec( + ts, + None + ) + .unwrap(), + ); + println!( + " First transfer requested at {}", + receiver + .1 + .first_transfer_date + .map(|d| d.to_string()) + .unwrap_or("N/A".to_string()) + ); + println!( + " First payment made {}", + receiver + .1 + .first_paid_date + .map(|d| d.to_string()) + .unwrap_or("N/A".to_string()) + ); + println!( + " Last transfer requested at {}", + receiver + .1 + .last_transfer_date + .map(|d| d.to_string()) + .unwrap_or("N/A".to_string()) + ); + println!( + " Last payment made {}", + receiver + .1 + .last_paid_date + .map(|d| d.to_string()) + .unwrap_or("N/A".to_string()) + ); + println!( + " Max payment delay: {}", + receiver + .1 + .max_payment_delay + .map(|d| d.num_seconds().to_string() + "s") + .unwrap_or("N/A".to_string()) + ); + } + + //write metrics to prometheus/metrics.txt + fs::write("prometheus/metrics.txt", metrics).expect("Unable to write file"); + + Ok(()) +}