Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Remove use of tree_transactions from the backfiller #119

Merged
merged 1 commit into from
Dec 20, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
77 changes: 12 additions & 65 deletions nft_ingester/src/program_transformers/bubblegum/db.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use crate::error::IngesterError;
use digital_asset_types::dao::{
asset, asset_creators, asset_grouping, backfill_items, cl_audits, cl_items, tree_transactions,
asset, asset_creators, asset_grouping, backfill_items, cl_audits, cl_items,
};
use log::{debug, error, info};
use mpl_bubblegum::types::Collection;
Expand All @@ -13,69 +13,6 @@ use spl_account_compression::events::ChangeLogEventV1;

use std::convert::From;

/// Mark tree transaction as processed. If the transaction already exists, update the `processed_at` field.
///
/// This function takes in a tree ID, slot, transaction ID, and a transaction object.
/// It first checks if a tree transaction with the given transaction ID already exists.
/// If it does, it updates the `processed_at` field of the existing tree transaction with the current time.
/// If it doesn't, it creates a new tree transaction with the provided parameters and saves it.
///
/// # Arguments
///
/// * `tree_id` - A vector of bytes representing the ID of the tree.
/// * `slot` - A 64-bit unsigned integer representing the slot.
/// * `txn_id` - A string slice representing the transaction ID.
/// * `txn` - A reference to a transaction object.
///
/// # Returns
///
/// This function returns a `Result` that contains an empty tuple, or an `IngesterError` if the operation fails.
pub async fn save_tree_transaction<'c, T>(
tree_id: Vec<u8>,
slot: u64,
txn_id: &str,
txn: &T,
) -> Result<(), IngesterError>
where
T: ConnectionTrait + TransactionTrait,
{
let now = chrono::Utc::now()
.with_timezone(&chrono::FixedOffset::east_opt(0).ok_or(IngesterError::ChronoFixedOffset)?);

let tree_transaction = tree_transactions::Entity::find()
.filter(tree_transactions::Column::Signature.eq(txn_id))
.one(txn)
.await?;

if let Some(tree_transaction) = tree_transaction {
let mut tree_transaction: tree_transactions::ActiveModel = tree_transaction.into();

tree_transaction.processed_at = Set(Some(now));

tree_transaction.save(txn).await?;
} else {
let tree = Pubkey::try_from(tree_id).map_err(|_| IngesterError::ParsePubkey)?;

let tree_transaction = tree_transactions::ActiveModel {
signature: Set(txn_id.to_string()),
slot: Set(i64::try_from(slot)?),
tree: Set(tree.to_string()),
processed_at: Set(Some(now)),
..Default::default()
};

tree_transactions::Entity::insert(tree_transaction)
.on_conflict(
OnConflict::column(tree_transactions::Column::Signature)
.do_nothing()
.to_owned(),
)
.exec(txn)
.await?;
}
Ok(())
}

pub async fn save_changelog_event<'c, T>(
change_log_event: &ChangeLogEventV1,
slot: u64,
Expand Down Expand Up @@ -159,7 +96,17 @@ where
.map_err(|db_err| IngesterError::StorageWriteError(db_err.to_string()))?;

// Insert the audit item after the insert into cl_items have been completed
let query = cl_audits::Entity::insert(audit_item).build(DbBackend::Postgres);
let query = cl_audits::Entity::insert(audit_item)
.on_conflict(
OnConflict::columns([
cl_audits::Column::Tree,
cl_audits::Column::Seq,
cl_audits::Column::NodeIdx,
])
.do_nothing()
.to_owned(),
)
.build(DbBackend::Postgres);
match txn.execute(query).await {
Ok(_) => {}
Err(e) => {
Expand Down
10 changes: 0 additions & 10 deletions nft_ingester/src/program_transformers/bubblegum/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -99,16 +99,6 @@ where
}
_ => debug!("Bubblegum: Not Implemented Instruction"),
}
// TODO: assuming tree update available on all transactions but need to confirm.
if let Some(tree_update) = &parsing_result.tree_update {
save_tree_transaction(
tree_update.id.to_bytes().to_vec(),
bundle.slot,
bundle.txn_id,
txn,
)
.await?;
}

Ok(())
}
Expand Down
54 changes: 21 additions & 33 deletions tree_backfiller/src/backfiller.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use crate::{
};

use anyhow::Result;
use clap::Parser;
use clap::{Parser, ValueEnum};
use digital_asset_types::dao::tree_transactions;
use indicatif::HumanDuration;
use log::{error, info};
Expand All @@ -21,6 +21,12 @@ use std::time::Duration;
use std::time::Instant;
use tokio::sync::{mpsc, Semaphore};

#[derive(Debug, Parser, Clone, ValueEnum, PartialEq, Eq)]
pub enum CrawlDirection {
Forward,
Backward,
}

