From 51bc58f90f0e07e8f84157ff2e0268b97191fc65 Mon Sep 17 00:00:00 2001 From: Kyle Espinola Date: Wed, 20 Dec 2023 13:34:25 +0100 Subject: [PATCH] refactor(backfiller): remove using tree transactions because similar info tracked by cl_items and cl_audits --- .../src/program_transformers/bubblegum/db.rs | 77 +++---------------- .../src/program_transformers/bubblegum/mod.rs | 10 --- tree_backfiller/src/backfiller.rs | 54 +++++-------- tree_backfiller/src/rpc.rs | 2 - tree_backfiller/src/tree.rs | 27 +------ 5 files changed, 37 insertions(+), 133 deletions(-) diff --git a/nft_ingester/src/program_transformers/bubblegum/db.rs b/nft_ingester/src/program_transformers/bubblegum/db.rs index ef0aff0f5..49398f3d2 100644 --- a/nft_ingester/src/program_transformers/bubblegum/db.rs +++ b/nft_ingester/src/program_transformers/bubblegum/db.rs @@ -1,6 +1,6 @@ use crate::error::IngesterError; use digital_asset_types::dao::{ - asset, asset_creators, asset_grouping, backfill_items, cl_audits, cl_items, tree_transactions, + asset, asset_creators, asset_grouping, backfill_items, cl_audits, cl_items, }; use log::{debug, error, info}; use mpl_bubblegum::types::Collection; @@ -13,69 +13,6 @@ use spl_account_compression::events::ChangeLogEventV1; use std::convert::From; -/// Mark tree transaction as processed. If the transaction already exists, update the `processed_at` field. -/// -/// This function takes in a tree ID, slot, transaction ID, and a transaction object. -/// It first checks if a tree transaction with the given transaction ID already exists. -/// If it does, it updates the `processed_at` field of the existing tree transaction with the current time. -/// If it doesn't, it creates a new tree transaction with the provided parameters and saves it. -/// -/// # Arguments -/// -/// * `tree_id` - A vector of bytes representing the ID of the tree. -/// * `slot` - A 64-bit unsigned integer representing the slot. -/// * `txn_id` - A string slice representing the transaction ID. -/// * `txn` - A reference to a transaction object. -/// -/// # Returns -/// -/// This function returns a `Result` that contains an empty tuple, or an `IngesterError` if the operation fails. -pub async fn save_tree_transaction<'c, T>( - tree_id: Vec, - slot: u64, - txn_id: &str, - txn: &T, -) -> Result<(), IngesterError> -where - T: ConnectionTrait + TransactionTrait, -{ - let now = chrono::Utc::now() - .with_timezone(&chrono::FixedOffset::east_opt(0).ok_or(IngesterError::ChronoFixedOffset)?); - - let tree_transaction = tree_transactions::Entity::find() - .filter(tree_transactions::Column::Signature.eq(txn_id)) - .one(txn) - .await?; - - if let Some(tree_transaction) = tree_transaction { - let mut tree_transaction: tree_transactions::ActiveModel = tree_transaction.into(); - - tree_transaction.processed_at = Set(Some(now)); - - tree_transaction.save(txn).await?; - } else { - let tree = Pubkey::try_from(tree_id).map_err(|_| IngesterError::ParsePubkey)?; - - let tree_transaction = tree_transactions::ActiveModel { - signature: Set(txn_id.to_string()), - slot: Set(i64::try_from(slot)?), - tree: Set(tree.to_string()), - processed_at: Set(Some(now)), - ..Default::default() - }; - - tree_transactions::Entity::insert(tree_transaction) - .on_conflict( - OnConflict::column(tree_transactions::Column::Signature) - .do_nothing() - .to_owned(), - ) - .exec(txn) - .await?; - } - Ok(()) -} - pub async fn save_changelog_event<'c, T>( change_log_event: &ChangeLogEventV1, slot: u64, @@ -159,7 +96,17 @@ where .map_err(|db_err| IngesterError::StorageWriteError(db_err.to_string()))?; // Insert the audit item after the insert into cl_items have been completed - let query = cl_audits::Entity::insert(audit_item).build(DbBackend::Postgres); + let query = cl_audits::Entity::insert(audit_item) + .on_conflict( + OnConflict::columns([ + cl_audits::Column::Tree, + cl_audits::Column::Seq, + cl_audits::Column::NodeIdx, + ]) + .do_nothing() + .to_owned(), + ) + .build(DbBackend::Postgres); match txn.execute(query).await { Ok(_) => {} Err(e) => { diff --git a/nft_ingester/src/program_transformers/bubblegum/mod.rs b/nft_ingester/src/program_transformers/bubblegum/mod.rs index e73c90040..a59d88f0c 100644 --- a/nft_ingester/src/program_transformers/bubblegum/mod.rs +++ b/nft_ingester/src/program_transformers/bubblegum/mod.rs @@ -99,16 +99,6 @@ where } _ => debug!("Bubblegum: Not Implemented Instruction"), } - // TODO: assuming tree update available on all transactions but need to confirm. - if let Some(tree_update) = &parsing_result.tree_update { - save_tree_transaction( - tree_update.id.to_bytes().to_vec(), - bundle.slot, - bundle.txn_id, - txn, - ) - .await?; - } Ok(()) } diff --git a/tree_backfiller/src/backfiller.rs b/tree_backfiller/src/backfiller.rs index 6a9330dae..30c39f27a 100644 --- a/tree_backfiller/src/backfiller.rs +++ b/tree_backfiller/src/backfiller.rs @@ -7,7 +7,7 @@ use crate::{ }; use anyhow::Result; -use clap::Parser; +use clap::{Parser, ValueEnum}; use digital_asset_types::dao::tree_transactions; use indicatif::HumanDuration; use log::{error, info}; @@ -21,6 +21,12 @@ use std::time::Duration; use std::time::Instant; use tokio::sync::{mpsc, Semaphore}; +#[derive(Debug, Parser, Clone, ValueEnum, PartialEq, Eq)] +pub enum CrawlDirection { + Forward, + Backward, +} + #[derive(Debug, Parser, Clone)] pub struct Args { /// Number of tree crawler workers @@ -37,6 +43,9 @@ pub struct Args { #[arg(long, env, use_value_delimiter = true)] pub only_trees: Option>, + #[arg(long, env, default_value = "forward")] + pub crawl_direction: CrawlDirection, + /// Database configuration #[clap(flatten)] pub database: db::PoolArgs, @@ -120,15 +129,11 @@ pub async fn run(config: Args) -> Result<()> { let solana_rpc = Rpc::from_config(config.solana); let transaction_solana_rpc = solana_rpc.clone(); - let pool = db::connect(config.database).await?; - let transaction_pool = pool.clone(); - let metrics = Metrics::try_from_config(config.metrics)?; let tree_metrics = metrics.clone(); let transaction_metrics = metrics.clone(); - let (sig_sender, mut sig_receiver) = - mpsc::channel::(config.signature_channel_size); + let (sig_sender, mut sig_receiver) = mpsc::channel::(config.signature_channel_size); let transaction_count = Counter::new(); let transaction_worker_transaction_count = transaction_count.clone(); @@ -138,12 +143,11 @@ pub async fn run(config: Args) -> Result<()> { tokio::spawn(async move { let semaphore = Arc::new(Semaphore::new(config.transaction_worker_count)); - while let Some(tree_transaction) = sig_receiver.recv().await { + while let Some(signature) = sig_receiver.recv().await { let solana_rpc = transaction_solana_rpc.clone(); let metrics = transaction_metrics.clone(); let queue = queue.clone(); let semaphore = semaphore.clone(); - let pool = transaction_pool.clone(); let count = transaction_worker_transaction_count.clone(); count.increment(); @@ -152,30 +156,16 @@ pub async fn run(config: Args) -> Result<()> { let _permit = semaphore.acquire().await?; let timing = Instant::now(); - let conn = SqlxPostgresConnector::from_sqlx_postgres_pool(pool); - - let inserted_tree_transaction = tree_transactions::Entity::insert(tree_transaction) - .on_conflict( - OnConflict::column(tree_transactions::Column::Signature) - .do_nothing() - .to_owned(), - ) - .exec_with_returning(&conn) - .await; - - if let Ok(tree_transaction) = inserted_tree_transaction { - let signature = Signature::from_str(&tree_transaction.signature)?; - - if let Err(e) = tree::transaction(&solana_rpc, queue, signature).await { - error!("tree transaction: {:?}", e); - metrics.increment("transaction.failed"); - } else { - metrics.increment("transaction.succeeded"); - } - - metrics.time("transaction.queued", timing.elapsed()); + + if let Err(e) = tree::transaction(&solana_rpc, queue, signature).await { + error!("tree transaction: {:?}", e); + metrics.increment("transaction.failed"); + } else { + metrics.increment("transaction.succeeded"); } + metrics.time("transaction.queued", timing.elapsed()); + count.decrement(); Ok::<(), anyhow::Error>(()) @@ -207,8 +197,6 @@ pub async fn run(config: Args) -> Result<()> { let client = solana_rpc.clone(); let semaphore = semaphore.clone(); let sig_sender = sig_sender.clone(); - let pool = pool.clone(); - let conn = SqlxPostgresConnector::from_sqlx_postgres_pool(pool); let metrics = tree_metrics.clone(); let crawl_handle = tokio::spawn(async move { @@ -216,7 +204,7 @@ pub async fn run(config: Args) -> Result<()> { let timing = Instant::now(); - if let Err(e) = tree.crawl(&client, sig_sender, conn).await { + if let Err(e) = tree.crawl(&client, sig_sender).await { metrics.increment("tree.failed"); error!("crawling tree: {:?}", e); } else { diff --git a/tree_backfiller/src/rpc.rs b/tree_backfiller/src/rpc.rs index 361d88e17..4d9e976b3 100644 --- a/tree_backfiller/src/rpc.rs +++ b/tree_backfiller/src/rpc.rs @@ -62,7 +62,6 @@ impl Rpc { &self, pubkey: &Pubkey, before: Option, - until: Option, ) -> Result, ClientError> { (|| async { self.0 @@ -70,7 +69,6 @@ impl Rpc { pubkey, GetConfirmedSignaturesForAddress2Config { before, - until, commitment: Some(CommitmentConfig { commitment: CommitmentLevel::Finalized, }), diff --git a/tree_backfiller/src/tree.rs b/tree_backfiller/src/tree.rs index 5171f1ee3..50c9527cf 100644 --- a/tree_backfiller/src/tree.rs +++ b/tree_backfiller/src/tree.rs @@ -18,6 +18,7 @@ use std::str::FromStr; use thiserror::Error as ThisError; use tokio::sync::mpsc::Sender; +use crate::backfiller::CrawlDirection; use crate::{ queue::{QueuePool, QueuePoolError}, rpc::Rpc, @@ -94,38 +95,18 @@ impl TreeResponse { tree_header, }) } - pub async fn crawl( - &self, - client: &Rpc, - sender: Sender, - conn: DatabaseConnection, - ) -> Result<()> { + pub async fn crawl(&self, client: &Rpc, sender: Sender) -> Result<()> { let mut before = None; - let until = tree_transactions::Entity::find() - .filter(tree_transactions::Column::Tree.eq(self.pubkey.to_string())) - .order_by_desc(tree_transactions::Column::Slot) - .one(&conn) - .await? - .and_then(|t| Signature::from_str(&t.signature).ok()); - loop { let sigs = client - .get_signatures_for_address(&self.pubkey, before, until) + .get_signatures_for_address(&self.pubkey, before) .await?; for sig in sigs.iter() { - let slot = i64::try_from(sig.slot)?; let sig = Signature::from_str(&sig.signature)?; - let tree_transaction = tree_transactions::ActiveModel { - signature: Set(sig.to_string()), - tree: Set(self.pubkey.to_string()), - slot: Set(slot), - ..Default::default() - }; - - sender.send(tree_transaction).await?; + sender.send(sig).await?; before = Some(sig); }