Skip to content

Commit

Permalink
refactor(backfiller): remove using tree transactions because similar …
Browse files Browse the repository at this point in the history
…info tracked by cl_items and cl_audits (#119)
  • Loading branch information
kespinola authored Dec 20, 2023
1 parent 5b6a17f commit cb271ca
Show file tree
Hide file tree
Showing 5 changed files with 37 additions and 133 deletions.
77 changes: 12 additions & 65 deletions nft_ingester/src/program_transformers/bubblegum/db.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use crate::error::IngesterError;
use digital_asset_types::dao::{
asset, asset_creators, asset_grouping, backfill_items, cl_audits, cl_items, tree_transactions,
asset, asset_creators, asset_grouping, backfill_items, cl_audits, cl_items,
};
use log::{debug, error, info};
use mpl_bubblegum::types::Collection;
Expand All @@ -13,69 +13,6 @@ use spl_account_compression::events::ChangeLogEventV1;

use std::convert::From;

/// Mark tree transaction as processed. If the transaction already exists, update the `processed_at` field.
///
/// This function takes in a tree ID, slot, transaction ID, and a transaction object.
/// It first checks if a tree transaction with the given transaction ID already exists.
/// If it does, it updates the `processed_at` field of the existing tree transaction with the current time.
/// If it doesn't, it creates a new tree transaction with the provided parameters and saves it.
///
/// # Arguments
///
/// * `tree_id` - A vector of bytes representing the ID of the tree.
/// * `slot` - A 64-bit unsigned integer representing the slot.
/// * `txn_id` - A string slice representing the transaction ID.
/// * `txn` - A reference to a transaction object.
///
/// # Returns
///
/// This function returns a `Result` that contains an empty tuple, or an `IngesterError` if the operation fails.
pub async fn save_tree_transaction<'c, T>(
tree_id: Vec<u8>,
slot: u64,
txn_id: &str,
txn: &T,
) -> Result<(), IngesterError>
where
T: ConnectionTrait + TransactionTrait,
{
let now = chrono::Utc::now()
.with_timezone(&chrono::FixedOffset::east_opt(0).ok_or(IngesterError::ChronoFixedOffset)?);

let tree_transaction = tree_transactions::Entity::find()
.filter(tree_transactions::Column::Signature.eq(txn_id))
.one(txn)
.await?;

if let Some(tree_transaction) = tree_transaction {
let mut tree_transaction: tree_transactions::ActiveModel = tree_transaction.into();

tree_transaction.processed_at = Set(Some(now));

tree_transaction.save(txn).await?;
} else {
let tree = Pubkey::try_from(tree_id).map_err(|_| IngesterError::ParsePubkey)?;

let tree_transaction = tree_transactions::ActiveModel {
signature: Set(txn_id.to_string()),
slot: Set(i64::try_from(slot)?),
tree: Set(tree.to_string()),
processed_at: Set(Some(now)),
..Default::default()
};

tree_transactions::Entity::insert(tree_transaction)
.on_conflict(
OnConflict::column(tree_transactions::Column::Signature)
.do_nothing()
.to_owned(),
)
.exec(txn)
.await?;
}
Ok(())
}

pub async fn save_changelog_event<'c, T>(
change_log_event: &ChangeLogEventV1,
slot: u64,
Expand Down Expand Up @@ -159,7 +96,17 @@ where
.map_err(|db_err| IngesterError::StorageWriteError(db_err.to_string()))?;

// Insert the audit item after the insert into cl_items have been completed
let query = cl_audits::Entity::insert(audit_item).build(DbBackend::Postgres);
let query = cl_audits::Entity::insert(audit_item)
.on_conflict(
OnConflict::columns([
cl_audits::Column::Tree,
cl_audits::Column::Seq,
cl_audits::Column::NodeIdx,
])
.do_nothing()
.to_owned(),
)
.build(DbBackend::Postgres);
match txn.execute(query).await {
Ok(_) => {}
Err(e) => {
Expand Down
10 changes: 0 additions & 10 deletions nft_ingester/src/program_transformers/bubblegum/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -99,16 +99,6 @@ where
}
_ => debug!("Bubblegum: Not Implemented Instruction"),
}
// TODO: assuming tree update available on all transactions but need to confirm.
if let Some(tree_update) = &parsing_result.tree_update {
save_tree_transaction(
tree_update.id.to_bytes().to_vec(),
bundle.slot,
bundle.txn_id,
txn,
)
.await?;
}

Ok(())
}
Expand Down
54 changes: 21 additions & 33 deletions tree_backfiller/src/backfiller.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use crate::{
};

use anyhow::Result;
use clap::Parser;
use clap::{Parser, ValueEnum};
use digital_asset_types::dao::tree_transactions;
use indicatif::HumanDuration;
use log::{error, info};
Expand All @@ -21,6 +21,12 @@ use std::time::Duration;
use std::time::Instant;
use tokio::sync::{mpsc, Semaphore};

#[derive(Debug, Parser, Clone, ValueEnum, PartialEq, Eq)]
pub enum CrawlDirection {
Forward,
Backward,
}

