From 09430573e0dc5c5bcd3ab7ec9173e4f3c0235892 Mon Sep 17 00:00:00 2001 From: Kyle Espinola Date: Fri, 23 Feb 2024 12:05:45 +0100 Subject: [PATCH] fix(ops): backfiller was not filtering out failed transactions --- ops/src/bubblegum/audit.rs | 91 ++++++++++++++++++++++++++++++++++++++ ops/src/bubblegum/cmd.rs | 8 +++- ops/src/bubblegum/mod.rs | 1 + ops/src/bubblegum/tree.rs | 11 ++++- 4 files changed, 108 insertions(+), 3 deletions(-) create mode 100644 ops/src/bubblegum/audit.rs diff --git a/ops/src/bubblegum/audit.rs b/ops/src/bubblegum/audit.rs new file mode 100644 index 000000000..671aceed0 --- /dev/null +++ b/ops/src/bubblegum/audit.rs @@ -0,0 +1,91 @@ +use super::rpc::{Rpc, SolanaRpcArgs}; +use anyhow::Result; + +use clap::Parser; +use das_core::{connect_db, MetricsArgs, PoolArgs}; +use digital_asset_types::dao::cl_audits_v2; +use futures::future; + +use sea_orm::{CursorTrait, EntityTrait, SqlxPostgresConnector}; +use solana_sdk::signature::Signature; +use solana_transaction_status::EncodedConfirmedTransactionWithStatusMeta; + +use tokio::io::{stdout, AsyncWriteExt}; + +#[derive(Debug, Parser, Clone)] +pub struct Args { + /// Database configuration + #[clap(flatten)] + pub database: PoolArgs, + + /// Metrics configuration + #[clap(flatten)] + pub metrics: MetricsArgs, + + /// Solana configuration + #[clap(flatten)] + pub solana: SolanaRpcArgs, + + #[arg(long, env, default_value = "10000")] + pub batch_size: u64, +} + +pub async fn run(config: Args) -> Result<()> { + let pool = connect_db(config.database).await?; + + let solana_rpc = Rpc::from_config(config.solana); + + let mut output = stdout(); + let conn = SqlxPostgresConnector::from_sqlx_postgres_pool(pool); + let mut after = None; + + loop { + let mut query = cl_audits_v2::Entity::find().cursor_by(cl_audits_v2::Column::Id); + let mut query = query.first(config.batch_size); + + if let Some(after) = after { + query = query.after(after); + } + + let entries = query.all(&conn).await?; + + let mut transactions = vec![]; + + for entry in entries.clone() { + transactions.push(fetch_transaction(entry, solana_rpc.clone())); + } + + let transactions = future::join_all(transactions).await; + + for (signature, transaction) in transactions.into_iter().flatten() { + if let Some(meta) = transaction.transaction.meta { + if meta.err.is_some() { + output + .write_all(format!("{}\n", signature).as_bytes()) + .await?; + + output.flush().await?; + } + } + } + + after = entries.last().map(|cl_audit_v2| cl_audit_v2.id); + + if entries.is_empty() { + break; + } + } + + Ok(()) +} + +async fn fetch_transaction( + entry: cl_audits_v2::Model, + solana_rpc: Rpc, +) -> Result<(Signature, EncodedConfirmedTransactionWithStatusMeta)> { + let signature = Signature::try_from(entry.tx.as_ref())?; + + let transaction = solana_rpc.get_transaction(&signature).await?; + + Ok((signature, transaction)) +} diff --git a/ops/src/bubblegum/cmd.rs b/ops/src/bubblegum/cmd.rs index 1004a227d..bb2244167 100644 --- a/ops/src/bubblegum/cmd.rs +++ b/ops/src/bubblegum/cmd.rs @@ -1,4 +1,4 @@ -use super::backfiller; +use super::{audit, backfiller}; use anyhow::Result; use clap::{Args, Subcommand}; @@ -8,6 +8,9 @@ pub enum Commands { /// It crawls through trees and backfills any missed tree transactions. #[clap(name = "backfill")] Backfill(backfiller::Args), + /// The `audit` commands checks `cl_audits_v2` for any failed transactions and logs them to stdout. + #[clap(name = "audit")] + Audit(audit::Args), } #[derive(Debug, Clone, Args)] @@ -21,6 +24,9 @@ pub async fn subcommand(subcommand: BubblegumCommand) -> Result<()> { Commands::Backfill(args) => { backfiller::run(args).await?; } + Commands::Audit(args) => { + audit::run(args).await?; + } } Ok(()) diff --git a/ops/src/bubblegum/mod.rs b/ops/src/bubblegum/mod.rs index f86724b79..a25bc4784 100644 --- a/ops/src/bubblegum/mod.rs +++ b/ops/src/bubblegum/mod.rs @@ -1,3 +1,4 @@ +mod audit; mod backfiller; mod cmd; mod queue; diff --git a/ops/src/bubblegum/tree.rs b/ops/src/bubblegum/tree.rs index 358477c9d..260252907 100644 --- a/ops/src/bubblegum/tree.rs +++ b/ops/src/bubblegum/tree.rs @@ -4,6 +4,7 @@ use clap::Args; use log::error; use sea_orm::{DatabaseConnection, DbBackend, FromQueryResult, Statement, Value}; use solana_client::rpc_filter::{Memcmp, RpcFilterType}; +use solana_client::rpc_response::RpcConfirmedTransactionStatusWithSignature; use solana_sdk::{account::Account, pubkey::Pubkey, signature::Signature}; use spl_account_compression::id; use spl_account_compression::state::{ @@ -146,8 +147,14 @@ impl TreeGapFill { let sigs = client .get_signatures_for_address(&self.tree, before, self.until) .await?; + let sig_count = sigs.len(); - for sig in sigs.iter() { + let successful_transactions = sigs + .into_iter() + .filter(|transaction| transaction.err.is_none()) + .collect::>(); + + for sig in successful_transactions.iter() { let sig = Signature::from_str(&sig.signature)?; sender.send(sig).await?; @@ -155,7 +162,7 @@ impl TreeGapFill { before = Some(sig); } - if sigs.len() < GET_SIGNATURES_FOR_ADDRESS_LIMIT { + if sig_count < GET_SIGNATURES_FOR_ADDRESS_LIMIT { break; } }