diff --git a/nft_ingester/src/account_updates.rs b/nft_ingester/src/account_updates.rs
index 9791b7500..2b3991a36 100644
--- a/nft_ingester/src/account_updates.rs
+++ b/nft_ingester/src/account_updates.rs
@@ -26,7 +26,7 @@ pub fn account_worker(
     tokio::spawn(async move {
         let source = T::new(config).await;
         if let Ok(mut msg) = source {
-            let manager = Arc::new(ProgramTransformer::new(pool, bg_task_sender, false));
+            let manager = Arc::new(ProgramTransformer::new(pool, bg_task_sender));
             loop {
                 let e = msg.recv(stream_key, consumption_type.clone()).await;
                 let mut tasks = JoinSet::new();
@@ -65,7 +65,11 @@ pub fn account_worker(
     })
 }
 
-async fn handle_account(manager: Arc<ProgramTransformer>, item: RecvData, stream_key: &'static str) -> Option<String> {
+async fn handle_account(
+    manager: Arc<ProgramTransformer>,
+    item: RecvData,
+    stream_key: &'static str,
+) -> Option<String> {
     let id = item.id;
     let mut ret_id = None;
     let data = item.data;
diff --git a/nft_ingester/src/config.rs b/nft_ingester/src/config.rs
index 6e11fb539..550515dd7 100644
--- a/nft_ingester/src/config.rs
+++ b/nft_ingester/src/config.rs
@@ -33,7 +33,6 @@ pub struct IngesterConfig {
     pub transaction_backfill_stream_worker_count: Option<u32>,
     pub code_version: Option<&'static str>,
     pub background_task_runner_config: Option<BackgroundTaskRunnerConfig>,
-    pub cl_audits: Option<bool>, // save transaction logs for compressed nfts
 }
 
 impl IngesterConfig {
diff --git a/nft_ingester/src/main.rs b/nft_ingester/src/main.rs
index 3d08cafb1..f426f3ca4 100644
--- a/nft_ingester/src/main.rs
+++ b/nft_ingester/src/main.rs
@@ -27,7 +27,8 @@ use chrono::Duration;
 use clap::{arg, command, value_parser};
 use log::{error, info};
 use plerkle_messenger::{
-    redis_messenger::RedisMessenger, ConsumptionType, ACCOUNT_STREAM, ACCOUNT_BACKFILL_STREAM, TRANSACTION_STREAM, TRANSACTION_BACKFILL_STREAM
+    redis_messenger::RedisMessenger, ConsumptionType, ACCOUNT_BACKFILL_STREAM, ACCOUNT_STREAM,
+    TRANSACTION_BACKFILL_STREAM, TRANSACTION_STREAM,
 };
 use std::{path::PathBuf, time};
 use tokio::{signal, task::JoinSet};
@@ -118,7 +119,6 @@ pub async fn main() -> Result<(), IngesterError> {
         TRANSACTION_BACKFILL_STREAM,
     )?;
 
-
     if let Some(t) = timer_acc.start::<RedisMessenger>().await {
         tasks.spawn(t);
     }
@@ -175,7 +175,6 @@ pub async fn main() -> Result<(), IngesterError> {
             } else {
                 ConsumptionType::New
             },
-            config.cl_audits.unwrap_or(false),
             TRANSACTION_STREAM,
         );
     }
@@ -190,7 +189,6 @@ pub async fn main() -> Result<(), IngesterError> {
             } else {
                 ConsumptionType::New
             },
-            config.cl_audits.unwrap_or(false),
             TRANSACTION_BACKFILL_STREAM,
         );
     }
diff --git a/nft_ingester/src/metrics.rs b/nft_ingester/src/metrics.rs
index 0e44d69c7..ce813b8da 100644
--- a/nft_ingester/src/metrics.rs
+++ b/nft_ingester/src/metrics.rs
@@ -5,10 +5,7 @@ use cadence_macros::{is_global_default_set, set_global_default, statsd_count, st
 use log::{error, warn};
 use tokio::time::Instant;
 
-use crate::{
-    config::IngesterConfig,
-    error::IngesterError,
-};
+use crate::{config::IngesterConfig, error::IngesterError};
 
 #[macro_export]
 macro_rules! metric {
@@ -32,9 +29,7 @@ pub fn setup_metrics(config: &IngesterConfig) {
         let udp_sink = BufferedUdpMetricSink::from(host, socket).unwrap();
         let queuing_sink = QueuingMetricSink::from(udp_sink);
         let builder = StatsdClient::builder("das_ingester", queuing_sink);
-        let client = builder
-            .with_tag("env", env)
-            .build();
+        let client = builder.with_tag("env", env).build();
         set_global_default(client);
     }
 }
@@ -42,7 +37,7 @@ pub fn setup_metrics(config: &IngesterConfig) {
 // Returns a boolean indicating whether the redis message should be ACK'd.
 // If the message is not ACK'd, it will be retried as long as it is under the retry limit.
 pub fn capture_result(
-    id: String,
+    _id: String,
     stream: &str,
     label: (&str, &str),
     tries: usize,
diff --git a/nft_ingester/src/program_transformers/bubblegum/burn.rs b/nft_ingester/src/program_transformers/bubblegum/burn.rs
index 70ddcfcea..7894084e7 100644
--- a/nft_ingester/src/program_transformers/bubblegum/burn.rs
+++ b/nft_ingester/src/program_transformers/bubblegum/burn.rs
@@ -17,13 +17,13 @@ pub async fn burn<'c, T>(
     parsing_result: &BubblegumInstruction,
     bundle: &InstructionBundle<'c>,
     txn: &'c T,
-    cl_audits: bool,
+    instruction: &str,
 ) -> Result<(), IngesterError>
 where
     T: ConnectionTrait + TransactionTrait,
 {
     if let Some(cl) = &parsing_result.tree_update {
-        let seq = save_changelog_event(cl, bundle.slot, bundle.txn_id, txn, cl_audits).await?;
+        let seq = save_changelog_event(cl, bundle.slot, bundle.txn_id, txn, instruction).await?;
         let leaf_index = cl.index;
         let (asset_id, _) = Pubkey::find_program_address(
             &[
diff --git a/nft_ingester/src/program_transformers/bubblegum/cancel_redeem.rs b/nft_ingester/src/program_transformers/bubblegum/cancel_redeem.rs
index 1b8f5842a..853b744e5 100644
--- a/nft_ingester/src/program_transformers/bubblegum/cancel_redeem.rs
+++ b/nft_ingester/src/program_transformers/bubblegum/cancel_redeem.rs
@@ -15,13 +15,13 @@ pub async fn cancel_redeem<'c, T>(
     parsing_result: &BubblegumInstruction,
     bundle: &InstructionBundle<'c>,
     txn: &'c T,
-    cl_audits: bool,
+    instruction: &str,
 ) -> Result<(), IngesterError>
 where
     T: ConnectionTrait + TransactionTrait,
 {
     if let (Some(le), Some(cl)) = (&parsing_result.leaf_update, &parsing_result.tree_update) {
-        let seq = save_changelog_event(cl, bundle.slot, bundle.txn_id, txn, cl_audits).await?;
+        let seq = save_changelog_event(cl, bundle.slot, bundle.txn_id, txn, instruction).await?;
         #[allow(unreachable_patterns)]
         return match le.schema {
             LeafSchema::V1 {
diff --git a/nft_ingester/src/program_transformers/bubblegum/collection_verification.rs b/nft_ingester/src/program_transformers/bubblegum/collection_verification.rs
index 7517f1544..e26d7daef 100644
--- a/nft_ingester/src/program_transformers/bubblegum/collection_verification.rs
+++ b/nft_ingester/src/program_transformers/bubblegum/collection_verification.rs
@@ -13,7 +13,7 @@ pub async fn process<'c, T>(
     parsing_result: &BubblegumInstruction,
     bundle: &InstructionBundle<'c>,
     txn: &'c T,
-    cl_audits: bool,
+    instruction: &str,
 ) -> Result<(), IngesterError>
 where
     T: ConnectionTrait + TransactionTrait,
@@ -37,7 +37,7 @@ where
         "Handling collection verification event for {} (verify: {}): {}",
         collection, verify, bundle.txn_id
     );
-    let seq = save_changelog_event(cl, bundle.slot, bundle.txn_id, txn, cl_audits).await?;
+    let seq = save_changelog_event(cl, bundle.slot, bundle.txn_id, txn, instruction).await?;
     let id_bytes = match le.schema {
         LeafSchema::V1 { id, .. } => id.to_bytes().to_vec(),
     };
diff --git a/nft_ingester/src/program_transformers/bubblegum/creator_verification.rs b/nft_ingester/src/program_transformers/bubblegum/creator_verification.rs
index 134fe89ca..3252921ff 100644
--- a/nft_ingester/src/program_transformers/bubblegum/creator_verification.rs
+++ b/nft_ingester/src/program_transformers/bubblegum/creator_verification.rs
@@ -17,7 +17,7 @@ pub async fn process<'c, T>(
     bundle: &InstructionBundle<'c>,
     txn: &'c T,
     value: bool,
-    cl_audits: bool,
+    instruction: &str,
 ) -> Result<(), IngesterError>
 where
     T: ConnectionTrait + TransactionTrait,
@@ -41,7 +41,7 @@ where
         "Handling creator verification event for creator {} (verify: {}): {}",
         creator, verify, bundle.txn_id
     );
-    let seq = save_changelog_event(cl, bundle.slot, bundle.txn_id, txn, cl_audits).await?;
+    let seq = save_changelog_event(cl, bundle.slot, bundle.txn_id, txn, instruction).await?;
 
     let asset_id_bytes = match le.schema {
         LeafSchema::V1 {
diff --git a/nft_ingester/src/program_transformers/bubblegum/db.rs b/nft_ingester/src/program_transformers/bubblegum/db.rs
index 7e930abdc..8b76278a6 100644
--- a/nft_ingester/src/program_transformers/bubblegum/db.rs
+++ b/nft_ingester/src/program_transformers/bubblegum/db.rs
@@ -1,27 +1,24 @@
 use crate::error::IngesterError;
 use digital_asset_types::dao::{
-    asset, asset_creators, asset_grouping, backfill_items, cl_audits, cl_items,
+    asset, asset_creators, asset_grouping, cl_audits_v2, cl_items,
+    sea_orm_active_enums::Instruction,
 };
-use log::{debug, info};
+use log::{debug, error};
 use mpl_bubblegum::types::Collection;
-use sea_orm::{
-    query::*, sea_query::OnConflict, ActiveValue::Set, ColumnTrait, DbBackend, EntityTrait,
-};
+use sea_orm::{query::*, sea_query::OnConflict, ActiveValue::Set, DbBackend, EntityTrait};
 use spl_account_compression::events::ChangeLogEventV1;
-use std::convert::From;
-
 pub async fn save_changelog_event<'c, T>(
     change_log_event: &ChangeLogEventV1,
     slot: u64,
     txn_id: &str,
     txn: &T,
-    cl_audits: bool,
+    instruction: &str,
 ) -> Result<u64, IngesterError>
 where
     T: ConnectionTrait + TransactionTrait,
 {
-    insert_change_log(change_log_event, slot, txn_id, txn, cl_audits).await?;
+    insert_change_log(change_log_event, slot, txn_id, txn, instruction).await?;
     Ok(change_log_event.seq)
 }
 
@@ -31,10 +28,10 @@ fn node_idx_to_leaf_idx(index: i64, tree_height: u32) -> i64 {
 pub async fn insert_change_log<'c, T>(
     change_log_event: &ChangeLogEventV1,
-    slot: u64,
+    _slot: u64,
     txn_id: &str,
     txn: &T,
-    cl_audits: bool,
+    instruction: &str,
 ) -> Result<(), IngesterError>
 where
     T: ConnectionTrait + TransactionTrait,
 {
@@ -45,12 +42,13 @@ where
     for p in change_log_event.path.iter() {
         let node_idx = p.index as i64;
         debug!(
-            "seq {}, index {} level {}, node {:?}, txn: {:?}",
+            "seq {}, index {} level {}, node {:?}, txn: {:?}, instruction {}",
             change_log_event.seq,
             p.index,
             i,
             bs58::encode(p.node).into_string(),
             txn_id,
+            instruction
         );
         let leaf_idx = if i == 0 {
             Some(node_idx_to_leaf_idx(node_idx, depth as u32))
         } else {
             None
         };
@@ -68,14 +66,6 @@ where
             ..Default::default()
         };
 
-        let mut audit_item: Option<cl_audits::ActiveModel> = if (cl_audits) {
-            let mut ai: cl_audits::ActiveModel = item.clone().into();
-            ai.tx = Set(txn_id.to_string());
-            Some(ai)
-        } else {
-            None
-        };
-
         i += 1;
         let mut query = cl_items::Entity::insert(item)
             .on_conflict(
@@ -93,46 +83,39 @@ where
         txn.execute(query)
             .await
             .map_err(|db_err| IngesterError::StorageWriteError(db_err.to_string()))?;
-
-        // Insert the audit item after the insert into cl_items have been completed
-        if let Some(audit_item) = audit_item {
-            cl_audits::Entity::insert(audit_item).exec(txn).await?;
-        }
     }
 
-    // If and only if the entire path of nodes was inserted into the `cl_items` table, then insert
-    // a single row into the `backfill_items` table. This way if an incomplete path was inserted
-    // into `cl_items` due to an error, a gap will be created for the tree and the backfiller will
-    // fix it.
-    if i - 1 == depth as i64 {
-        // See if the tree already exists in the `backfill_items` table.
-        let rows = backfill_items::Entity::find()
-            .filter(backfill_items::Column::Tree.eq(tree_id))
-            .limit(1)
-            .all(txn)
-            .await?;
-
-        // If the tree does not exist in `backfill_items` and the sequence number is greater than 1,
-        // then we know we will need to backfill the tree from sequence number 1 up to the current
-        // sequence number. So in this case we set at flag to force checking the tree.
-        let force_chk = rows.is_empty() && change_log_event.seq > 1;
-
-        info!("Adding to backfill_items table at level {}", i - 1);
-        let item = backfill_items::ActiveModel {
-            tree: Set(tree_id.to_vec()),
-            seq: Set(change_log_event.seq as i64),
-            slot: Set(slot as i64),
-            force_chk: Set(force_chk),
-            backfilled: Set(false),
-            failed: Set(false),
-            ..Default::default()
-        };
-
-        backfill_items::Entity::insert(item).exec(txn).await?;
+    // Insert the audit item after the insert into cl_items have been completed
+    let tx_id_bytes = bs58::decode(txn_id)
+        .into_vec()
+        .map_err(|_e| IngesterError::ChangeLogEventMalformed)?;
+    let audit_item_v2 = cl_audits_v2::ActiveModel {
+        tree: Set(tree_id.to_vec()),
+        leaf_idx: Set(change_log_event.index as i64),
+        seq: Set(change_log_event.seq as i64),
+        tx: Set(tx_id_bytes),
+        instruction: Set(Instruction::from_str(instruction)),
+        ..Default::default()
+    };
+    let query = cl_audits_v2::Entity::insert(audit_item_v2)
+        .on_conflict(
+            OnConflict::columns([
+                cl_audits_v2::Column::Tree,
+                cl_audits_v2::Column::LeafIdx,
+                cl_audits_v2::Column::Seq,
+            ])
+            .do_nothing()
+            .to_owned(),
+        )
+        .build(DbBackend::Postgres);
+    match txn.execute(query).await {
+        Ok(_) => {}
+        Err(e) => {
+            error!("Error while inserting into cl_audits_v2: {:?}", e);
+        }
     }
 
     Ok(())
-    //TODO -> set maximum size of path and break into multiple statements
 }
 
 pub async fn upsert_asset_with_leaf_info(
diff --git a/nft_ingester/src/program_transformers/bubblegum/delegate.rs b/nft_ingester/src/program_transformers/bubblegum/delegate.rs
index 88896de64..ce3d13f54 100644
--- a/nft_ingester/src/program_transformers/bubblegum/delegate.rs
+++ b/nft_ingester/src/program_transformers/bubblegum/delegate.rs
@@ -15,13 +15,13 @@ pub async fn delegate<'c, T>(
     parsing_result: &BubblegumInstruction,
     bundle: &InstructionBundle<'c>,
     txn: &'c T,
-    cl_audits: bool,
+    instruction: &str,
 ) -> Result<(), IngesterError>
 where
     T: ConnectionTrait + TransactionTrait,
 {
     if let (Some(le), Some(cl)) = (&parsing_result.leaf_update, &parsing_result.tree_update) {
-        let seq = save_changelog_event(cl, bundle.slot, bundle.txn_id, txn, cl_audits).await?;
+        let seq = save_changelog_event(cl, bundle.slot, bundle.txn_id, txn, instruction).await?;
         return match le.schema {
             LeafSchema::V1 {
                 id,
diff --git a/nft_ingester/src/program_transformers/bubblegum/mint_v1.rs b/nft_ingester/src/program_transformers/bubblegum/mint_v1.rs
index 752ed6a3c..7f847ab4c 100644
--- a/nft_ingester/src/program_transformers/bubblegum/mint_v1.rs
+++ b/nft_ingester/src/program_transformers/bubblegum/mint_v1.rs
@@ -39,7 +39,7 @@ pub async fn mint_v1<'c, T>(
     parsing_result: &BubblegumInstruction,
     bundle: &InstructionBundle<'c>,
     txn: &'c T,
-    cl_audits: bool,
+    instruction: &str,
 ) -> Result<Option<TaskData>, IngesterError>
 where
     T: ConnectionTrait + TransactionTrait,
@@ -49,7 +49,7 @@ where
         &parsing_result.tree_update,
         &parsing_result.payload,
     ) {
-        let seq = save_changelog_event(cl, bundle.slot, bundle.txn_id, txn, cl_audits).await?;
+        let seq = save_changelog_event(cl, bundle.slot, bundle.txn_id, txn, instruction).await?;
         let metadata = args;
         #[allow(unreachable_patterns)]
         return match le.schema {
diff --git a/nft_ingester/src/program_transformers/bubblegum/mod.rs b/nft_ingester/src/program_transformers/bubblegum/mod.rs
index bcc102c0b..ee36f3973 100644
--- a/nft_ingester/src/program_transformers/bubblegum/mod.rs
+++ b/nft_ingester/src/program_transformers/bubblegum/mod.rs
@@ -27,7 +27,6 @@ pub async fn handle_bubblegum_instruction<'c, T>(
     bundle: &'c InstructionBundle<'c>,
     txn: &T,
     task_manager: &UnboundedSender<TaskData>,
-    cl_audits: bool,
 ) -> Result<(), IngesterError>
 where
     T: ConnectionTrait + TransactionTrait,
@@ -59,40 +58,40 @@ where
     match ix_type {
         InstructionName::Transfer => {
-            transfer::transfer(parsing_result, bundle, txn, cl_audits).await?;
+            transfer::transfer(parsing_result, bundle, txn, ix_str).await?;
         }
         InstructionName::Burn => {
-            burn::burn(parsing_result, bundle, txn, cl_audits).await?;
+            burn::burn(parsing_result, bundle, txn, ix_str).await?;
         }
         InstructionName::Delegate => {
-            delegate::delegate(parsing_result, bundle, txn, cl_audits).await?;
+            delegate::delegate(parsing_result, bundle, txn, ix_str).await?;
         }
         InstructionName::MintV1 | InstructionName::MintToCollectionV1 => {
-            let task = mint_v1::mint_v1(parsing_result, bundle, txn, cl_audits).await?;
+            let task = mint_v1::mint_v1(parsing_result, bundle, txn, ix_str).await?;
             if let Some(t) = task {
                 task_manager.send(t)?;
             }
         }
         InstructionName::Redeem => {
-            redeem::redeem(parsing_result, bundle, txn, cl_audits).await?;
+            redeem::redeem(parsing_result, bundle, txn, ix_str).await?;
         }
         InstructionName::CancelRedeem => {
-            cancel_redeem::cancel_redeem(parsing_result, bundle, txn, cl_audits).await?;
+            cancel_redeem::cancel_redeem(parsing_result, bundle, txn, ix_str).await?;
         }
         InstructionName::DecompressV1 => {
             decompress::decompress(parsing_result, bundle, txn).await?;
         }
         InstructionName::VerifyCreator => {
-            creator_verification::process(parsing_result, bundle, txn, true, cl_audits).await?;
+            creator_verification::process(parsing_result, bundle, txn, true, ix_str).await?;
         }
         InstructionName::UnverifyCreator => {
-            creator_verification::process(parsing_result, bundle, txn, false, cl_audits).await?;
+            creator_verification::process(parsing_result, bundle, txn, false, ix_str).await?;
         }
         InstructionName::VerifyCollection
         | InstructionName::UnverifyCollection
         | InstructionName::SetAndVerifyCollection => {
-            collection_verification::process(parsing_result, bundle, txn, cl_audits).await?;
+            collection_verification::process(parsing_result, bundle, txn, ix_str).await?;
         }
         _ => debug!("Bubblegum: Not Implemented Instruction"),
     }
diff --git a/nft_ingester/src/program_transformers/bubblegum/redeem.rs b/nft_ingester/src/program_transformers/bubblegum/redeem.rs
index b9b7f2c27..4202d3018 100644
--- a/nft_ingester/src/program_transformers/bubblegum/redeem.rs
+++ b/nft_ingester/src/program_transformers/bubblegum/redeem.rs
@@ -14,13 +14,13 @@ pub async fn redeem<'c, T>(
     parsing_result: &BubblegumInstruction,
     bundle: &InstructionBundle<'c>,
     txn: &'c T,
-    cl_audits: bool,
+    instruction: &str,
 ) -> Result<(), IngesterError>
 where
     T: ConnectionTrait + TransactionTrait,
 {
     if let Some(cl) = &parsing_result.tree_update {
-        let seq = save_changelog_event(cl, bundle.slot, bundle.txn_id, txn, cl_audits).await?;
+        let seq = save_changelog_event(cl, bundle.slot, bundle.txn_id, txn, instruction).await?;
         let leaf_index = cl.index;
         let (asset_id, _) = Pubkey::find_program_address(
             &[
diff --git a/nft_ingester/src/program_transformers/bubblegum/transfer.rs b/nft_ingester/src/program_transformers/bubblegum/transfer.rs
index 573f33a8f..d8110a258 100644
--- a/nft_ingester/src/program_transformers/bubblegum/transfer.rs
+++ b/nft_ingester/src/program_transformers/bubblegum/transfer.rs
@@ -16,13 +16,13 @@ pub async fn transfer<'c, T>(
     parsing_result: &BubblegumInstruction,
     bundle: &InstructionBundle<'c>,
     txn: &'c T,
-    cl_audits: bool,
+    instruction: &str,
 ) -> Result<(), IngesterError>
 where
     T: ConnectionTrait + TransactionTrait,
 {
     if let (Some(le), Some(cl)) = (&parsing_result.leaf_update, &parsing_result.tree_update) {
-        let seq = save_changelog_event(cl, bundle.slot, bundle.txn_id, txn, cl_audits).await?;
+        let seq = save_changelog_event(cl, bundle.slot, bundle.txn_id, txn, instruction).await?;
         #[allow(unreachable_patterns)]
         return match le.schema {
             LeafSchema::V1 {
diff --git a/nft_ingester/src/program_transformers/mod.rs b/nft_ingester/src/program_transformers/mod.rs
index ff052756e..8eabcb656 100644
--- a/nft_ingester/src/program_transformers/mod.rs
+++ b/nft_ingester/src/program_transformers/mod.rs
@@ -29,11 +29,10 @@ pub struct ProgramTransformer {
     task_sender: UnboundedSender<TaskData>,
     matchers: HashMap<Pubkey, Box<dyn ProgramParser>>,
     key_set: HashSet<Pubkey>,
-    cl_audits: bool,
 }
 
 impl ProgramTransformer {
-    pub fn new(pool: PgPool, task_sender: UnboundedSender<TaskData>, cl_audits: bool) -> Self {
+    pub fn new(pool: PgPool, task_sender: UnboundedSender<TaskData>) -> Self {
         let mut matchers: HashMap<Pubkey, Box<dyn ProgramParser>> = HashMap::with_capacity(1);
         let bgum = BubblegumParser {};
         let token_metadata = TokenMetadataParser {};
@@ -51,7 +50,6 @@ impl ProgramTransformer {
             task_sender,
             matchers,
             key_set: hs,
-            cl_audits: cl_audits,
         }
     }
 
@@ -127,7 +125,6 @@ impl ProgramTransformer {
                         &ix,
                         &self.storage,
                         &self.task_sender,
-                        self.cl_audits,
                     )
                     .await
                     .map_err(|err| {
diff --git a/nft_ingester/src/transaction_notifications.rs b/nft_ingester/src/transaction_notifications.rs
index 6ca7cc5f4..99470d32e 100644
--- a/nft_ingester/src/transaction_notifications.rs
+++ b/nft_ingester/src/transaction_notifications.rs
@@ -6,9 +6,7 @@ use crate::{
 use cadence_macros::{is_global_default_set, statsd_count, statsd_time};
 use chrono::Utc;
 use log::{debug, error};
-use plerkle_messenger::{
-    ConsumptionType, Messenger, MessengerConfig, RecvData,
-};
+use plerkle_messenger::{ConsumptionType, Messenger, MessengerConfig, RecvData};
 use plerkle_serialization::root_as_transaction_info;
 use sqlx::{Pool, Postgres};
 
@@ -24,13 +22,12 @@ pub fn transaction_worker(
     bg_task_sender: UnboundedSender<TaskData>,
     ack_channel: UnboundedSender<(&'static str, String)>,
     consumption_type: ConsumptionType,
-    cl_audits: bool,
     stream_key: &'static str,
 ) -> JoinHandle<()> {
     tokio::spawn(async move {
         let source = T::new(config).await;
         if let Ok(mut msg) = source {
-            let manager = Arc::new(ProgramTransformer::new(pool, bg_task_sender, cl_audits)));
+            let manager = Arc::new(ProgramTransformer::new(pool, bg_task_sender));
             loop {
                 let e = msg.recv(stream_key, consumption_type.clone()).await;
                 let mut tasks = JoinSet::new();
@@ -69,7 +66,11 @@ pub fn transaction_worker(
     })
 }
 
-async fn handle_transaction(manager: Arc<ProgramTransformer>, item: RecvData, stream_key: &'static str) -> Option<String> {
+async fn handle_transaction(
+    manager: Arc<ProgramTransformer>,
+    item: RecvData,
+    stream_key: &'static str,
+) -> Option<String> {
     let mut ret_id = None;
     if item.tries > 0 {
         metric! {