#[derive(Debug, Parser, Clone)]
pub struct Args {
/// Number of tree crawler workers
Expand All @@ -37,6 +43,9 @@ pub struct Args {
#[arg(long, env, use_value_delimiter = true)]
pub only_trees: Option<Vec<String>>,

#[arg(long, env, default_value = "forward")]
pub crawl_direction: CrawlDirection,

/// Database configuration
#[clap(flatten)]
pub database: db::PoolArgs,
Expand Down Expand Up @@ -120,15 +129,11 @@ pub async fn run(config: Args) -> Result<()> {
let solana_rpc = Rpc::from_config(config.solana);
let transaction_solana_rpc = solana_rpc.clone();

let pool = db::connect(config.database).await?;
let transaction_pool = pool.clone();

let metrics = Metrics::try_from_config(config.metrics)?;
let tree_metrics = metrics.clone();
let transaction_metrics = metrics.clone();

let (sig_sender, mut sig_receiver) =
mpsc::channel::<tree_transactions::ActiveModel>(config.signature_channel_size);
let (sig_sender, mut sig_receiver) = mpsc::channel::<Signature>(config.signature_channel_size);

let transaction_count = Counter::new();
let transaction_worker_transaction_count = transaction_count.clone();
Expand All @@ -138,12 +143,11 @@ pub async fn run(config: Args) -> Result<()> {
tokio::spawn(async move {
let semaphore = Arc::new(Semaphore::new(config.transaction_worker_count));

while let Some(tree_transaction) = sig_receiver.recv().await {
while let Some(signature) = sig_receiver.recv().await {
let solana_rpc = transaction_solana_rpc.clone();
let metrics = transaction_metrics.clone();
let queue = queue.clone();
let semaphore = semaphore.clone();
let pool = transaction_pool.clone();
let count = transaction_worker_transaction_count.clone();

count.increment();
Expand All @@ -152,30 +156,16 @@ pub async fn run(config: Args) -> Result<()> {
let _permit = semaphore.acquire().await?;

let timing = Instant::now();
let conn = SqlxPostgresConnector::from_sqlx_postgres_pool(pool);

let inserted_tree_transaction = tree_transactions::Entity::insert(tree_transaction)
.on_conflict(
OnConflict::column(tree_transactions::Column::Signature)
.do_nothing()
.to_owned(),
)
.exec_with_returning(&conn)
.await;

if let Ok(tree_transaction) = inserted_tree_transaction {
let signature = Signature::from_str(&tree_transaction.signature)?;

if let Err(e) = tree::transaction(&solana_rpc, queue, signature).await {
error!("tree transaction: {:?}", e);
metrics.increment("transaction.failed");
} else {
metrics.increment("transaction.succeeded");
}

metrics.time("transaction.queued", timing.elapsed());

if let Err(e) = tree::transaction(&solana_rpc, queue, signature).await {
error!("tree transaction: {:?}", e);
metrics.increment("transaction.failed");
} else {
metrics.increment("transaction.succeeded");
}

metrics.time("transaction.queued", timing.elapsed());

count.decrement();

Ok::<(), anyhow::Error>(())
Expand Down Expand Up @@ -207,16 +197,14 @@ pub async fn run(config: Args) -> Result<()> {
let client = solana_rpc.clone();
let semaphore = semaphore.clone();
let sig_sender = sig_sender.clone();
let pool = pool.clone();
let conn = SqlxPostgresConnector::from_sqlx_postgres_pool(pool);
let metrics = tree_metrics.clone();

let crawl_handle = tokio::spawn(async move {
let _permit = semaphore.acquire().await?;

let timing = Instant::now();

if let Err(e) = tree.crawl(&client, sig_sender, conn).await {
if let Err(e) = tree.crawl(&client, sig_sender).await {
metrics.increment("tree.failed");
error!("crawling tree: {:?}", e);
} else {
Expand Down
2 changes: 0 additions & 2 deletions tree_backfiller/src/rpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -62,15 +62,13 @@ impl Rpc {
&self,
pubkey: &Pubkey,
before: Option<Signature>,
until: Option<Signature>,
) -> Result<Vec<RpcConfirmedTransactionStatusWithSignature>, ClientError> {
(|| async {
self.0
.get_signatures_for_address_with_config(
pubkey,
GetConfirmedSignaturesForAddress2Config {
before,
until,
commitment: Some(CommitmentConfig {
commitment: CommitmentLevel::Finalized,
}),
Expand Down
27 changes: 4 additions & 23 deletions tree_backfiller/src/tree.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ use std::str::FromStr;
use thiserror::Error as ThisError;
use tokio::sync::mpsc::Sender;

use crate::backfiller::CrawlDirection;
use crate::{
queue::{QueuePool, QueuePoolError},
rpc::Rpc,
Expand Down Expand Up @@ -94,38 +95,18 @@ impl TreeResponse {
tree_header,
})
}
pub async fn crawl(
&self,
client: &Rpc,
sender: Sender<tree_transactions::ActiveModel>,
conn: DatabaseConnection,
) -> Result<()> {
pub async fn crawl(&self, client: &Rpc, sender: Sender<Signature>) -> Result<()> {
let mut before = None;

let until = tree_transactions::Entity::find()
.filter(tree_transactions::Column::Tree.eq(self.pubkey.to_string()))
.order_by_desc(tree_transactions::Column::Slot)
.one(&conn)
.await?
.and_then(|t| Signature::from_str(&t.signature).ok());

loop {
let sigs = client
.get_signatures_for_address(&self.pubkey, before, until)
.get_signatures_for_address(&self.pubkey, before)
.await?;

for sig in sigs.iter() {
let slot = i64::try_from(sig.slot)?;
let sig = Signature::from_str(&sig.signature)?;

let tree_transaction = tree_transactions::ActiveModel {
signature: Set(sig.to_string()),
tree: Set(self.pubkey.to_string()),
slot: Set(slot),
..Default::default()
};

sender.send(tree_transaction).await?;
sender.send(sig).await?;

before = Some(sig);
}
Expand Down

0 comments on commit cb271ca

Please sign in to comment.