#[derive(Debug, Parser, Clone)]
pub struct Args {
/// Number of tree crawler workers
Expand All @@ -37,6 +43,9 @@ pub struct Args {
#[arg(long, env, use_value_delimiter = true)]
pub only_trees: Option<Vec<String>>,

#[arg(long, env, default_value = "forward")]
pub crawl_direction: CrawlDirection,

/// Database configuration
#[clap(flatten)]
pub database: db::PoolArgs,
Expand Down Expand Up @@ -120,15 +129,11 @@ pub async fn run(config: Args) -> Result<()> {
let solana_rpc = Rpc::from_config(config.solana);
let transaction_solana_rpc = solana_rpc.clone();

let pool = db::connect(config.database).await?;
let transaction_pool = pool.clone();

let metrics = Metrics::try_from_config(config.metrics)?;
let tree_metrics = metrics.clone();
let transaction_metrics = metrics.clone();

let (sig_sender, mut sig_receiver) =
mpsc::channel::<tree_transactions::ActiveModel>(config.signature_channel_size);
let (sig_sender, mut sig_receiver) = mpsc::channel::<Signature>(config.signature_channel_size);

let transaction_count = Counter::new();
let transaction_worker_transaction_count = transaction_count.clone();
Expand All @@ -138,12 +143,11 @@ pub async fn run(config: Args) -> Result<()> {
tokio::spawn(async move {
let semaphore = Arc::new(Semaphore::new(config.transaction_worker_count));

while let Some(tree_transaction) = sig_receiver.recv().await {
while let Some(signature) = sig_receiver.recv().await {
let solana_rpc = transaction_solana_rpc.clone();
let metrics = transaction_metrics.clone();
let queue = queue.clone();
let semaphore = semaphore.clone();
let pool = transaction_pool.clone();
let count = transaction_worker_transaction_count.clone();

count.increment();
Expand All @@ -152,30 +156,16 @@ pub async fn run(config: Args) -> Result<()> {
let _permit = semaphore.acquire().await?;

let timing = Instant::now();
let conn = SqlxPostgresConnector::from_sqlx_postgres_pool(pool);

let inserted_tree_transaction = tree_transactions::Entity::insert(tree_transaction)
.on_conflict(
OnConflict::column(tree_transactions::Column::Signature)
.do_nothing()
.to_owned(),
)
.exec_with_returning(&conn)
.await;

if let Ok(tree_transaction) = inserted_tree_transaction {
let signature = Signature::from_str(&tree_transaction.signature)?;

if let Err(e) = tree::transaction(&solana_rpc, queue, signature).await {
error!("tree transaction: {:?}", e);
metrics.increment("transaction.failed");
} else {
metrics.increment("transaction.succeeded");
}

metrics.time("transaction.queued", timing.elapsed());

if let Err(e) = tree::transaction(&solana_rpc, queue, signature).await {
error!("tree transaction: {:?}", e);
metrics.increment("transaction.failed");
} else {
metrics.increment("transaction.succeeded");
}

metrics.time("transaction.queued", timing.elapsed());

count.decrement();

Ok::<(), anyhow::Error>(())
Expand Down Expand Up @@ -207,16 +197,14 @@ pub async fn run(config: Args) -> Result<()> {
let client = solana_rpc.clone();
let semaphore = semaphore.clone();
let sig_sender = sig_sender.clone();
let pool = pool.clone();
let conn = SqlxPostgresConnector::from_sqlx_postgres_pool(pool);
let metrics = tree_metrics.clone();

let crawl_handle = tokio::spawn(async move {
let _permit = semaphore.acquire().await?;

let timing = Instant::now();

if let Err(e) = tree.crawl(&client, sig_sender, conn).await {
if let Err(e) = tree.crawl(&client, sig_sender).await {
metrics.increment("tree.failed");
error!("crawling tree: {:?}", e);
} else {
Expand Down
2 changes: 0 additions & 2 deletions tree_backfiller/src/rpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -62,15 +62,13 @@ impl Rpc {
&self,
pubkey: &Pubkey,
before: Option<Signature>,
until: Option<Signature>,
) -> Result<Vec<RpcConfirmedTransactionStatusWithSignature>, ClientError> {
(|| async {
self.0
.get_signatures_for_address_with_config(
pubkey,
GetConfirmedSignaturesForAddress2Config {
before,
until,
commitment: Some(CommitmentConfig {
commitment: CommitmentLevel::Finalized,
}),
Expand Down
27 changes: 4 additions & 23 deletions tree_backfiller/src/tree.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ use std::str::FromStr;
use thiserror::Error as ThisError;
use tokio::sync::mpsc::Sender;

use crate::backfiller::CrawlDirection;
use crate::{
queue::{QueuePool, QueuePoolError},
rpc::Rpc,
Expand Down Expand Up @@ -94,38 +95,18 @@ impl TreeResponse {
tree_header,
})
}
pub async fn crawl(
&self,
client: &Rpc,
sender: Sender<tree_transactions::ActiveModel>,
conn: DatabaseConnection,
) -> Result<()> {
pub async fn crawl(&self, client: &Rpc, sender: Sender<Signature>) -> Result<()> {
let mut before = None;

let until = tree_transactions::Entity::find()
.filter(tree_transactions::Column::Tree.eq(self.pubkey.to_string()))
.order_by_desc(tree_transactions::Column::Slot)
.one(&conn)
.await?
.and_then(|t| Signature::from_str(&t.signature).ok());

loop {
let sigs = client
.get_signatures_for_address(&self.pubkey, before, until)
.get_signatures_for_address(&self.pubkey, before)
.await?;

for sig in sigs.iter() {
let slot = i64::try_from(sig.slot)?;
let sig = Signature::from_str(&sig.signature)?;

let tree_transaction = tree_transactions::ActiveModel {
signature: Set(sig.to_string()),
tree: Set(self.pubkey.to_string()),
slot: Set(slot),
..Default::default()
};

sender.send(tree_transaction).await?;
sender.send(sig).await?;

before = Some(sig);
}
Expand Down
Loading